3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
20 package org
.apache
.hadoop
.hbase
.util
;
22 import java
.io
.BufferedInputStream
;
23 import java
.io
.BufferedOutputStream
;
24 import java
.io
.Closeable
;
25 import java
.io
.DataInputStream
;
26 import java
.io
.DataOutputStream
;
28 import java
.io
.FileInputStream
;
29 import java
.io
.FileOutputStream
;
30 import java
.io
.IOException
;
31 import java
.nio
.file
.Files
;
32 import java
.nio
.file
.Paths
;
33 import java
.util
.ArrayList
;
34 import java
.util
.Collection
;
35 import java
.util
.Collections
;
36 import java
.util
.EnumSet
;
37 import java
.util
.HashSet
;
38 import java
.util
.Iterator
;
39 import java
.util
.List
;
40 import java
.util
.Locale
;
41 import java
.util
.Optional
;
43 import java
.util
.concurrent
.Callable
;
44 import java
.util
.concurrent
.CancellationException
;
45 import java
.util
.concurrent
.ExecutionException
;
46 import java
.util
.concurrent
.ExecutorService
;
47 import java
.util
.concurrent
.Executors
;
48 import java
.util
.concurrent
.Future
;
49 import java
.util
.concurrent
.TimeUnit
;
50 import java
.util
.concurrent
.TimeoutException
;
51 import java
.util
.function
.Predicate
;
52 import org
.apache
.commons
.io
.IOUtils
;
53 import org
.apache
.hadoop
.conf
.Configuration
;
54 import org
.apache
.hadoop
.hbase
.ClusterMetrics
.Option
;
55 import org
.apache
.hadoop
.hbase
.HBaseConfiguration
;
56 import org
.apache
.hadoop
.hbase
.HConstants
;
57 import org
.apache
.hadoop
.hbase
.ServerName
;
58 import org
.apache
.hadoop
.hbase
.UnknownRegionException
;
59 import org
.apache
.hadoop
.hbase
.client
.Admin
;
60 import org
.apache
.hadoop
.hbase
.client
.Connection
;
61 import org
.apache
.hadoop
.hbase
.client
.ConnectionFactory
;
62 import org
.apache
.hadoop
.hbase
.client
.DoNotRetryRegionException
;
63 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
64 import org
.apache
.hadoop
.hbase
.master
.RackManager
;
65 import org
.apache
.hadoop
.hbase
.master
.assignment
.AssignmentManager
;
66 import org
.apache
.hadoop
.hbase
.net
.Address
;
67 import org
.apache
.hadoop
.hbase
.rsgroup
.RSGroupInfo
;
68 import org
.apache
.yetus
.audience
.InterfaceAudience
;
69 import org
.slf4j
.Logger
;
70 import org
.slf4j
.LoggerFactory
;
72 import org
.apache
.hbase
.thirdparty
.org
.apache
.commons
.cli
.CommandLine
;
73 import org
.apache
.hbase
.thirdparty
.org
.apache
.commons
.collections4
.CollectionUtils
;
/**
 * Tool for loading/unloading regions to/from a given regionserver. This tool can be run directly
 * from the command line as a utility. It supports Ack/No Ack modes for loading/unloading
 * operations. Ack mode acknowledges that regions are online after movement, while noAck mode is a
 * best effort mode that improves performance but will still move on if a region is stuck/not
 * moved. The motivation behind noAck mode is RS shutdown, where even if a region is stuck the
 * master will move it anyway upon shutdown. This can also be used by constructing an object using
 * the builder and then calling the {@link #load()} or {@link #unload()} methods for the desired
 * operations.
 */
