/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
19 package org
.apache
.hadoop
.hbase
;
21 import java
.io
.IOException
;
22 import java
.security
.PrivilegedAction
;
23 import java
.util
.ArrayList
;
24 import java
.util
.HashSet
;
25 import java
.util
.List
;
27 import org
.apache
.hadoop
.conf
.Configuration
;
28 import org
.apache
.hadoop
.fs
.FileSystem
;
29 import org
.apache
.hadoop
.hbase
.client
.RegionInfoBuilder
;
30 import org
.apache
.hadoop
.hbase
.client
.RegionReplicaUtil
;
31 import org
.apache
.hadoop
.hbase
.master
.HMaster
;
32 import org
.apache
.hadoop
.hbase
.regionserver
.HRegion
;
33 import org
.apache
.hadoop
.hbase
.regionserver
.HRegion
.FlushResult
;
34 import org
.apache
.hadoop
.hbase
.regionserver
.HRegionServer
;
35 import org
.apache
.hadoop
.hbase
.regionserver
.Region
;
36 import org
.apache
.hadoop
.hbase
.security
.User
;
37 import org
.apache
.hadoop
.hbase
.test
.MetricsAssertHelper
;
38 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
39 import org
.apache
.hadoop
.hbase
.util
.JVMClusterUtil
;
40 import org
.apache
.hadoop
.hbase
.util
.JVMClusterUtil
.MasterThread
;
41 import org
.apache
.hadoop
.hbase
.util
.JVMClusterUtil
.RegionServerThread
;
42 import org
.apache
.hadoop
.hbase
.util
.Threads
;
43 import org
.apache
.yetus
.audience
.InterfaceAudience
;
44 import org
.apache
.yetus
.audience
.InterfaceStability
;
45 import org
.slf4j
.Logger
;
46 import org
.slf4j
.LoggerFactory
;
48 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.RegionServerStartupResponse
;
/**
 * This class creates a single process HBase cluster. The master uses the 'default'
 * FileSystem. The RegionServers, if we are running on DistributedFilesystem, create a FileSystem
 * instance each and will close down their instance on the way out.
 */
