HBASE-26304 Reflect out of band locality improvements in metrics and balancer (#3803)
[hbase.git] / hbase-server / src / main / java / org / apache / hadoop / hbase / util / RegionMover.java
blob286caf8ed3b0f66ad5d7890a280c06a12c3927bd
1 /**
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
20 package org.apache.hadoop.hbase.util;
22 import java.io.BufferedInputStream;
23 import java.io.BufferedOutputStream;
24 import java.io.Closeable;
25 import java.io.DataInputStream;
26 import java.io.DataOutputStream;
27 import java.io.File;
28 import java.io.FileInputStream;
29 import java.io.FileOutputStream;
30 import java.io.IOException;
31 import java.nio.file.Files;
32 import java.nio.file.Paths;
33 import java.util.ArrayList;
34 import java.util.Collection;
35 import java.util.Collections;
36 import java.util.EnumSet;
37 import java.util.HashSet;
38 import java.util.Iterator;
39 import java.util.List;
40 import java.util.Locale;
41 import java.util.Optional;
42 import java.util.Set;
43 import java.util.concurrent.Callable;
44 import java.util.concurrent.CancellationException;
45 import java.util.concurrent.ExecutionException;
46 import java.util.concurrent.ExecutorService;
47 import java.util.concurrent.Executors;
48 import java.util.concurrent.Future;
49 import java.util.concurrent.TimeUnit;
50 import java.util.concurrent.TimeoutException;
51 import java.util.function.Predicate;
52 import org.apache.commons.io.IOUtils;
53 import org.apache.hadoop.conf.Configuration;
54 import org.apache.hadoop.hbase.ClusterMetrics.Option;
55 import org.apache.hadoop.hbase.HBaseConfiguration;
56 import org.apache.hadoop.hbase.HConstants;
57 import org.apache.hadoop.hbase.ServerName;
58 import org.apache.hadoop.hbase.UnknownRegionException;
59 import org.apache.hadoop.hbase.client.Admin;
60 import org.apache.hadoop.hbase.client.Connection;
61 import org.apache.hadoop.hbase.client.ConnectionFactory;
62 import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
63 import org.apache.hadoop.hbase.client.RegionInfo;
64 import org.apache.hadoop.hbase.master.RackManager;
65 import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
66 import org.apache.hadoop.hbase.net.Address;
67 import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
68 import org.apache.yetus.audience.InterfaceAudience;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
72 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
73 import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
/**
 * Tool for loading/unloading regions to/from a given regionserver. This tool can be run from the
 * command line directly as a utility. Supports Ack/No Ack mode for loading/unloading operations.
 * Ack mode acknowledges that regions are online after movement, while noAck mode is a best-effort
 * mode that improves performance but will still move on if a region is stuck/not moved. The
 * motivation behind noAck mode is RS shutdown, where even if a Region is stuck, upon shutdown the
 * master will move it anyway. This can also be used by constructing an Object using the builder
 * and then calling {@link #load()} or {@link #unload()} methods for the desired operations.
 */
84 @InterfaceAudience.Public
85 public class RegionMover extends AbstractHBaseTool implements Closeable {
86 public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max";
87 public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max";
88 public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max";
89 public static final int DEFAULT_MOVE_RETRIES_MAX = 5;
90 public static final int DEFAULT_MOVE_WAIT_MAX = 60;
91 public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180;
93 private static final Logger LOG = LoggerFactory.getLogger(RegionMover.class);
95 private RegionMoverBuilder rmbuilder;
96 private boolean ack = true;
97 private int maxthreads = 1;
98 private int timeout;
99 private String loadUnload;
100 private String hostname;
101 private String filename;
102 private String excludeFile;
103 private String designatedFile;
104 private int port;
105 private Connection conn;
106 private Admin admin;
107 private RackManager rackManager;
/**
 * Creates a mover from the options collected in {@code builder} and opens a cluster connection.
 * @param builder the populated builder
 * @throws IOException if the connection cannot be created
 */
private RegionMover(RegionMoverBuilder builder) throws IOException {
  // Copy the user-supplied options.
  this.hostname = builder.hostname;
  this.port = builder.port;
  this.filename = builder.filename;
  this.designatedFile = builder.designatedFile;
  this.excludeFile = builder.excludeFile;
  this.ack = builder.ack;
  this.maxthreads = builder.maxthreads;
  this.timeout = builder.timeout;

  setConf(builder.conf);
  this.conn = ConnectionFactory.createConnection(conf);
  this.admin = this.conn.getAdmin();

  // builder.rackManager is only non-null when a unit test injects a custom implementation (its
  // builder setter is @InterfaceAudience.Private and documented as test-only); regular runs build
  // a RackManager from the configuration.
  if (builder.rackManager != null) {
    this.rackManager = builder.rackManager;
  } else {
    this.rackManager = new RackManager(conf);
  }
}

/**
 * Used by {@link #main(String[])}; the command-line options are later parsed into
 * {@link #rmbuilder} by {@link #processOptions(CommandLine)}.
 */
private RegionMover() {
}
132 @Override
133 public void close() {
134 IOUtils.closeQuietly(this.admin, e -> LOG.warn("failed to close admin", e));
135 IOUtils.closeQuietly(this.conn, e -> LOG.warn("failed to close conn", e));
139 * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has
140 * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)},
141 * {@link #ack(boolean)}, {@link #timeout(int)}, {@link #designatedFile(String)} methods to set
142 * the corresponding options.
144 public static class RegionMoverBuilder {
145 private boolean ack = true;
146 private int maxthreads = 1;
147 private int timeout = Integer.MAX_VALUE;
148 private String hostname;
149 private String filename;
150 private String excludeFile = null;
151 private String designatedFile = null;
152 private String defaultDir = System.getProperty("java.io.tmpdir");
153 @InterfaceAudience.Private
154 final int port;
155 private final Configuration conf;
156 private RackManager rackManager;
158 public RegionMoverBuilder(String hostname) {
159 this(hostname, createConf());
163 * Creates a new configuration and sets region mover specific overrides
165 private static Configuration createConf() {
166 Configuration conf = HBaseConfiguration.create();
167 conf.setInt("hbase.client.prefetch.limit", 1);
168 conf.setInt("hbase.client.pause", 500);
169 conf.setInt("hbase.client.retries.number", 100);
170 return conf;
174 * @param hostname Hostname to unload regions from or load regions to. Can be either hostname
175 * or hostname:port.
176 * @param conf Configuration object
178 public RegionMoverBuilder(String hostname, Configuration conf) {
179 String[] splitHostname = hostname.toLowerCase().split(":");
180 this.hostname = splitHostname[0];
181 if (splitHostname.length == 2) {
182 this.port = Integer.parseInt(splitHostname[1]);
183 } else {
184 this.port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT);
186 this.filename = defaultDir + File.separator + System.getProperty("user.name") + this.hostname
187 + ":" + Integer.toString(this.port);
188 this.conf = conf;
192 * Path of file where regions will be written to during unloading/read from during loading
193 * @param filename
194 * @return RegionMoverBuilder object
196 public RegionMoverBuilder filename(String filename) {
197 this.filename = filename;
198 return this;
202 * Set the max number of threads that will be used to move regions
204 public RegionMoverBuilder maxthreads(int threads) {
205 this.maxthreads = threads;
206 return this;
210 * Path of file containing hostnames to be excluded during region movement. Exclude file should
211 * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single
212 * host.
214 public RegionMoverBuilder excludeFile(String excludefile) {
215 this.excludeFile = excludefile;
216 return this;
220 * Set the designated file. Designated file contains hostnames where region moves. Designated
221 * file should have 'host:port' per line. Port is mandatory here as we can have many RS running
222 * on a single host.
223 * @param designatedFile The designated file
224 * @return RegionMoverBuilder object
226 public RegionMoverBuilder designatedFile(String designatedFile) {
227 this.designatedFile = designatedFile;
228 return this;
232 * Set ack/noAck mode.
233 * <p>
234 * In ack mode regions are acknowledged before and after moving and the move is retried
235 * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best
236 * effort mode,each region movement is tried once.This can be used during graceful shutdown as
237 * even if we have a stuck region,upon shutdown it'll be reassigned anyway.
238 * <p>
239 * @param ack
240 * @return RegionMoverBuilder object
242 public RegionMoverBuilder ack(boolean ack) {
243 this.ack = ack;
244 return this;
248 * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for
249 * movers also have a separate time which is hbase.move.wait.max * number of regions to
250 * load/unload
251 * @param timeout in seconds
252 * @return RegionMoverBuilder object
254 public RegionMoverBuilder timeout(int timeout) {
255 this.timeout = timeout;
256 return this;
260 * Set specific rackManager implementation.
261 * This setter method is for testing purpose only.
263 * @param rackManager rackManager impl
264 * @return RegionMoverBuilder object
266 @InterfaceAudience.Private
267 public RegionMoverBuilder rackManager(RackManager rackManager) {
268 this.rackManager = rackManager;
269 return this;
273 * This method builds the appropriate RegionMover object which can then be used to load/unload
274 * using load and unload methods
275 * @return RegionMover object
277 public RegionMover build() throws IOException {
278 return new RegionMover(this);
283 * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover
284 * Object has to be created using {@link #RegionMover(RegionMoverBuilder)}
285 * @return true if loading succeeded, false otherwise
287 public boolean load() throws ExecutionException, InterruptedException, TimeoutException {
288 ExecutorService loadPool = Executors.newFixedThreadPool(1);
289 Future<Boolean> loadTask = loadPool.submit(getMetaRegionMovePlan());
290 boolean isMetaMoved = waitTaskToFinish(loadPool, loadTask, "loading");
291 if (!isMetaMoved) {
292 return false;
294 loadPool = Executors.newFixedThreadPool(1);
295 loadTask = loadPool.submit(getNonMetaRegionsMovePlan());
296 return waitTaskToFinish(loadPool, loadTask, "loading");
299 private Callable<Boolean> getMetaRegionMovePlan() {
300 return getRegionsMovePlan(true);
303 private Callable<Boolean> getNonMetaRegionsMovePlan() {
304 return getRegionsMovePlan(false);
307 private Callable<Boolean> getRegionsMovePlan(boolean moveMetaRegion) {
308 return () -> {
309 try {
310 List<RegionInfo> regionsToMove = readRegionsFromFile(filename);
311 if (regionsToMove.isEmpty()) {
312 LOG.info("No regions to load.Exiting");
313 return true;
315 Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove);
316 if (moveMetaRegion) {
317 if (metaRegion.isPresent()) {
318 loadRegions(Collections.singletonList(metaRegion.get()));
320 } else {
321 metaRegion.ifPresent(regionsToMove::remove);
322 loadRegions(regionsToMove);
324 } catch (Exception e) {
325 LOG.error("Error while loading regions to " + hostname, e);
326 return false;
328 return true;
332 private Optional<RegionInfo> getMetaRegionInfoIfToBeMoved(List<RegionInfo> regionsToMove) {
333 return regionsToMove.stream().filter(RegionInfo::isMetaRegion).findFirst();
336 private void loadRegions(List<RegionInfo> regionsToMove)
337 throws Exception {
338 ServerName server = getTargetServer();
339 List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
340 LOG.info(
341 "Moving " + regionsToMove.size() + " regions to " + server + " using " + this.maxthreads
342 + " threads.Ack mode:" + this.ack);
344 final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
345 List<Future<Boolean>> taskList = new ArrayList<>();
346 int counter = 0;
347 while (counter < regionsToMove.size()) {
348 RegionInfo region = regionsToMove.get(counter);
349 ServerName currentServer = MoveWithAck.getServerNameForRegion(region, admin, conn);
350 if (currentServer == null) {
351 LOG.warn(
352 "Could not get server for Region:" + region.getRegionNameAsString() + " moving on");
353 counter++;
354 continue;
355 } else if (server.equals(currentServer)) {
356 LOG.info(
357 "Region " + region.getRegionNameAsString() + " is already on target server=" + server);
358 counter++;
359 continue;
361 if (ack) {
362 Future<Boolean> task = moveRegionsPool
363 .submit(new MoveWithAck(conn, region, currentServer, server, movedRegions));
364 taskList.add(task);
365 } else {
366 Future<Boolean> task = moveRegionsPool
367 .submit(new MoveWithoutAck(admin, region, currentServer, server, movedRegions));
368 taskList.add(task);
370 counter++;
373 moveRegionsPool.shutdown();
374 long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration()
375 .getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
376 waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
380 * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In
381 * noAck mode we do not make sure that region is successfully online on the target region
382 * server,hence it is best effort.We do not unload regions to hostnames given in
383 * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions
384 * to hostnames provided in {@link #designatedFile}
386 * @return true if unloading succeeded, false otherwise
388 public boolean unload() throws InterruptedException, ExecutionException, TimeoutException {
389 return unloadRegions(false);
393 * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In
394 * noAck mode we do not make sure that region is successfully online on the target region
395 * server,hence it is best effort.We do not unload regions to hostnames given in
396 * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions
397 * to hostnames provided in {@link #designatedFile}.
398 * While unloading regions, destination RegionServers are selected from different rack i.e
399 * regions should not move to any RegionServers that belong to same rack as source RegionServer.
401 * @return true if unloading succeeded, false otherwise
403 public boolean unloadFromRack()
404 throws InterruptedException, ExecutionException, TimeoutException {
405 return unloadRegions(true);
408 private boolean unloadRegions(boolean unloadFromRack) throws InterruptedException,
409 ExecutionException, TimeoutException {
410 deleteFile(this.filename);
411 ExecutorService unloadPool = Executors.newFixedThreadPool(1);
412 Future<Boolean> unloadTask = unloadPool.submit(() -> {
413 List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
414 try {
415 // Get Online RegionServers
416 List<ServerName> regionServers = new ArrayList<>();
417 RSGroupInfo rsgroup = admin.getRSGroup(Address.fromParts(hostname, port));
418 LOG.info("{} belongs to {}", hostname, rsgroup.getName());
419 regionServers.addAll(filterRSGroupServers(rsgroup, admin.getRegionServers()));
420 // Remove the host Region server from target Region Servers list
421 ServerName server = stripServer(regionServers, hostname, port);
422 if (server == null) {
423 LOG.info("Could not find server '{}:{}' in the set of region servers. giving up.",
424 hostname, port);
425 LOG.debug("List of region servers: {}", regionServers);
426 return false;
428 // Remove RS not present in the designated file
429 includeExcludeRegionServers(designatedFile, regionServers, true);
431 // Remove RS present in the exclude file
432 includeExcludeRegionServers(excludeFile, regionServers, false);
434 if (unloadFromRack) {
435 // remove regionServers that belong to same rack (as source host) since the goal is to
436 // unload regions from source regionServer to destination regionServers
437 // that belong to different rack only.
438 String sourceRack = rackManager.getRack(server);
439 List<String> racks = rackManager.getRack(regionServers);
440 Iterator<ServerName> iterator = regionServers.iterator();
441 int i = 0;
442 while (iterator.hasNext()) {
443 iterator.next();
444 if (racks.size() > i && racks.get(i) != null && racks.get(i).equals(sourceRack)) {
445 iterator.remove();
447 i++;
451 // Remove decommissioned RS
452 Set<ServerName> decommissionedRS = new HashSet<>(admin.listDecommissionedRegionServers());
453 if (CollectionUtils.isNotEmpty(decommissionedRS)) {
454 regionServers.removeIf(decommissionedRS::contains);
455 LOG.debug("Excluded RegionServers from unloading regions to because they " +
456 "are marked as decommissioned. Servers: {}", decommissionedRS);
459 stripMaster(regionServers);
460 if (regionServers.isEmpty()) {
461 LOG.warn("No Regions were moved - no servers available");
462 return false;
463 } else {
464 LOG.info("Available servers {}", regionServers);
466 unloadRegions(server, regionServers, movedRegions);
467 } catch (Exception e) {
468 LOG.error("Error while unloading regions ", e);
469 return false;
470 } finally {
471 if (movedRegions != null) {
472 writeFile(filename, movedRegions);
475 return true;
477 return waitTaskToFinish(unloadPool, unloadTask, "unloading");
480 @InterfaceAudience.Private
481 Collection<ServerName> filterRSGroupServers(RSGroupInfo rsgroup,
482 Collection<ServerName> onlineServers) {
483 if (rsgroup.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
484 return onlineServers;
486 List<ServerName> serverLists = new ArrayList<>(rsgroup.getServers().size());
487 for (ServerName server : onlineServers) {
488 Address address = Address.fromParts(server.getHostname(), server.getPort());
489 if (rsgroup.containsServer(address)) {
490 serverLists.add(server);
493 return serverLists;
496 private void unloadRegions(ServerName server, List<ServerName> regionServers,
497 List<RegionInfo> movedRegions) throws Exception {
498 while (true) {
499 List<RegionInfo> regionsToMove = admin.getRegions(server);
500 regionsToMove.removeAll(movedRegions);
501 if (regionsToMove.isEmpty()) {
502 LOG.info("No Regions to move....Quitting now");
503 break;
505 LOG.info("Moving {} regions from {} to {} servers using {} threads .Ack Mode: {}",
506 regionsToMove.size(), this.hostname, regionServers.size(), this.maxthreads, ack);
508 Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove);
509 if (metaRegion.isPresent()) {
510 RegionInfo meta = metaRegion.get();
511 submitRegionMovesWhileUnloading(server, regionServers, movedRegions,
512 Collections.singletonList(meta));
513 regionsToMove.remove(meta);
515 submitRegionMovesWhileUnloading(server, regionServers, movedRegions, regionsToMove);
519 private void submitRegionMovesWhileUnloading(ServerName server, List<ServerName> regionServers,
520 List<RegionInfo> movedRegions, List<RegionInfo> regionsToMove) throws Exception {
521 final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
522 List<Future<Boolean>> taskList = new ArrayList<>();
523 int serverIndex = 0;
524 for (RegionInfo regionToMove : regionsToMove) {
525 if (ack) {
526 Future<Boolean> task = moveRegionsPool.submit(
527 new MoveWithAck(conn, regionToMove, server, regionServers.get(serverIndex),
528 movedRegions));
529 taskList.add(task);
530 } else {
531 Future<Boolean> task = moveRegionsPool.submit(
532 new MoveWithoutAck(admin, regionToMove, server, regionServers.get(serverIndex),
533 movedRegions));
534 taskList.add(task);
536 serverIndex = (serverIndex + 1) % regionServers.size();
538 moveRegionsPool.shutdown();
539 long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration()
540 .getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
541 waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
544 private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation)
545 throws TimeoutException, InterruptedException, ExecutionException {
546 pool.shutdown();
547 try {
548 if (!pool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) {
549 LOG.warn(
550 "Timed out before finishing the " + operation + " operation. Timeout: " + this.timeout
551 + "sec");
552 pool.shutdownNow();
554 } catch (InterruptedException e) {
555 pool.shutdownNow();
556 Thread.currentThread().interrupt();
558 try {
559 return task.get(5, TimeUnit.SECONDS);
560 } catch (InterruptedException e) {
561 LOG.warn("Interrupted while " + operation + " Regions on " + this.hostname, e);
562 throw e;
563 } catch (ExecutionException e) {
564 LOG.error("Error while " + operation + " regions on RegionServer " + this.hostname, e);
565 throw e;
569 private void waitMoveTasksToFinish(ExecutorService moveRegionsPool,
570 List<Future<Boolean>> taskList, long timeoutInSeconds) throws Exception {
571 try {
572 if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) {
573 moveRegionsPool.shutdownNow();
575 } catch (InterruptedException e) {
576 moveRegionsPool.shutdownNow();
577 Thread.currentThread().interrupt();
579 for (Future<Boolean> future : taskList) {
580 try {
581 // if even after shutdownNow threads are stuck we wait for 5 secs max
582 if (!future.get(5, TimeUnit.SECONDS)) {
583 LOG.error("Was Not able to move region....Exiting Now");
584 throw new Exception("Could not move region Exception");
586 } catch (InterruptedException e) {
587 LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e);
588 throw e;
589 } catch (ExecutionException e) {
590 boolean ignoreFailure = ignoreRegionMoveFailure(e);
591 if (ignoreFailure) {
592 LOG.debug("Ignore region move failure, it might have been split/merged.", e);
593 } else {
594 LOG.error("Got Exception From Thread While moving region {}", e.getMessage(), e);
595 throw e;
597 } catch (CancellationException e) {
598 LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + timeoutInSeconds
599 + "secs", e);
600 throw e;
605 private boolean ignoreRegionMoveFailure(ExecutionException e) {
606 boolean ignoreFailure = false;
607 if (e.getCause() instanceof UnknownRegionException) {
608 // region does not exist anymore
609 ignoreFailure = true;
610 } else if (e.getCause() instanceof DoNotRetryRegionException
611 && e.getCause().getMessage() != null && e.getCause().getMessage()
612 .contains(AssignmentManager.UNEXPECTED_STATE_REGION + "state=SPLIT,")) {
613 // region is recently split
614 ignoreFailure = true;
616 return ignoreFailure;
619 private ServerName getTargetServer() throws Exception {
620 ServerName server = null;
621 int maxWaitInSeconds =
622 admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX);
623 long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000;
624 while (EnvironmentEdgeManager.currentTime() < maxWait) {
625 try {
626 List<ServerName> regionServers = new ArrayList<>();
627 regionServers.addAll(admin.getRegionServers());
628 // Remove the host Region server from target Region Servers list
629 server = stripServer(regionServers, hostname, port);
630 if (server != null) {
631 break;
632 } else {
633 LOG.warn("Server " + hostname + ":" + port + " is not up yet, waiting");
635 } catch (IOException e) {
636 LOG.warn("Could not get list of region servers", e);
638 Thread.sleep(500);
640 if (server == null) {
641 LOG.error("Server " + hostname + ":" + port + " is not up. Giving up.");
642 throw new Exception("Server " + hostname + ":" + port + " to load regions not online");
644 return server;
647 private List<RegionInfo> readRegionsFromFile(String filename) throws IOException {
648 List<RegionInfo> regions = new ArrayList<>();
649 File f = new File(filename);
650 if (!f.exists()) {
651 return regions;
653 try (DataInputStream dis = new DataInputStream(
654 new BufferedInputStream(new FileInputStream(f)))) {
655 int numRegions = dis.readInt();
656 int index = 0;
657 while (index < numRegions) {
658 regions.add(RegionInfo.parseFromOrNull(Bytes.readByteArray(dis)));
659 index++;
661 } catch (IOException e) {
662 LOG.error("Error while reading regions from file:" + filename, e);
663 throw e;
665 return regions;
669 * Write the number of regions moved in the first line followed by regions moved in subsequent
670 * lines
672 private void writeFile(String filename, List<RegionInfo> movedRegions) throws IOException {
673 try (DataOutputStream dos = new DataOutputStream(
674 new BufferedOutputStream(new FileOutputStream(filename)))) {
675 dos.writeInt(movedRegions.size());
676 for (RegionInfo region : movedRegions) {
677 Bytes.writeByteArray(dos, RegionInfo.toByteArray(region));
679 } catch (IOException e) {
680 LOG.error(
681 "ERROR: Was Not able to write regions moved to output file but moved " + movedRegions
682 .size() + " regions", e);
683 throw e;
687 private void deleteFile(String filename) {
688 File f = new File(filename);
689 if (f.exists()) {
690 f.delete();
695 * @param filename The file should have 'host:port' per line
696 * @return List of servers from the file in format 'hostname:port'.
698 private List<String> readServersFromFile(String filename) throws IOException {
699 List<String> servers = new ArrayList<>();
700 if (filename != null) {
701 try {
702 Files.readAllLines(Paths.get(filename)).stream().map(String::trim)
703 .filter(((Predicate<String>) String::isEmpty).negate()).map(String::toLowerCase)
704 .forEach(servers::add);
705 } catch (IOException e) {
706 LOG.error("Exception while reading servers from file,", e);
707 throw e;
710 return servers;
714 * Designates or excludes the servername whose hostname and port portion matches the list given
715 * in the file.
716 * Example:<br>
717 * If you want to designated RSs, suppose designatedFile has RS1, regionServers has RS1, RS2 and
718 * RS3. When we call includeExcludeRegionServers(designatedFile, regionServers, true), RS2 and
719 * RS3 are removed from regionServers list so that regions can move to only RS1.
720 * If you want to exclude RSs, suppose excludeFile has RS1, regionServers has RS1, RS2 and RS3.
721 * When we call includeExcludeRegionServers(excludeFile, servers, false), RS1 is removed from
722 * regionServers list so that regions can move to only RS2 and RS3.
724 private void includeExcludeRegionServers(String fileName, List<ServerName> regionServers,
725 boolean isInclude) throws IOException {
726 if (fileName != null) {
727 List<String> servers = readServersFromFile(fileName);
728 if (servers.isEmpty()) {
729 LOG.warn("No servers provided in the file: {}." + fileName);
730 return;
732 Iterator<ServerName> i = regionServers.iterator();
733 while (i.hasNext()) {
734 String rs = i.next().getServerName();
735 String rsPort = rs.split(ServerName.SERVERNAME_SEPARATOR)[0].toLowerCase() + ":" + rs
736 .split(ServerName.SERVERNAME_SEPARATOR)[1];
737 if (isInclude != servers.contains(rsPort)) {
738 i.remove();
745 * Exclude master from list of RSs to move regions to
747 private void stripMaster(List<ServerName> regionServers) throws IOException {
748 ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName();
749 stripServer(regionServers, master.getHostname(), master.getPort());
753 * Remove the servername whose hostname and port portion matches from the passed array of servers.
754 * Returns as side-effect the servername removed.
755 * @return server removed from list of Region Servers
757 private ServerName stripServer(List<ServerName> regionServers, String hostname, int port) {
758 for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) {
759 ServerName server = iter.next();
760 if (server.getAddress().getHostName().equalsIgnoreCase(hostname) &&
761 server.getAddress().getPort() == port) {
762 iter.remove();
763 return server;
766 return null;
769 @Override
770 protected void addOptions() {
771 this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>");
772 this.addRequiredOptWithArg("o", "operation", "Expected: load/unload/unload_from_rack");
773 this.addOptWithArg("m", "maxthreads",
774 "Define the maximum number of threads to use to unload and reload the regions");
775 this.addOptWithArg("x", "excludefile",
776 "File with <hostname:port> per line to exclude as unload targets; default excludes only "
777 + "target host; useful for rack decommisioning.");
778 this.addOptWithArg("d","designatedfile","File with <hostname:port> per line as unload targets;"
779 + "default is all online hosts");
780 this.addOptWithArg("f", "filename",
781 "File to save regions list into unloading, or read from loading; "
782 + "default /tmp/<usernamehostname:port>");
783 this.addOptNoArg("n", "noack",
784 "Turn on No-Ack mode(default: false) which won't check if region is online on target "
785 + "RegionServer, hence best effort. This is more performant in unloading and loading "
786 + "but might lead to region being unavailable for some time till master reassigns it "
787 + "in case the move failed");
788 this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit "
789 + "irrespective of whether it finished or not;default Integer.MAX_VALUE");
792 @Override
793 protected void processOptions(CommandLine cmd) {
794 String hostname = cmd.getOptionValue("r");
795 rmbuilder = new RegionMoverBuilder(hostname);
796 if (cmd.hasOption('m')) {
797 rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m')));
799 if (cmd.hasOption('n')) {
800 rmbuilder.ack(false);
802 if (cmd.hasOption('f')) {
803 rmbuilder.filename(cmd.getOptionValue('f'));
805 if (cmd.hasOption('x')) {
806 rmbuilder.excludeFile(cmd.getOptionValue('x'));
808 if (cmd.hasOption('d')) {
809 rmbuilder.designatedFile(cmd.getOptionValue('d'));
811 if (cmd.hasOption('t')) {
812 rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t')));
814 this.loadUnload = cmd.getOptionValue("o").toLowerCase(Locale.ROOT);
817 @Override
818 protected int doWork() throws Exception {
819 boolean success;
820 try (RegionMover rm = rmbuilder.build()) {
821 if (loadUnload.equalsIgnoreCase("load")) {
822 success = rm.load();
823 } else if (loadUnload.equalsIgnoreCase("unload")) {
824 success = rm.unload();
825 } else if (loadUnload.equalsIgnoreCase("unload_from_rack")) {
826 success = rm.unloadFromRack();
827 } else {
828 printUsage();
829 success = false;
832 return (success ? 0 : 1);
835 public static void main(String[] args) {
836 try (RegionMover mover = new RegionMover()) {
837 mover.doStaticMain(args);