84 @InterfaceAudience.Public
85 public class RegionMover
extends AbstractHBaseTool
implements Closeable
{
86 public static final String MOVE_RETRIES_MAX_KEY
= "hbase.move.retries.max";
87 public static final String MOVE_WAIT_MAX_KEY
= "hbase.move.wait.max";
88 public static final String SERVERSTART_WAIT_MAX_KEY
= "hbase.serverstart.wait.max";
89 public static final int DEFAULT_MOVE_RETRIES_MAX
= 5;
90 public static final int DEFAULT_MOVE_WAIT_MAX
= 60;
91 public static final int DEFAULT_SERVERSTART_WAIT_MAX
= 180;
93 private static final Logger LOG
= LoggerFactory
.getLogger(RegionMover
.class);
95 private RegionMoverBuilder rmbuilder
;
96 private boolean ack
= true;
97 private int maxthreads
= 1;
99 private String loadUnload
;
100 private String hostname
;
101 private String filename
;
102 private String excludeFile
;
103 private String designatedFile
;
105 private Connection conn
;
107 private RackManager rackManager
;
/**
 * Creates a mover from the builder's settings and opens the connection and admin handle that the
 * load/unload operations use.
 * <p>
 * If obtaining the admin or creating the rack manager fails after the connection was opened, the
 * already-opened resources are released through {@link #close()} before the failure propagates,
 * so a failed construction does not leak the connection.
 *
 * @param builder supplies host, port, file names, thread count, ack mode, timeout,
 *                configuration and, optionally, a rack manager
 * @throws IOException if the connection or the admin cannot be created
 */
private RegionMover(RegionMoverBuilder builder) throws IOException {
  this.hostname = builder.hostname;
  this.filename = builder.filename;
  this.excludeFile = builder.excludeFile;
  this.designatedFile = builder.designatedFile;
  this.maxthreads = builder.maxthreads;
  this.ack = builder.ack;
  this.port = builder.port;
  this.timeout = builder.timeout;
  setConf(builder.conf);
  this.conn = ConnectionFactory.createConnection(conf);
  try {
    this.admin = conn.getAdmin();
    // Only while running unit tests, builder.rackManager will not be null for the convenience of
    // providing custom rackManager. Otherwise for regular workflow/user triggered action,
    // builder.rackManager is supposed to be null. Hence, setter of builder.rackManager is
    // provided as @InterfaceAudience.Private and it is commented that this is just
    // to be used by unit test.
    this.rackManager = builder.rackManager == null ? new RackManager(conf) : builder.rackManager;
  } catch (IOException | RuntimeException e) {
    // close() tolerates a still-null admin, so it is safe to call on a half-built instance.
    close();
    throw e;
  }
}
/**
 * No-argument constructor. Within this file it is only invoked from {@code main}, which uses the
 * resulting instance to run the command-line entry point; it copies nothing from a builder.
 */
private RegionMover() {
}
133 public void close() {
134 IOUtils
.closeQuietly(this.admin
, e
-> LOG
.warn("failed to close admin", e
));
135 IOUtils
.closeQuietly(this.conn
, e
-> LOG
.warn("failed to close conn", e
));
139 * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has
140 * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)},
141 * {@link #ack(boolean)}, {@link #timeout(int)}, {@link #designatedFile(String)} methods to set
142 * the corresponding options.
144 public static class RegionMoverBuilder
{
145 private boolean ack
= true;
146 private int maxthreads
= 1;
147 private int timeout
= Integer
.MAX_VALUE
;
148 private String hostname
;
149 private String filename
;
150 private String excludeFile
= null;
151 private String designatedFile
= null;
152 private String defaultDir
= System
.getProperty("java.io.tmpdir");
153 @InterfaceAudience.Private
155 private final Configuration conf
;
156 private RackManager rackManager
;
158 public RegionMoverBuilder(String hostname
) {
159 this(hostname
, createConf());
163 * Creates a new configuration and sets region mover specific overrides
165 private static Configuration
createConf() {
166 Configuration conf
= HBaseConfiguration
.create();
167 conf
.setInt("hbase.client.prefetch.limit", 1);
168 conf
.setInt("hbase.client.pause", 500);
169 conf
.setInt("hbase.client.retries.number", 100);
174 * @param hostname Hostname to unload regions from or load regions to. Can be either hostname
176 * @param conf Configuration object
178 public RegionMoverBuilder(String hostname
, Configuration conf
) {
179 String
[] splitHostname
= hostname
.toLowerCase().split(":");
180 this.hostname
= splitHostname
[0];
181 if (splitHostname
.length
== 2) {
182 this.port
= Integer
.parseInt(splitHostname
[1]);
184 this.port
= conf
.getInt(HConstants
.REGIONSERVER_PORT
, HConstants
.DEFAULT_REGIONSERVER_PORT
);
186 this.filename
= defaultDir
+ File
.separator
+ System
.getProperty("user.name") + this.hostname
187 + ":" + Integer
.toString(this.port
);
192 * Path of file where regions will be written to during unloading/read from during loading
194 * @return RegionMoverBuilder object
196 public RegionMoverBuilder
filename(String filename
) {
197 this.filename
= filename
;
202 * Set the max number of threads that will be used to move regions
204 public RegionMoverBuilder
maxthreads(int threads
) {
205 this.maxthreads
= threads
;
210 * Path of file containing hostnames to be excluded during region movement. Exclude file should
211 * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single
214 public RegionMoverBuilder
excludeFile(String excludefile
) {
215 this.excludeFile
= excludefile
;
220 * Set the designated file. Designated file contains hostnames where region moves. Designated
221 * file should have 'host:port' per line. Port is mandatory here as we can have many RS running
223 * @param designatedFile The designated file
224 * @return RegionMoverBuilder object
226 public RegionMoverBuilder
designatedFile(String designatedFile
) {
227 this.designatedFile
= designatedFile
;
232 * Set ack/noAck mode.
234 * In ack mode regions are acknowledged before and after moving and the move is retried
235 * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best
236 * effort mode,each region movement is tried once.This can be used during graceful shutdown as
237 * even if we have a stuck region,upon shutdown it'll be reassigned anyway.
240 * @return RegionMoverBuilder object
242 public RegionMoverBuilder
ack(boolean ack
) {
248 * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for
249 * movers also have a separate time which is hbase.move.wait.max * number of regions to
251 * @param timeout in seconds
252 * @return RegionMoverBuilder object
254 public RegionMoverBuilder
timeout(int timeout
) {
255 this.timeout
= timeout
;
260 * Set specific rackManager implementation.
261 * This setter method is for testing purpose only.
263 * @param rackManager rackManager impl
264 * @return RegionMoverBuilder object
266 @InterfaceAudience.Private
267 public RegionMoverBuilder
rackManager(RackManager rackManager
) {
268 this.rackManager
= rackManager
;
273 * This method builds the appropriate RegionMover object which can then be used to load/unload
274 * using load and unload methods
275 * @return RegionMover object
277 public RegionMover
build() throws IOException
{
278 return new RegionMover(this);
283 * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover
284 * Object has to be created using {@link #RegionMover(RegionMoverBuilder)}
285 * @return true if loading succeeded, false otherwise
287 public boolean load() throws ExecutionException
, InterruptedException
, TimeoutException
{
288 ExecutorService loadPool
= Executors
.newFixedThreadPool(1);
289 Future
<Boolean
> loadTask
= loadPool
.submit(getMetaRegionMovePlan());
290 boolean isMetaMoved
= waitTaskToFinish(loadPool
, loadTask
, "loading");
294 loadPool
= Executors
.newFixedThreadPool(1);
295 loadTask
= loadPool
.submit(getNonMetaRegionsMovePlan());
296 return waitTaskToFinish(loadPool
, loadTask
, "loading");
299 private Callable
<Boolean
> getMetaRegionMovePlan() {
300 return getRegionsMovePlan(true);
303 private Callable
<Boolean
> getNonMetaRegionsMovePlan() {
304 return getRegionsMovePlan(false);
307 private Callable
<Boolean
> getRegionsMovePlan(boolean moveMetaRegion
) {
310 List
<RegionInfo
> regionsToMove
= readRegionsFromFile(filename
);
311 if (regionsToMove
.isEmpty()) {
312 LOG
.info("No regions to load.Exiting");
315 Optional
<RegionInfo
> metaRegion
= getMetaRegionInfoIfToBeMoved(regionsToMove
);
316 if (moveMetaRegion
) {
317 if (metaRegion
.isPresent()) {
318 loadRegions(Collections
.singletonList(metaRegion
.get()));
321 metaRegion
.ifPresent(regionsToMove
::remove
);
322 loadRegions(regionsToMove
);
324 } catch (Exception e
) {
325 LOG
.error("Error while loading regions to " + hostname
, e
);
332 private Optional
<RegionInfo
> getMetaRegionInfoIfToBeMoved(List
<RegionInfo
> regionsToMove
) {
333 return regionsToMove
.stream().filter(RegionInfo
::isMetaRegion
).findFirst();
336 private void loadRegions(List
<RegionInfo
> regionsToMove
)
338 ServerName server
= getTargetServer();
339 List
<RegionInfo
> movedRegions
= Collections
.synchronizedList(new ArrayList
<>());
341 "Moving " + regionsToMove
.size() + " regions to " + server
+ " using " + this.maxthreads
342 + " threads.Ack mode:" + this.ack
);
344 final ExecutorService moveRegionsPool
= Executors
.newFixedThreadPool(this.maxthreads
);
345 List
<Future
<Boolean
>> taskList
= new ArrayList
<>();
347 while (counter
< regionsToMove
.size()) {
348 RegionInfo region
= regionsToMove
.get(counter
);
349 ServerName currentServer
= MoveWithAck
.getServerNameForRegion(region
, admin
, conn
);
350 if (currentServer
== null) {
352 "Could not get server for Region:" + region
.getRegionNameAsString() + " moving on");
355 } else if (server
.equals(currentServer
)) {
357 "Region " + region
.getRegionNameAsString() + " is already on target server=" + server
);
362 Future
<Boolean
> task
= moveRegionsPool
363 .submit(new MoveWithAck(conn
, region
, currentServer
, server
, movedRegions
));
366 Future
<Boolean
> task
= moveRegionsPool
367 .submit(new MoveWithoutAck(admin
, region
, currentServer
, server
, movedRegions
));
373 moveRegionsPool
.shutdown();
374 long timeoutInSeconds
= regionsToMove
.size() * admin
.getConfiguration()
375 .getLong(MOVE_WAIT_MAX_KEY
, DEFAULT_MOVE_WAIT_MAX
);
376 waitMoveTasksToFinish(moveRegionsPool
, taskList
, timeoutInSeconds
);
380 * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In
381 * noAck mode we do not make sure that region is successfully online on the target region
382 * server,hence it is best effort.We do not unload regions to hostnames given in
383 * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions
384 * to hostnames provided in {@link #designatedFile}
386 * @return true if unloading succeeded, false otherwise
388 public boolean unload() throws InterruptedException
, ExecutionException
, TimeoutException
{
389 return unloadRegions(false);
393 * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In
394 * noAck mode we do not make sure that region is successfully online on the target region
395 * server,hence it is best effort.We do not unload regions to hostnames given in
396 * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions
397 * to hostnames provided in {@link #designatedFile}.
398 * While unloading regions, destination RegionServers are selected from different rack i.e
399 * regions should not move to any RegionServers that belong to same rack as source RegionServer.
401 * @return true if unloading succeeded, false otherwise
403 public boolean unloadFromRack()
404 throws InterruptedException
, ExecutionException
, TimeoutException
{
405 return unloadRegions(true);
408 private boolean unloadRegions(boolean unloadFromRack
) throws InterruptedException
,
409 ExecutionException
, TimeoutException
{
410 deleteFile(this.filename
);
411 ExecutorService unloadPool
= Executors
.newFixedThreadPool(1);
412 Future
<Boolean
> unloadTask
= unloadPool
.submit(() -> {
413 List
<RegionInfo
> movedRegions
= Collections
.synchronizedList(new ArrayList
<>());
415 // Get Online RegionServers
416 List
<ServerName
> regionServers
= new ArrayList
<>();
417 RSGroupInfo rsgroup
= admin
.getRSGroup(Address
.fromParts(hostname
, port
));
418 LOG
.info("{} belongs to {}", hostname
, rsgroup
.getName());
419 regionServers
.addAll(filterRSGroupServers(rsgroup
, admin
.getRegionServers()));
420 // Remove the host Region server from target Region Servers list
421 ServerName server
= stripServer(regionServers
, hostname
, port
);
422 if (server
== null) {
423 LOG
.info("Could not find server '{}:{}' in the set of region servers. giving up.",
425 LOG
.debug("List of region servers: {}", regionServers
);
428 // Remove RS not present in the designated file
429 includeExcludeRegionServers(designatedFile
, regionServers
, true);
431 // Remove RS present in the exclude file
432 includeExcludeRegionServers(excludeFile
, regionServers
, false);
434 if (unloadFromRack
) {
435 // remove regionServers that belong to same rack (as source host) since the goal is to
436 // unload regions from source regionServer to destination regionServers
437 // that belong to different rack only.
438 String sourceRack
= rackManager
.getRack(server
);
439 List
<String
> racks
= rackManager
.getRack(regionServers
);
440 Iterator
<ServerName
> iterator
= regionServers
.iterator();
442 while (iterator
.hasNext()) {
444 if (racks
.size() > i
&& racks
.get(i
) != null && racks
.get(i
).equals(sourceRack
)) {
451 // Remove decommissioned RS
452 Set
<ServerName
> decommissionedRS
= new HashSet
<>(admin
.listDecommissionedRegionServers());
453 if (CollectionUtils
.isNotEmpty(decommissionedRS
)) {
454 regionServers
.removeIf(decommissionedRS
::contains
);
455 LOG
.debug("Excluded RegionServers from unloading regions to because they " +
456 "are marked as decommissioned. Servers: {}", decommissionedRS
);
459 stripMaster(regionServers
);
460 if (regionServers
.isEmpty()) {
461 LOG
.warn("No Regions were moved - no servers available");
464 LOG
.info("Available servers {}", regionServers
);
466 unloadRegions(server
, regionServers
, movedRegions
);
467 } catch (Exception e
) {
468 LOG
.error("Error while unloading regions ", e
);
471 if (movedRegions
!= null) {
472 writeFile(filename
, movedRegions
);
477 return waitTaskToFinish(unloadPool
, unloadTask
, "unloading");
480 @InterfaceAudience.Private
481 Collection
<ServerName
> filterRSGroupServers(RSGroupInfo rsgroup
,
482 Collection
<ServerName
> onlineServers
) {
483 if (rsgroup
.getName().equals(RSGroupInfo
.DEFAULT_GROUP
)) {
484 return onlineServers
;
486 List
<ServerName
> serverLists
= new ArrayList
<>(rsgroup
.getServers().size());
487 for (ServerName server
: onlineServers
) {
488 Address address
= Address
.fromParts(server
.getHostname(), server
.getPort());
489 if (rsgroup
.containsServer(address
)) {
490 serverLists
.add(server
);
496 private void unloadRegions(ServerName server
, List
<ServerName
> regionServers
,
497 List
<RegionInfo
> movedRegions
) throws Exception
{
499 List
<RegionInfo
> regionsToMove
= admin
.getRegions(server
);
500 regionsToMove
.removeAll(movedRegions
);
501 if (regionsToMove
.isEmpty()) {
502 LOG
.info("No Regions to move....Quitting now");
505 LOG
.info("Moving {} regions from {} to {} servers using {} threads .Ack Mode: {}",
506 regionsToMove
.size(), this.hostname
, regionServers
.size(), this.maxthreads
, ack
);
508 Optional
<RegionInfo
> metaRegion
= getMetaRegionInfoIfToBeMoved(regionsToMove
);
509 if (metaRegion
.isPresent()) {
510 RegionInfo meta
= metaRegion
.get();
511 submitRegionMovesWhileUnloading(server
, regionServers
, movedRegions
,
512 Collections
.singletonList(meta
));
513 regionsToMove
.remove(meta
);
515 submitRegionMovesWhileUnloading(server
, regionServers
, movedRegions
, regionsToMove
);
519 private void submitRegionMovesWhileUnloading(ServerName server
, List
<ServerName
> regionServers
,
520 List
<RegionInfo
> movedRegions
, List
<RegionInfo
> regionsToMove
) throws Exception
{
521 final ExecutorService moveRegionsPool
= Executors
.newFixedThreadPool(this.maxthreads
);
522 List
<Future
<Boolean
>> taskList
= new ArrayList
<>();
524 for (RegionInfo regionToMove
: regionsToMove
) {
526 Future
<Boolean
> task
= moveRegionsPool
.submit(
527 new MoveWithAck(conn
, regionToMove
, server
, regionServers
.get(serverIndex
),
531 Future
<Boolean
> task
= moveRegionsPool
.submit(
532 new MoveWithoutAck(admin
, regionToMove
, server
, regionServers
.get(serverIndex
),
536 serverIndex
= (serverIndex
+ 1) % regionServers
.size();
538 moveRegionsPool
.shutdown();
539 long timeoutInSeconds
= regionsToMove
.size() * admin
.getConfiguration()
540 .getLong(MOVE_WAIT_MAX_KEY
, DEFAULT_MOVE_WAIT_MAX
);
541 waitMoveTasksToFinish(moveRegionsPool
, taskList
, timeoutInSeconds
);
544 private boolean waitTaskToFinish(ExecutorService pool
, Future
<Boolean
> task
, String operation
)
545 throws TimeoutException
, InterruptedException
, ExecutionException
{
548 if (!pool
.awaitTermination((long) this.timeout
, TimeUnit
.SECONDS
)) {
550 "Timed out before finishing the " + operation
+ " operation. Timeout: " + this.timeout
554 } catch (InterruptedException e
) {
556 Thread
.currentThread().interrupt();
559 return task
.get(5, TimeUnit
.SECONDS
);
560 } catch (InterruptedException e
) {
561 LOG
.warn("Interrupted while " + operation
+ " Regions on " + this.hostname
, e
);
563 } catch (ExecutionException e
) {
564 LOG
.error("Error while " + operation
+ " regions on RegionServer " + this.hostname
, e
);
569 private void waitMoveTasksToFinish(ExecutorService moveRegionsPool
,
570 List
<Future
<Boolean
>> taskList
, long timeoutInSeconds
) throws Exception
{
572 if (!moveRegionsPool
.awaitTermination(timeoutInSeconds
, TimeUnit
.SECONDS
)) {
573 moveRegionsPool
.shutdownNow();
575 } catch (InterruptedException e
) {
576 moveRegionsPool
.shutdownNow();
577 Thread
.currentThread().interrupt();
579 for (Future
<Boolean
> future
: taskList
) {
581 // if even after shutdownNow threads are stuck we wait for 5 secs max
582 if (!future
.get(5, TimeUnit
.SECONDS
)) {
583 LOG
.error("Was Not able to move region....Exiting Now");
584 throw new Exception("Could not move region Exception");
586 } catch (InterruptedException e
) {
587 LOG
.error("Interrupted while waiting for Thread to Complete " + e
.getMessage(), e
);
589 } catch (ExecutionException e
) {
590 boolean ignoreFailure
= ignoreRegionMoveFailure(e
);
592 LOG
.debug("Ignore region move failure, it might have been split/merged.", e
);
594 LOG
.error("Got Exception From Thread While moving region {}", e
.getMessage(), e
);
597 } catch (CancellationException e
) {
598 LOG
.error("Thread for moving region cancelled. Timeout for cancellation:" + timeoutInSeconds
605 private boolean ignoreRegionMoveFailure(ExecutionException e
) {
606 boolean ignoreFailure
= false;
607 if (e
.getCause() instanceof UnknownRegionException
) {
608 // region does not exist anymore
609 ignoreFailure
= true;
610 } else if (e
.getCause() instanceof DoNotRetryRegionException
611 && e
.getCause().getMessage() != null && e
.getCause().getMessage()
612 .contains(AssignmentManager
.UNEXPECTED_STATE_REGION
+ "state=SPLIT,")) {
613 // region is recently split
614 ignoreFailure
= true;
616 return ignoreFailure
;
619 private ServerName
getTargetServer() throws Exception
{
620 ServerName server
= null;
621 int maxWaitInSeconds
=
622 admin
.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY
, DEFAULT_SERVERSTART_WAIT_MAX
);
623 long maxWait
= EnvironmentEdgeManager
.currentTime() + maxWaitInSeconds
* 1000;
624 while (EnvironmentEdgeManager
.currentTime() < maxWait
) {
626 List
<ServerName
> regionServers
= new ArrayList
<>();
627 regionServers
.addAll(admin
.getRegionServers());
628 // Remove the host Region server from target Region Servers list
629 server
= stripServer(regionServers
, hostname
, port
);
630 if (server
!= null) {
633 LOG
.warn("Server " + hostname
+ ":" + port
+ " is not up yet, waiting");
635 } catch (IOException e
) {
636 LOG
.warn("Could not get list of region servers", e
);
640 if (server
== null) {
641 LOG
.error("Server " + hostname
+ ":" + port
+ " is not up. Giving up.");
642 throw new Exception("Server " + hostname
+ ":" + port
+ " to load regions not online");
647 private List
<RegionInfo
> readRegionsFromFile(String filename
) throws IOException
{
648 List
<RegionInfo
> regions
= new ArrayList
<>();
649 File f
= new File(filename
);
653 try (DataInputStream dis
= new DataInputStream(
654 new BufferedInputStream(new FileInputStream(f
)))) {
655 int numRegions
= dis
.readInt();
657 while (index
< numRegions
) {
658 regions
.add(RegionInfo
.parseFromOrNull(Bytes
.readByteArray(dis
)));
661 } catch (IOException e
) {
662 LOG
.error("Error while reading regions from file:" + filename
, e
);
669 * Write the number of regions moved in the first line followed by regions moved in subsequent
672 private void writeFile(String filename
, List
<RegionInfo
> movedRegions
) throws IOException
{
673 try (DataOutputStream dos
= new DataOutputStream(
674 new BufferedOutputStream(new FileOutputStream(filename
)))) {
675 dos
.writeInt(movedRegions
.size());
676 for (RegionInfo region
: movedRegions
) {
677 Bytes
.writeByteArray(dos
, RegionInfo
.toByteArray(region
));
679 } catch (IOException e
) {
681 "ERROR: Was Not able to write regions moved to output file but moved " + movedRegions
682 .size() + " regions", e
);
687 private void deleteFile(String filename
) {
688 File f
= new File(filename
);
695 * @param filename The file should have 'host:port' per line
696 * @return List of servers from the file in format 'hostname:port'.
698 private List
<String
> readServersFromFile(String filename
) throws IOException
{
699 List
<String
> servers
= new ArrayList
<>();
700 if (filename
!= null) {
702 Files
.readAllLines(Paths
.get(filename
)).stream().map(String
::trim
)
703 .filter(((Predicate
<String
>) String
::isEmpty
).negate()).map(String
::toLowerCase
)
704 .forEach(servers
::add
);
705 } catch (IOException e
) {
706 LOG
.error("Exception while reading servers from file,", e
);
714 * Designates or excludes the servername whose hostname and port portion matches the list given
717 * If you want to designated RSs, suppose designatedFile has RS1, regionServers has RS1, RS2 and
718 * RS3. When we call includeExcludeRegionServers(designatedFile, regionServers, true), RS2 and
719 * RS3 are removed from regionServers list so that regions can move to only RS1.
720 * If you want to exclude RSs, suppose excludeFile has RS1, regionServers has RS1, RS2 and RS3.
721 * When we call includeExcludeRegionServers(excludeFile, servers, false), RS1 is removed from
722 * regionServers list so that regions can move to only RS2 and RS3.
724 private void includeExcludeRegionServers(String fileName
, List
<ServerName
> regionServers
,
725 boolean isInclude
) throws IOException
{
726 if (fileName
!= null) {
727 List
<String
> servers
= readServersFromFile(fileName
);
728 if (servers
.isEmpty()) {
729 LOG
.warn("No servers provided in the file: {}." + fileName
);
732 Iterator
<ServerName
> i
= regionServers
.iterator();
733 while (i
.hasNext()) {
734 String rs
= i
.next().getServerName();
735 String rsPort
= rs
.split(ServerName
.SERVERNAME_SEPARATOR
)[0].toLowerCase() + ":" + rs
736 .split(ServerName
.SERVERNAME_SEPARATOR
)[1];
737 if (isInclude
!= servers
.contains(rsPort
)) {
745 * Exclude master from list of RSs to move regions to
747 private void stripMaster(List
<ServerName
> regionServers
) throws IOException
{
748 ServerName master
= admin
.getClusterMetrics(EnumSet
.of(Option
.MASTER
)).getMasterName();
749 stripServer(regionServers
, master
.getHostname(), master
.getPort());
753 * Remove the servername whose hostname and port portion matches from the passed array of servers.
754 * Returns as side-effect the servername removed.
755 * @return server removed from list of Region Servers
757 private ServerName
stripServer(List
<ServerName
> regionServers
, String hostname
, int port
) {
758 for (Iterator
<ServerName
> iter
= regionServers
.iterator(); iter
.hasNext();) {
759 ServerName server
= iter
.next();
760 if (server
.getAddress().getHostName().equalsIgnoreCase(hostname
) &&
761 server
.getAddress().getPort() == port
) {
770 protected void addOptions() {
771 this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>");
772 this.addRequiredOptWithArg("o", "operation", "Expected: load/unload/unload_from_rack");
773 this.addOptWithArg("m", "maxthreads",
774 "Define the maximum number of threads to use to unload and reload the regions");
775 this.addOptWithArg("x", "excludefile",
776 "File with <hostname:port> per line to exclude as unload targets; default excludes only "
777 + "target host; useful for rack decommisioning.");
778 this.addOptWithArg("d","designatedfile","File with <hostname:port> per line as unload targets;"
779 + "default is all online hosts");
780 this.addOptWithArg("f", "filename",
781 "File to save regions list into unloading, or read from loading; "
782 + "default /tmp/<usernamehostname:port>");
783 this.addOptNoArg("n", "noack",
784 "Turn on No-Ack mode(default: false) which won't check if region is online on target "
785 + "RegionServer, hence best effort. This is more performant in unloading and loading "
786 + "but might lead to region being unavailable for some time till master reassigns it "
787 + "in case the move failed");
788 this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit "
789 + "irrespective of whether it finished or not;default Integer.MAX_VALUE");
793 protected void processOptions(CommandLine cmd
) {
794 String hostname
= cmd
.getOptionValue("r");
795 rmbuilder
= new RegionMoverBuilder(hostname
);
796 if (cmd
.hasOption('m')) {
797 rmbuilder
.maxthreads(Integer
.parseInt(cmd
.getOptionValue('m')));
799 if (cmd
.hasOption('n')) {
800 rmbuilder
.ack(false);
802 if (cmd
.hasOption('f')) {
803 rmbuilder
.filename(cmd
.getOptionValue('f'));
805 if (cmd
.hasOption('x')) {
806 rmbuilder
.excludeFile(cmd
.getOptionValue('x'));
808 if (cmd
.hasOption('d')) {
809 rmbuilder
.designatedFile(cmd
.getOptionValue('d'));
811 if (cmd
.hasOption('t')) {
812 rmbuilder
.timeout(Integer
.parseInt(cmd
.getOptionValue('t')));
814 this.loadUnload
= cmd
.getOptionValue("o").toLowerCase(Locale
.ROOT
);
818 protected int doWork() throws Exception
{
820 try (RegionMover rm
= rmbuilder
.build()) {
821 if (loadUnload
.equalsIgnoreCase("load")) {
823 } else if (loadUnload
.equalsIgnoreCase("unload")) {
824 success
= rm
.unload();
825 } else if (loadUnload
.equalsIgnoreCase("unload_from_rack")) {
826 success
= rm
.unloadFromRack();
832 return (success ?
0 : 1);
835 public static void main(String
[] args
) {
836 try (RegionMover mover
= new RegionMover()) {
837 mover
.doStaticMain(args
);