55 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience
.PHOENIX
)
56 @InterfaceStability.Evolving
57 public class SingleProcessHBaseCluster
extends HBaseClusterInterface
{
58 private static final Logger LOG
=
59 LoggerFactory
.getLogger(SingleProcessHBaseCluster
.class.getName());
60 public LocalHBaseCluster hbaseCluster
;
61 private static int index
;
/**
 * Start a mini cluster with a single master.
 * @param conf             Configuration to be used for cluster
 * @param numRegionServers initial number of region servers to start.
 */
public SingleProcessHBaseCluster(Configuration conf, int numRegionServers)
  throws IOException, InterruptedException {
  // Delegate with exactly one master.
  this(conf, 1, numRegionServers);
}
/**
 * Start a mini cluster, passing {@code null} for both the master and the region server class.
 * @param conf             Configuration to be used for cluster
 * @param numMasters       initial number of masters to start.
 * @param numRegionServers initial number of region servers to start.
 */
public SingleProcessHBaseCluster(Configuration conf, int numMasters, int numRegionServers)
  throws IOException, InterruptedException {
  // No custom master / region server classes.
  this(conf, numMasters, numRegionServers, null, null);
}
/**
 * Start a mini cluster with caller-supplied master and region server classes. Delegates to the
 * full constructor with zero always-standby masters and no fixed region server ports.
 * @param conf              Configuration to be used for cluster
 * @param numMasters        initial number of masters to start.
 * @param numRegionServers  initial number of region servers to start.
 * @param masterClass       master class handed on to the full constructor
 * @param regionserverClass region server class handed on to the full constructor
 */
public SingleProcessHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
  Class<? extends HMaster> masterClass,
  Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
  throws IOException, InterruptedException {
  // 0 = numAlwaysStandByMasters, null = rsPorts.
  this(conf, numMasters, 0, numRegionServers, null, masterClass, regionserverClass);
}
98 * @param rsPorts Ports that RegionServer should use; pass ports if you want to test cluster
99 * restart where for sure the regionservers come up on same address+port (but just with
100 * different startcode); by default mini hbase clusters choose new arbitrary ports on
101 * each cluster start.
103 public SingleProcessHBaseCluster(Configuration conf
, int numMasters
, int numAlwaysStandByMasters
,
104 int numRegionServers
, List
<Integer
> rsPorts
, Class
<?
extends HMaster
> masterClass
,
105 Class
<?
extends SingleProcessHBaseCluster
.MiniHBaseClusterRegionServer
> regionserverClass
)
106 throws IOException
, InterruptedException
{
110 CompatibilityFactory
.getInstance(MetricsAssertHelper
.class).init();
112 init(numMasters
, numAlwaysStandByMasters
, numRegionServers
, rsPorts
, masterClass
,
114 this.initialClusterStatus
= getClusterMetrics();
117 public Configuration
getConfiguration() {
/**
 * Subclass so can get at protected methods (none at moment). Also, creates a FileSystem instance
 * per instantiation. Shuts down its own FileSystem on the way out: only its own FileSystem, not
 * all filesystems as the FileSystem system exit hook does.
 */
126 public static class MiniHBaseClusterRegionServer
extends HRegionServer
{
127 private Thread shutdownThread
= null;
128 private User user
= null;
130 * List of RegionServers killed so far. ServerName also comprises startCode of a server, so any
131 * restarted instances of the same server will have different ServerName and will not coincide
132 * with past dead ones. So there's no need to cleanup this list.
134 static Set
<ServerName
> killedServers
= new HashSet
<>();
136 public MiniHBaseClusterRegionServer(Configuration conf
)
137 throws IOException
, InterruptedException
{
139 this.user
= User
.getCurrent();
143 protected void handleReportForDutyResponse(final RegionServerStartupResponse c
)
145 super.handleReportForDutyResponse(c
);
146 // Run this thread to shutdown our filesystem on way out.
147 this.shutdownThread
= new SingleFileSystemShutdownThread(getFileSystem());
153 this.user
.runAs(new PrivilegedAction
<Object
>() {
155 public Object
run() {
160 } catch (Throwable t
) {
161 LOG
.error("Exception in run", t
);
163 // Run this on the way out.
164 if (this.shutdownThread
!= null) {
165 this.shutdownThread
.start();
166 Threads
.shutdown(this.shutdownThread
, 30000);
171 private void runRegionServer() {
176 protected void kill() {
177 killedServers
.add(getServerName());
182 public void abort(final String reason
, final Throwable cause
) {
183 this.user
.runAs(new PrivilegedAction
<Object
>() {
185 public Object
run() {
186 abortRegionServer(reason
, cause
);
192 private void abortRegionServer(String reason
, Throwable cause
) {
193 super.abort(reason
, cause
);
198 * Alternate shutdown hook. Just shuts down the passed fs, not all as default filesystem hook
201 static class SingleFileSystemShutdownThread
extends Thread
{
202 private final FileSystem fs
;
204 SingleFileSystemShutdownThread(final FileSystem fs
) {
205 super("Shutdown of " + fs
);
212 LOG
.info("Hook closing fs=" + this.fs
);
214 } catch (NullPointerException npe
) {
215 LOG
.debug("Need to fix these: " + npe
.toString());
216 } catch (IOException e
) {
217 LOG
.warn("Running hook", e
);
222 private void init(final int nMasterNodes
, final int numAlwaysStandByMasters
,
223 final int nRegionNodes
, List
<Integer
> rsPorts
, Class
<?
extends HMaster
> masterClass
,
224 Class
<?
extends SingleProcessHBaseCluster
.MiniHBaseClusterRegionServer
> regionserverClass
)
225 throws IOException
, InterruptedException
{
227 if (masterClass
== null) {
228 masterClass
= HMaster
.class;
230 if (regionserverClass
== null) {
231 regionserverClass
= SingleProcessHBaseCluster
.MiniHBaseClusterRegionServer
.class;
234 // start up a LocalHBaseCluster
235 hbaseCluster
= new LocalHBaseCluster(conf
, nMasterNodes
, numAlwaysStandByMasters
, 0,
236 masterClass
, regionserverClass
);
238 // manually add the regionservers as other users
239 for (int i
= 0; i
< nRegionNodes
; i
++) {
240 Configuration rsConf
= HBaseConfiguration
.create(conf
);
241 if (rsPorts
!= null) {
242 rsConf
.setInt(HConstants
.REGIONSERVER_PORT
, rsPorts
.get(i
));
244 User user
= HBaseTestingUtil
.getDifferentUser(rsConf
, ".hfs." + index
++);
245 hbaseCluster
.addRegionServer(rsConf
, i
, user
);
248 hbaseCluster
.startup();
249 } catch (IOException e
) {
252 } catch (Throwable t
) {
253 LOG
.error("Error starting cluster", t
);
255 throw new IOException("Shutting down", t
);
260 public void startRegionServer(String hostname
, int port
) throws IOException
{
261 final Configuration newConf
= HBaseConfiguration
.create(conf
);
262 newConf
.setInt(HConstants
.REGIONSERVER_PORT
, port
);
263 startRegionServer(newConf
);
267 public void killRegionServer(ServerName serverName
) throws IOException
{
268 HRegionServer server
= getRegionServer(getRegionServerIndex(serverName
));
269 if (server
instanceof MiniHBaseClusterRegionServer
) {
270 LOG
.info("Killing " + server
.toString());
271 ((MiniHBaseClusterRegionServer
) server
).kill();
273 abortRegionServer(getRegionServerIndex(serverName
));
278 public boolean isKilledRS(ServerName serverName
) {
279 return MiniHBaseClusterRegionServer
.killedServers
.contains(serverName
);
283 public void stopRegionServer(ServerName serverName
) throws IOException
{
284 stopRegionServer(getRegionServerIndex(serverName
));
288 public void suspendRegionServer(ServerName serverName
) throws IOException
{
289 suspendRegionServer(getRegionServerIndex(serverName
));
293 public void resumeRegionServer(ServerName serverName
) throws IOException
{
294 resumeRegionServer(getRegionServerIndex(serverName
));
298 public void waitForRegionServerToStop(ServerName serverName
, long timeout
) throws IOException
{
299 // ignore timeout for now
300 waitOnRegionServer(getRegionServerIndex(serverName
));
304 public void startZkNode(String hostname
, int port
) throws IOException
{
305 LOG
.warn("Starting zookeeper nodes on mini cluster is not supported");
309 public void killZkNode(ServerName serverName
) throws IOException
{
310 LOG
.warn("Aborting zookeeper nodes on mini cluster is not supported");
314 public void stopZkNode(ServerName serverName
) throws IOException
{
315 LOG
.warn("Stopping zookeeper nodes on mini cluster is not supported");
319 public void waitForZkNodeToStart(ServerName serverName
, long timeout
) throws IOException
{
320 LOG
.warn("Waiting for zookeeper nodes to start on mini cluster is not supported");
324 public void waitForZkNodeToStop(ServerName serverName
, long timeout
) throws IOException
{
325 LOG
.warn("Waiting for zookeeper nodes to stop on mini cluster is not supported");
329 public void startDataNode(ServerName serverName
) throws IOException
{
330 LOG
.warn("Starting datanodes on mini cluster is not supported");
334 public void killDataNode(ServerName serverName
) throws IOException
{
335 LOG
.warn("Aborting datanodes on mini cluster is not supported");
339 public void stopDataNode(ServerName serverName
) throws IOException
{
340 LOG
.warn("Stopping datanodes on mini cluster is not supported");
344 public void waitForDataNodeToStart(ServerName serverName
, long timeout
) throws IOException
{
345 LOG
.warn("Waiting for datanodes to start on mini cluster is not supported");
349 public void waitForDataNodeToStop(ServerName serverName
, long timeout
) throws IOException
{
350 LOG
.warn("Waiting for datanodes to stop on mini cluster is not supported");
354 public void startNameNode(ServerName serverName
) throws IOException
{
355 LOG
.warn("Starting namenodes on mini cluster is not supported");
359 public void killNameNode(ServerName serverName
) throws IOException
{
360 LOG
.warn("Aborting namenodes on mini cluster is not supported");
364 public void stopNameNode(ServerName serverName
) throws IOException
{
365 LOG
.warn("Stopping namenodes on mini cluster is not supported");
369 public void waitForNameNodeToStart(ServerName serverName
, long timeout
) throws IOException
{
370 LOG
.warn("Waiting for namenodes to start on mini cluster is not supported");
374 public void waitForNameNodeToStop(ServerName serverName
, long timeout
) throws IOException
{
375 LOG
.warn("Waiting for namenodes to stop on mini cluster is not supported");
379 public void startMaster(String hostname
, int port
) throws IOException
{
384 public void killMaster(ServerName serverName
) throws IOException
{
385 abortMaster(getMasterIndex(serverName
));
389 public void stopMaster(ServerName serverName
) throws IOException
{
390 stopMaster(getMasterIndex(serverName
));
394 public void waitForMasterToStop(ServerName serverName
, long timeout
) throws IOException
{
395 // ignore timeout for now
396 waitOnMaster(getMasterIndex(serverName
));
400 * Starts a region server thread running
401 * @return New RegionServerThread
403 public JVMClusterUtil
.RegionServerThread
startRegionServer() throws IOException
{
404 final Configuration newConf
= HBaseConfiguration
.create(conf
);
405 return startRegionServer(newConf
);
408 private JVMClusterUtil
.RegionServerThread
startRegionServer(Configuration configuration
)
410 User rsUser
= HBaseTestingUtil
.getDifferentUser(configuration
, ".hfs." + index
++);
411 JVMClusterUtil
.RegionServerThread t
= null;
414 hbaseCluster
.addRegionServer(configuration
, hbaseCluster
.getRegionServers().size(), rsUser
);
416 t
.waitForServerOnline();
417 } catch (InterruptedException ie
) {
418 throw new IOException("Interrupted adding regionserver to cluster", ie
);
424 * Starts a region server thread and waits until its processed by master. Throws an exception when
425 * it can't start a region server or when the region server is not processed by master within the
427 * @return New RegionServerThread
429 public JVMClusterUtil
.RegionServerThread
startRegionServerAndWait(long timeout
)
432 JVMClusterUtil
.RegionServerThread t
= startRegionServer();
433 ServerName rsServerName
= t
.getRegionServer().getServerName();
435 long start
= EnvironmentEdgeManager
.currentTime();
436 ClusterMetrics clusterStatus
= getClusterMetrics();
437 while ((EnvironmentEdgeManager
.currentTime() - start
) < timeout
) {
438 if (clusterStatus
!= null && clusterStatus
.getLiveServerMetrics().containsKey(rsServerName
)) {
443 if (t
.getRegionServer().isOnline()) {
444 throw new IOException("RS: " + rsServerName
+ " online, but not processed by master");
446 throw new IOException("RS: " + rsServerName
+ " is offline");
451 * Cause a region server to exit doing basic clean up only on its way out.
452 * @param serverNumber Used as index into a list.
454 public String
abortRegionServer(int serverNumber
) {
455 HRegionServer server
= getRegionServer(serverNumber
);
456 LOG
.info("Aborting " + server
.toString());
457 server
.abort("Aborting for tests", new Exception("Trace info"));
458 return server
.toString();
462 * Shut down the specified region server cleanly
463 * @param serverNumber Used as index into a list.
464 * @return the region server that was stopped
466 public JVMClusterUtil
.RegionServerThread
stopRegionServer(int serverNumber
) {
467 return stopRegionServer(serverNumber
, true);
471 * Shut down the specified region server cleanly
472 * @param serverNumber Used as index into a list.
473 * @param shutdownFS True is we are to shutdown the filesystem as part of this regionserver's
474 * shutdown. Usually we do but you do not want to do this if you are running multiple
475 * regionservers in a test and you shut down one before end of the test.
476 * @return the region server that was stopped
478 public JVMClusterUtil
.RegionServerThread
stopRegionServer(int serverNumber
,
479 final boolean shutdownFS
) {
480 JVMClusterUtil
.RegionServerThread server
= hbaseCluster
.getRegionServers().get(serverNumber
);
481 LOG
.info("Stopping " + server
.toString());
482 server
.getRegionServer().stop("Stopping rs " + serverNumber
);
487 * Suspend the specified region server
488 * @param serverNumber Used as index into a list.
490 public JVMClusterUtil
.RegionServerThread
suspendRegionServer(int serverNumber
) {
491 JVMClusterUtil
.RegionServerThread server
= hbaseCluster
.getRegionServers().get(serverNumber
);
492 LOG
.info("Suspending {}", server
.toString());
498 * Resume the specified region server
499 * @param serverNumber Used as index into a list.
501 public JVMClusterUtil
.RegionServerThread
resumeRegionServer(int serverNumber
) {
502 JVMClusterUtil
.RegionServerThread server
= hbaseCluster
.getRegionServers().get(serverNumber
);
503 LOG
.info("Resuming {}", server
.toString());
509 * Wait for the specified region server to stop. Removes this thread from list of running threads.
510 * @return Name of region server that just went down.
512 public String
waitOnRegionServer(final int serverNumber
) {
513 return this.hbaseCluster
.waitOnRegionServer(serverNumber
);
517 * Starts a master thread running
518 * @return New RegionServerThread
520 public JVMClusterUtil
.MasterThread
startMaster() throws IOException
{
521 Configuration c
= HBaseConfiguration
.create(conf
);
522 User user
= HBaseTestingUtil
.getDifferentUser(c
, ".hfs." + index
++);
524 JVMClusterUtil
.MasterThread t
= null;
526 t
= hbaseCluster
.addMaster(c
, hbaseCluster
.getMasters().size(), user
);
528 } catch (InterruptedException ie
) {
529 throw new IOException("Interrupted adding master to cluster", ie
);
531 conf
.set(HConstants
.MASTER_ADDRS_KEY
,
532 hbaseCluster
.getConfiguration().get(HConstants
.MASTER_ADDRS_KEY
));
537 * Returns the current active master, if available.
538 * @return the active HMaster, null if none is active.
540 public HMaster
getMaster() {
541 return this.hbaseCluster
.getActiveMaster();
545 * Returns the current active master thread, if available.
546 * @return the active MasterThread, null if none is active.
548 public MasterThread
getMasterThread() {
549 for (MasterThread mt
: hbaseCluster
.getLiveMasters()) {
550 if (mt
.getMaster().isActiveMaster()) {
558 * Returns the master at the specified index, if available.
559 * @return the active HMaster, null if none is active.
561 public HMaster
getMaster(final int serverNumber
) {
562 return this.hbaseCluster
.getMaster(serverNumber
);
566 * Cause a master to exit without shutting down entire cluster.
567 * @param serverNumber Used as index into a list.
569 public String
abortMaster(int serverNumber
) {
570 HMaster server
= getMaster(serverNumber
);
571 LOG
.info("Aborting " + server
.toString());
572 server
.abort("Aborting for tests", new Exception("Trace info"));
573 return server
.toString();
577 * Shut down the specified master cleanly
578 * @param serverNumber Used as index into a list.
579 * @return the region server that was stopped
581 public JVMClusterUtil
.MasterThread
stopMaster(int serverNumber
) {
582 return stopMaster(serverNumber
, true);
586 * Shut down the specified master cleanly
587 * @param serverNumber Used as index into a list.
588 * @param shutdownFS True is we are to shutdown the filesystem as part of this master's shutdown.
589 * Usually we do but you do not want to do this if you are running multiple master in a
590 * test and you shut down one before end of the test.
591 * @return the master that was stopped
593 public JVMClusterUtil
.MasterThread
stopMaster(int serverNumber
, final boolean shutdownFS
) {
594 JVMClusterUtil
.MasterThread server
= hbaseCluster
.getMasters().get(serverNumber
);
595 LOG
.info("Stopping " + server
.toString());
596 server
.getMaster().stop("Stopping master " + serverNumber
);
601 * Wait for the specified master to stop. Removes this thread from list of running threads.
602 * @return Name of master that just went down.
604 public String
waitOnMaster(final int serverNumber
) {
605 return this.hbaseCluster
.waitOnMaster(serverNumber
);
609 * Blocks until there is an active master and that master has completed initialization.
610 * @return true if an active master becomes available. false if there are no masters left.
613 public boolean waitForActiveAndReadyMaster(long timeout
) throws IOException
{
614 long start
= EnvironmentEdgeManager
.currentTime();
615 while (EnvironmentEdgeManager
.currentTime() - start
< timeout
) {
616 for (JVMClusterUtil
.MasterThread mt
: getMasterThreads()) {
617 if (mt
.getMaster().isActiveMaster() && mt
.getMaster().isInitialized()) {
627 * Returns list of master threads.
629 public List
<JVMClusterUtil
.MasterThread
> getMasterThreads() {
630 return this.hbaseCluster
.getMasters();
634 * Returns list of live master threads (skips the aborted and the killed)
636 public List
<JVMClusterUtil
.MasterThread
> getLiveMasterThreads() {
637 return this.hbaseCluster
.getLiveMasters();
641 * Wait for Mini HBase Cluster to shut down.
644 this.hbaseCluster
.join();
648 * Shut down the mini HBase cluster
651 public void shutdown() throws IOException
{
652 if (this.hbaseCluster
!= null) {
653 this.hbaseCluster
.shutdown();
658 public void close() throws IOException
{
662 public ClusterMetrics
getClusterMetrics() throws IOException
{
663 HMaster master
= getMaster();
664 return master
== null ?
null : master
.getClusterMetrics();
667 private void executeFlush(HRegion region
) throws IOException
{
668 if (!RegionReplicaUtil
.isDefaultReplica(region
.getRegionInfo())) {
671 // retry 5 times if we can not flush
672 for (int i
= 0; i
< 5; i
++) {
673 FlushResult result
= region
.flush(true);
674 if (result
.getResult() != FlushResult
.Result
.CANNOT_FLUSH
) {
682 * Call flushCache on all regions on all participating regionservers.
684 public void flushcache() throws IOException
{
685 for (JVMClusterUtil
.RegionServerThread t
: this.hbaseCluster
.getRegionServers()) {
686 for (HRegion r
: t
.getRegionServer().getOnlineRegionsLocalContext()) {
693 * Call flushCache on all regions of the specified table.
695 public void flushcache(TableName tableName
) throws IOException
{
696 for (JVMClusterUtil
.RegionServerThread t
: this.hbaseCluster
.getRegionServers()) {
697 for (HRegion r
: t
.getRegionServer().getOnlineRegionsLocalContext()) {
698 if (r
.getTableDescriptor().getTableName().equals(tableName
)) {
706 * Call flushCache on all regions on all participating regionservers.
708 public void compact(boolean major
) throws IOException
{
709 for (JVMClusterUtil
.RegionServerThread t
: this.hbaseCluster
.getRegionServers()) {
710 for (HRegion r
: t
.getRegionServer().getOnlineRegionsLocalContext()) {
711 if (RegionReplicaUtil
.isDefaultReplica(r
.getRegionInfo())) {
719 * Call flushCache on all regions of the specified table.
721 public void compact(TableName tableName
, boolean major
) throws IOException
{
722 for (JVMClusterUtil
.RegionServerThread t
: this.hbaseCluster
.getRegionServers()) {
723 for (HRegion r
: t
.getRegionServer().getOnlineRegionsLocalContext()) {
724 if (r
.getTableDescriptor().getTableName().equals(tableName
)) {
725 if (RegionReplicaUtil
.isDefaultReplica(r
.getRegionInfo())) {
734 * Returns number of live region servers in the cluster currently.
736 public int getNumLiveRegionServers() {
737 return this.hbaseCluster
.getLiveRegionServers().size();
741 * Returns list of region server threads. Does not return the master even though it is also a
744 public List
<JVMClusterUtil
.RegionServerThread
> getRegionServerThreads() {
745 return this.hbaseCluster
.getRegionServers();
749 * Returns List of live region server threads (skips the aborted and the killed)
751 public List
<JVMClusterUtil
.RegionServerThread
> getLiveRegionServerThreads() {
752 return this.hbaseCluster
.getLiveRegionServers();
756 * Grab a numbered region server of your choice.
757 * @return region server
759 public HRegionServer
getRegionServer(int serverNumber
) {
760 return hbaseCluster
.getRegionServer(serverNumber
);
763 public HRegionServer
getRegionServer(ServerName serverName
) {
764 return hbaseCluster
.getRegionServers().stream().map(t
-> t
.getRegionServer())
765 .filter(r
-> r
.getServerName().equals(serverName
)).findFirst().orElse(null);
768 public List
<HRegion
> getRegions(byte[] tableName
) {
769 return getRegions(TableName
.valueOf(tableName
));
772 public List
<HRegion
> getRegions(TableName tableName
) {
773 List
<HRegion
> ret
= new ArrayList
<>();
774 for (JVMClusterUtil
.RegionServerThread rst
: getRegionServerThreads()) {
775 HRegionServer hrs
= rst
.getRegionServer();
776 for (Region region
: hrs
.getOnlineRegionsLocalContext()) {
777 if (region
.getTableDescriptor().getTableName().equals(tableName
)) {
778 ret
.add((HRegion
) region
);
786 * Returns index into List of {@link SingleProcessHBaseCluster#getRegionServerThreads()} of HRS
787 * carrying regionName. Returns -1 if none found.
789 public int getServerWithMeta() {
790 return getServerWith(RegionInfoBuilder
.FIRST_META_REGIONINFO
.getRegionName());
794 * Get the location of the specified region
795 * @param regionName Name of the region in bytes
796 * @return Index into List of {@link SingleProcessHBaseCluster#getRegionServerThreads()} of HRS
797 * carrying hbase:meta. Returns -1 if none found.
799 public int getServerWith(byte[] regionName
) {
801 for (JVMClusterUtil
.RegionServerThread rst
: getRegionServerThreads()) {
802 HRegionServer hrs
= rst
.getRegionServer();
803 if (!hrs
.isStopped()) {
804 Region region
= hrs
.getOnlineRegion(regionName
);
805 if (region
!= null) {
815 public ServerName
getServerHoldingRegion(final TableName tn
, byte[] regionName
)
817 int index
= getServerWith(regionName
);
821 return getRegionServer(index
).getServerName();
825 * Counts the total numbers of regions being served by the currently online region servers by
826 * asking each how many regions they have. Does not look at hbase:meta at all. Count includes
828 * @return number of regions being served by all region servers
830 public long countServedRegions() {
832 for (JVMClusterUtil
.RegionServerThread rst
: getLiveRegionServerThreads()) {
833 count
+= rst
.getRegionServer().getNumberOfOnlineRegions();
839 * Do a simulated kill all masters and regionservers. Useful when it is impossible to bring the
840 * mini-cluster back for clean shutdown.
842 public void killAll() {
844 MasterThread activeMaster
= null;
845 for (MasterThread masterThread
: getMasterThreads()) {
846 if (!masterThread
.getMaster().isActiveMaster()) {
847 masterThread
.getMaster().abort("killAll");
849 activeMaster
= masterThread
;
853 if (activeMaster
!= null) {
854 activeMaster
.getMaster().abort("killAll");
856 for (RegionServerThread rst
: getRegionServerThreads()) {
857 rst
.getRegionServer().abort("killAll");
862 public void waitUntilShutDown() {
863 this.hbaseCluster
.join();
866 public List
<HRegion
> findRegionsForTable(TableName tableName
) {
867 ArrayList
<HRegion
> ret
= new ArrayList
<>();
868 for (JVMClusterUtil
.RegionServerThread rst
: getRegionServerThreads()) {
869 HRegionServer hrs
= rst
.getRegionServer();
870 for (Region region
: hrs
.getRegions(tableName
)) {
871 if (region
.getTableDescriptor().getTableName().equals(tableName
)) {
872 ret
.add((HRegion
) region
);
879 protected int getRegionServerIndex(ServerName serverName
) {
880 // we have a small number of region servers, this should be fine for now.
881 List
<RegionServerThread
> servers
= getRegionServerThreads();
882 for (int i
= 0; i
< servers
.size(); i
++) {
883 if (servers
.get(i
).getRegionServer().getServerName().equals(serverName
)) {
890 protected int getMasterIndex(ServerName serverName
) {
891 List
<MasterThread
> masters
= getMasterThreads();
892 for (int i
= 0; i
< masters
.size(); i
++) {
893 if (masters
.get(i
).getMaster().getServerName().equals(serverName
)) {