HBASE-21843 RegionGroupingProvider breaks the meta wal file name pattern which may...
[hbase.git] / hbase-server / src / main / java / org / apache / hadoop / hbase / util / RegionSplitter.java
blob1b586348075607a9ffe9215d54fa82d2b310be58
1 /**
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org.apache.hadoop.hbase.util;
21 import java.io.IOException;
22 import java.math.BigInteger;
23 import java.util.Arrays;
24 import java.util.Collection;
25 import java.util.EnumSet;
26 import java.util.LinkedList;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Set;
30 import java.util.TreeMap;
31 import org.apache.commons.lang3.ArrayUtils;
32 import org.apache.commons.lang3.StringUtils;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.fs.FSDataInputStream;
35 import org.apache.hadoop.fs.FSDataOutputStream;
36 import org.apache.hadoop.fs.FileSystem;
37 import org.apache.hadoop.fs.Path;
38 import org.apache.hadoop.hbase.ClusterMetrics;
39 import org.apache.hadoop.hbase.ClusterMetrics.Option;
40 import org.apache.hadoop.hbase.HBaseConfiguration;
41 import org.apache.hadoop.hbase.HConstants;
42 import org.apache.hadoop.hbase.HRegionInfo;
43 import org.apache.hadoop.hbase.HRegionLocation;
44 import org.apache.hadoop.hbase.MetaTableAccessor;
45 import org.apache.hadoop.hbase.ServerName;
46 import org.apache.hadoop.hbase.TableName;
47 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
48 import org.apache.hadoop.hbase.client.TableDescriptor;
49 import org.apache.yetus.audience.InterfaceAudience;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52 import org.apache.hadoop.hbase.client.Admin;
53 import org.apache.hadoop.hbase.client.ClusterConnection;
54 import org.apache.hadoop.hbase.client.Connection;
55 import org.apache.hadoop.hbase.client.ConnectionFactory;
56 import org.apache.hadoop.hbase.client.NoServerForRegionException;
57 import org.apache.hadoop.hbase.client.RegionLocator;
58 import org.apache.hadoop.hbase.client.Table;
59 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
61 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
62 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
63 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
64 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
65 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
66 import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
67 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
68 import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
69 import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
70 import org.apache.hbase.thirdparty.org.apache.commons.cli.OptionBuilder;
71 import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
72 import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
74 /**
75 * The {@link RegionSplitter} class provides several utilities to help in the
76 * administration lifecycle for developers who choose to manually split regions
77 * instead of having HBase handle that automatically. The most useful utilities
78 * are:
79 * <p>
80 * <ul>
81 * <li>Create a table with a specified number of pre-split regions
82 * <li>Execute a rolling split of all regions on an existing table
83 * </ul>
84 * <p>
85 * Both operations can be safely done on a live server.
86 * <p>
87 * <b>Question:</b> How do I turn off automatic splitting? <br>
88 * <b>Answer:</b> Automatic splitting is determined by the configuration value
89 * <i>HConstants.HREGION_MAX_FILESIZE</i>. It is not recommended that you set this
90 * to Long.MAX_VALUE in case you forget about manual splits. A suggested setting
91 * is 100GB, which would result in &gt; 1hr major compactions if reached.
92 * <p>
93 * <b>Question:</b> Why did the original authors decide to manually split? <br>
94 * <b>Answer:</b> Specific workload characteristics of our use case allowed us
95 * to benefit from a manual split system.
96 * <p>
97 * <ul>
98 * <li>Data (~1k) that would grow instead of being replaced
99 * <li>Data growth was roughly uniform across all regions
100 * <li>OLTP workload. Data loss is a big deal.
101 * </ul>
102 * <p>
103 * <b>Question:</b> Why is manual splitting good for this workload? <br>
104 * <b>Answer:</b> Although automated splitting is not a bad option, there are
105 * benefits to manual splitting.
106 * <p>
107 * <ul>
108 * <li>With growing amounts of data, splits will continually be needed. Since
109 * you always know exactly what regions you have, long-term debugging and
110 * profiling is much easier with manual splits. It is hard to trace the logs to
111 * understand region level problems if it keeps splitting and getting renamed.
112 * <li>Data offlining bugs + unknown number of split regions == oh crap! If a
113 * WAL or StoreFile was mistakenly unprocessed by HBase due to a weird bug and
114 * you notice it a day or so later, you can be assured that the regions
115 * specified in these files are the same as the current regions and you have
116 * less headaches trying to restore/replay your data.
117 * <li>You can finely tune your compaction algorithm. With roughly uniform data
118 * growth, it's easy to cause split / compaction storms as the regions all
119 * roughly hit the same data size at the same time. With manual splits, you can
120 * let staggered, time-based major compactions spread out your network IO load.
121 * </ul>
122 * <p>
123 * <b>Question:</b> What's the optimal number of pre-split regions to create? <br>
124 * <b>Answer:</b> Mileage will vary depending upon your application.
125 * <p>
126 * The short answer for our application is that we started with 10 pre-split
127 * regions / server and watched our data growth over time. It's better to err on
128 * the side of too few regions and do a rolling split later.
129 * <p>
130 * The more complicated answer is that this depends upon the largest storefile
131 * in your region. With a growing data size, this will get larger over time. You
132 * want the largest region to be just big enough that the
133 * {@link org.apache.hadoop.hbase.regionserver.HStore} compact
134 * selection algorithm only compacts it due to a timed major. If you don't, your
135 * cluster can be prone to compaction storms as the algorithm decides to run
136 * major compactions on a large series of regions all at once. Note that
137 * compaction storms are due to the uniform data growth, not the manual split
138 * decision.
139 * <p>
140 * If you pre-split your regions too thin, you can increase the major compaction
141 * interval by configuring HConstants.MAJOR_COMPACTION_PERIOD. If your data size
142 * grows too large, use this script to perform a network IO safe rolling split
143 * of all regions.
145 @InterfaceAudience.Private
146 public class RegionSplitter {
147 private static final Logger LOG = LoggerFactory.getLogger(RegionSplitter.class);
/**
 * A generic interface for the RegionSplitter code to use for all its
 * functionality. Note that the original authors of this code use
 * {@link HexStringSplit} to partition their table and set it as default, but
 * provided this for your custom algorithm. To use, create a new derived class
 * from this interface and call {@link RegionSplitter#createPresplitTable} or
 * RegionSplitter#rollingSplit(TableName, SplitAlgorithm, Configuration) with the
 * argument splitClassName giving the name of your class.
 */
public interface SplitAlgorithm {
  /**
   * Split a pre-existing region into 2 regions.
   *
   * @param start
   *          first row (inclusive)
   * @param end
   *          last row (exclusive)
   * @return the split row to use
   */
  byte[] split(byte[] start, byte[] end);

  /**
   * Split an entire table.
   *
   * @param numRegions
   *          number of regions to split the table into
   *
   * @throws RuntimeException
   *           user input is validated at this time. may throw a runtime
   *           exception in response to a parse failure
   * @return array of split keys for the initial regions of the table. The
   *         length of the returned array should be numRegions-1.
   */
  byte[][] split(int numRegions);

  /**
   * Some MapReduce jobs may want to run multiple mappers per region,
   * this is intended for such usecase.
   *
   * @param start first row (inclusive)
   * @param end last row (exclusive)
   * @param numSplits number of splits to generate
   * @param inclusive whether start and end are returned as split points
   * @return the generated split points
   */
  byte[][] split(byte[] start, byte[] end, int numSplits, boolean inclusive);

  /**
   * In HBase, the first row is represented by an empty byte array. This might
   * cause problems with your split algorithm or row printing. All your APIs
   * will be passed firstRow() instead of empty array.
   *
   * @return your representation of your first row
   */
  byte[] firstRow();

  /**
   * In HBase, the last row is represented by an empty byte array. This might
   * cause problems with your split algorithm or row printing. All your APIs
   * will be passed lastRow() instead of empty array.
   *
   * @return your representation of your last row
   */
  byte[] lastRow();

  /**
   * In HBase, the first row is represented by an empty byte array. Set this
   * value to help the split code understand how to evenly divide the first
   * region.
   *
   * @param userInput
   *          raw user input (may throw RuntimeException on parse failure)
   */
  void setFirstRow(String userInput);

  /**
   * In HBase, the last row is represented by an empty byte array. Set this
   * value to help the split code understand how to evenly divide the last
   * region. Note that this last row is inclusive for all rows sharing the
   * same prefix.
   *
   * @param userInput
   *          raw user input (may throw RuntimeException on parse failure)
   */
  void setLastRow(String userInput);

  /**
   * @param input
   *          user or file input for row
   * @return byte array representation of this row for HBase
   */
  byte[] strToRow(String input);

  /**
   * @param row
   *          byte array representing a row in HBase
   * @return String to use for debug &amp; file printing
   */
  String rowToStr(byte[] row);

  /**
   * @return the separator character to use when storing / printing the row
   */
  String separator();

  /**
   * Set the first row
   * @param userInput byte array of the row key.
   */
  void setFirstRow(byte[] userInput);

  /**
   * Set the last row
   * @param userInput byte array of the row key.
   */
  void setLastRow(byte[] userInput);
}
267 * The main function for the RegionSplitter application. Common uses:
268 * <p>
269 * <ul>
270 * <li>create a table named 'myTable' with 60 pre-split regions containing 2
271 * column families 'test' &amp; 'rs', assuming the keys are hex-encoded ASCII:
272 * <ul>
273 * <li>bin/hbase org.apache.hadoop.hbase.util.RegionSplitter -c 60 -f test:rs
274 * myTable HexStringSplit
275 * </ul>
276 * <li>create a table named 'myTable' with 50 pre-split regions,
277 * assuming the keys are decimal-encoded ASCII:
278 * <ul>
279 * <li>bin/hbase org.apache.hadoop.hbase.util.RegionSplitter -c 50
280 * myTable DecimalStringSplit
281 * </ul>
282 * <li>perform a rolling split of 'myTable' (i.e. 60 =&gt; 120 regions), # 2
283 * outstanding splits at a time, assuming keys are uniformly distributed
284 * bytes:
285 * <ul>
286 * <li>bin/hbase org.apache.hadoop.hbase.util.RegionSplitter -r -o 2 myTable
287 * UniformSplit
288 * </ul>
289 * </ul>
291 * There are three SplitAlgorithms built into RegionSplitter, HexStringSplit,
292 * DecimalStringSplit, and UniformSplit. These are different strategies for
293 * choosing region boundaries. See their source code for details.
295 * @param args
296 * Usage: RegionSplitter &lt;TABLE&gt; &lt;SPLITALGORITHM&gt;
297 * &lt;-c &lt;# regions&gt; -f &lt;family:family:...&gt; | -r
298 * [-o &lt;# outstanding splits&gt;]&gt;
299 * [-D &lt;conf.param=value&gt;]
300 * @throws IOException
301 * HBase IO problem
302 * @throws InterruptedException
303 * user requested exit
304 * @throws ParseException
305 * problem parsing user input
307 @SuppressWarnings("static-access")
308 public static void main(String[] args) throws IOException,
309 InterruptedException, ParseException {
310 Configuration conf = HBaseConfiguration.create();
312 // parse user input
313 Options opt = new Options();
314 opt.addOption(OptionBuilder.withArgName("property=value").hasArg()
315 .withDescription("Override HBase Configuration Settings").create("D"));
316 opt.addOption(OptionBuilder.withArgName("region count").hasArg()
317 .withDescription(
318 "Create a new table with a pre-split number of regions")
319 .create("c"));
320 opt.addOption(OptionBuilder.withArgName("family:family:...").hasArg()
321 .withDescription(
322 "Column Families to create with new table. Required with -c")
323 .create("f"));
324 opt.addOption("h", false, "Print this usage help");
325 opt.addOption("r", false, "Perform a rolling split of an existing region");
326 opt.addOption(OptionBuilder.withArgName("count").hasArg().withDescription(
327 "Max outstanding splits that have unfinished major compactions")
328 .create("o"));
329 opt.addOption(null, "firstrow", true,
330 "First Row in Table for Split Algorithm");
331 opt.addOption(null, "lastrow", true,
332 "Last Row in Table for Split Algorithm");
333 opt.addOption(null, "risky", false,
334 "Skip verification steps to complete quickly. "
335 + "STRONGLY DISCOURAGED for production systems. ");
336 CommandLine cmd = new GnuParser().parse(opt, args);
338 if (cmd.hasOption("D")) {
339 for (String confOpt : cmd.getOptionValues("D")) {
340 String[] kv = confOpt.split("=", 2);
341 if (kv.length == 2) {
342 conf.set(kv[0], kv[1]);
343 LOG.debug("-D configuration override: " + kv[0] + "=" + kv[1]);
344 } else {
345 throw new ParseException("-D option format invalid: " + confOpt);
350 if (cmd.hasOption("risky")) {
351 conf.setBoolean("split.verify", false);
354 boolean createTable = cmd.hasOption("c") && cmd.hasOption("f");
355 boolean rollingSplit = cmd.hasOption("r");
356 boolean oneOperOnly = createTable ^ rollingSplit;
358 if (2 != cmd.getArgList().size() || !oneOperOnly || cmd.hasOption("h")) {
359 new HelpFormatter().printHelp("bin/hbase regionsplitter <TABLE> <SPLITALGORITHM>\n"+
360 "SPLITALGORITHM is the java class name of a class implementing " +
361 "SplitAlgorithm, or one of the special strings HexStringSplit or " +
362 "DecimalStringSplit or UniformSplit, which are built-in split algorithms. " +
363 "HexStringSplit treats keys as hexadecimal ASCII, and " +
364 "DecimalStringSplit treats keys as decimal ASCII, and " +
365 "UniformSplit treats keys as arbitrary bytes.", opt);
366 return;
368 TableName tableName = TableName.valueOf(cmd.getArgs()[0]);
369 String splitClass = cmd.getArgs()[1];
370 SplitAlgorithm splitAlgo = newSplitAlgoInstance(conf, splitClass);
372 if (cmd.hasOption("firstrow")) {
373 splitAlgo.setFirstRow(cmd.getOptionValue("firstrow"));
375 if (cmd.hasOption("lastrow")) {
376 splitAlgo.setLastRow(cmd.getOptionValue("lastrow"));
379 if (createTable) {
380 conf.set("split.count", cmd.getOptionValue("c"));
381 createPresplitTable(tableName, splitAlgo, cmd.getOptionValue("f").split(":"), conf);
384 if (rollingSplit) {
385 if (cmd.hasOption("o")) {
386 conf.set("split.outstanding", cmd.getOptionValue("o"));
388 rollingSplit(tableName, splitAlgo, conf);
392 static void createPresplitTable(TableName tableName, SplitAlgorithm splitAlgo,
393 String[] columnFamilies, Configuration conf)
394 throws IOException, InterruptedException {
395 final int splitCount = conf.getInt("split.count", 0);
396 Preconditions.checkArgument(splitCount > 1, "Split count must be > 1");
398 Preconditions.checkArgument(columnFamilies.length > 0,
399 "Must specify at least one column family. ");
400 LOG.debug("Creating table " + tableName + " with " + columnFamilies.length
401 + " column families. Presplitting to " + splitCount + " regions");
403 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
404 for (String cf : columnFamilies) {
405 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf));
407 try (Connection connection = ConnectionFactory.createConnection(conf)) {
408 Admin admin = connection.getAdmin();
409 try {
410 Preconditions.checkArgument(!admin.tableExists(tableName),
411 "Table already exists: " + tableName);
412 admin.createTable(builder.build(), splitAlgo.split(splitCount));
413 } finally {
414 admin.close();
416 LOG.debug("Table created! Waiting for regions to show online in META...");
417 if (!conf.getBoolean("split.verify", true)) {
418 // NOTE: createTable is synchronous on the table, but not on the regions
419 int onlineRegions = 0;
420 while (onlineRegions < splitCount) {
421 onlineRegions = MetaTableAccessor.getRegionCount(connection, tableName);
422 LOG.debug(onlineRegions + " of " + splitCount + " regions online...");
423 if (onlineRegions < splitCount) {
424 Thread.sleep(10 * 1000); // sleep
428 LOG.debug("Finished creating table with " + splitCount + " regions");
433 * Alternative getCurrentNrHRS which is no longer available.
434 * @param connection
435 * @return Rough count of regionservers out on cluster.
436 * @throws IOException if a remote or network exception occurs
438 private static int getRegionServerCount(final Connection connection) throws IOException {
439 try (Admin admin = connection.getAdmin()) {
440 ClusterMetrics status = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
441 Collection<ServerName> servers = status.getLiveServerMetrics().keySet();
442 return servers == null || servers.isEmpty()? 0: servers.size();
446 private static byte [] readFile(final FileSystem fs, final Path path) throws IOException {
447 FSDataInputStream tmpIn = fs.open(path);
448 try {
449 byte [] rawData = new byte[tmpIn.available()];
450 tmpIn.readFully(rawData);
451 return rawData;
452 } finally {
453 tmpIn.close();
457 static void rollingSplit(TableName tableName, SplitAlgorithm splitAlgo, Configuration conf)
458 throws IOException, InterruptedException {
459 final int minOS = conf.getInt("split.outstanding", 2);
460 try (Connection connection = ConnectionFactory.createConnection(conf)) {
461 // Max outstanding splits. default == 50% of servers
462 final int MAX_OUTSTANDING = Math.max(getRegionServerCount(connection) / 2, minOS);
464 Path hbDir = FSUtils.getRootDir(conf);
465 Path tableDir = FSUtils.getTableDir(hbDir, tableName);
466 Path splitFile = new Path(tableDir, "_balancedSplit");
467 FileSystem fs = FileSystem.get(conf);
469 // Get a list of daughter regions to create
470 LinkedList<Pair<byte[], byte[]>> tmpRegionSet = null;
471 try (Table table = connection.getTable(tableName)) {
472 tmpRegionSet = getSplits(connection, tableName, splitAlgo);
474 LinkedList<Pair<byte[], byte[]>> outstanding = Lists.newLinkedList();
475 int splitCount = 0;
476 final int origCount = tmpRegionSet.size();
478 // all splits must compact & we have 1 compact thread, so 2 split
479 // requests to the same RS can stall the outstanding split queue.
480 // To fix, group the regions into an RS pool and round-robin through it
481 LOG.debug("Bucketing regions by regionserver...");
482 TreeMap<ServerName, LinkedList<Pair<byte[], byte[]>>> daughterRegions =
483 Maps.newTreeMap();
484 // Get a regionLocator. Need it in below.
485 try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
486 for (Pair<byte[], byte[]> dr : tmpRegionSet) {
487 ServerName rsLocation = regionLocator.getRegionLocation(dr.getSecond()).getServerName();
488 if (!daughterRegions.containsKey(rsLocation)) {
489 LinkedList<Pair<byte[], byte[]>> entry = Lists.newLinkedList();
490 daughterRegions.put(rsLocation, entry);
492 daughterRegions.get(rsLocation).add(dr);
494 LOG.debug("Done with bucketing. Split time!");
495 long startTime = System.currentTimeMillis();
497 // Open the split file and modify it as splits finish
498 byte[] rawData = readFile(fs, splitFile);
500 FSDataOutputStream splitOut = fs.create(splitFile);
501 try {
502 splitOut.write(rawData);
504 try {
505 // *** split code ***
506 while (!daughterRegions.isEmpty()) {
507 LOG.debug(daughterRegions.size() + " RS have regions to splt.");
509 // Get ServerName to region count mapping
510 final TreeMap<ServerName, Integer> rsSizes = Maps.newTreeMap();
511 List<HRegionLocation> hrls = regionLocator.getAllRegionLocations();
512 for (HRegionLocation hrl: hrls) {
513 ServerName sn = hrl.getServerName();
514 if (rsSizes.containsKey(sn)) {
515 rsSizes.put(sn, rsSizes.get(sn) + 1);
516 } else {
517 rsSizes.put(sn, 1);
521 // Round-robin through the ServerName list. Choose the lightest-loaded servers
522 // first to keep the master from load-balancing regions as we split.
523 for (Map.Entry<ServerName, LinkedList<Pair<byte[], byte[]>>> daughterRegion :
524 daughterRegions.entrySet()) {
525 Pair<byte[], byte[]> dr = null;
526 ServerName rsLoc = daughterRegion.getKey();
527 LinkedList<Pair<byte[], byte[]>> regionList = daughterRegion.getValue();
529 // Find a region in the ServerName list that hasn't been moved
530 LOG.debug("Finding a region on " + rsLoc);
531 while (!regionList.isEmpty()) {
532 dr = regionList.pop();
534 // get current region info
535 byte[] split = dr.getSecond();
536 HRegionLocation regionLoc = regionLocator.getRegionLocation(split);
538 // if this region moved locations
539 ServerName newRs = regionLoc.getServerName();
540 if (newRs.compareTo(rsLoc) != 0) {
541 LOG.debug("Region with " + splitAlgo.rowToStr(split)
542 + " moved to " + newRs + ". Relocating...");
543 // relocate it, don't use it right now
544 if (!daughterRegions.containsKey(newRs)) {
545 LinkedList<Pair<byte[], byte[]>> entry = Lists.newLinkedList();
546 daughterRegions.put(newRs, entry);
548 daughterRegions.get(newRs).add(dr);
549 dr = null;
550 continue;
553 // make sure this region wasn't already split
554 byte[] sk = regionLoc.getRegionInfo().getStartKey();
555 if (sk.length != 0) {
556 if (Bytes.equals(split, sk)) {
557 LOG.debug("Region already split on "
558 + splitAlgo.rowToStr(split) + ". Skipping this region...");
559 ++splitCount;
560 dr = null;
561 continue;
563 byte[] start = dr.getFirst();
564 Preconditions.checkArgument(Bytes.equals(start, sk), splitAlgo
565 .rowToStr(start) + " != " + splitAlgo.rowToStr(sk));
568 // passed all checks! found a good region
569 break;
571 if (regionList.isEmpty()) {
572 daughterRegions.remove(rsLoc);
574 if (dr == null)
575 continue;
577 // we have a good region, time to split!
578 byte[] split = dr.getSecond();
579 LOG.debug("Splitting at " + splitAlgo.rowToStr(split));
580 try (Admin admin = connection.getAdmin()) {
581 admin.split(tableName, split);
584 LinkedList<Pair<byte[], byte[]>> finished = Lists.newLinkedList();
585 LinkedList<Pair<byte[], byte[]>> local_finished = Lists.newLinkedList();
586 if (conf.getBoolean("split.verify", true)) {
587 // we need to verify and rate-limit our splits
588 outstanding.addLast(dr);
589 // with too many outstanding splits, wait for some to finish
590 while (outstanding.size() >= MAX_OUTSTANDING) {
591 LOG.debug("Wait for outstanding splits " + outstanding.size());
592 local_finished = splitScan(outstanding, connection, tableName, splitAlgo);
593 if (local_finished.isEmpty()) {
594 Thread.sleep(30 * 1000);
595 } else {
596 finished.addAll(local_finished);
597 outstanding.removeAll(local_finished);
598 LOG.debug(local_finished.size() + " outstanding splits finished");
601 } else {
602 finished.add(dr);
605 // mark each finished region as successfully split.
606 for (Pair<byte[], byte[]> region : finished) {
607 splitOut.writeChars("- " + splitAlgo.rowToStr(region.getFirst())
608 + " " + splitAlgo.rowToStr(region.getSecond()) + "\n");
609 splitCount++;
610 if (splitCount % 10 == 0) {
611 long tDiff = (System.currentTimeMillis() - startTime)
612 / splitCount;
613 LOG.debug("STATUS UPDATE: " + splitCount + " / " + origCount
614 + ". Avg Time / Split = "
615 + org.apache.hadoop.util.StringUtils.formatTime(tDiff));
620 if (conf.getBoolean("split.verify", true)) {
621 while (!outstanding.isEmpty()) {
622 LOG.debug("Finally Wait for outstanding splits " + outstanding.size());
623 LinkedList<Pair<byte[], byte[]>> finished = splitScan(outstanding,
624 connection, tableName, splitAlgo);
625 if (finished.isEmpty()) {
626 Thread.sleep(30 * 1000);
627 } else {
628 outstanding.removeAll(finished);
629 for (Pair<byte[], byte[]> region : finished) {
630 splitOut.writeChars("- " + splitAlgo.rowToStr(region.getFirst())
631 + " " + splitAlgo.rowToStr(region.getSecond()) + "\n");
632 splitCount++;
634 LOG.debug("Finally " + finished.size() + " outstanding splits finished");
638 LOG.debug("All regions have been successfully split!");
639 } finally {
640 long tDiff = System.currentTimeMillis() - startTime;
641 LOG.debug("TOTAL TIME = "
642 + org.apache.hadoop.util.StringUtils.formatTime(tDiff));
643 LOG.debug("Splits = " + splitCount);
644 if (0 < splitCount) {
645 LOG.debug("Avg Time / Split = "
646 + org.apache.hadoop.util.StringUtils.formatTime(tDiff / splitCount));
649 } finally {
650 splitOut.close();
651 fs.delete(splitFile, false);
658 * @throws IOException if the specified SplitAlgorithm class couldn't be
659 * instantiated
661 public static SplitAlgorithm newSplitAlgoInstance(Configuration conf,
662 String splitClassName) throws IOException {
663 Class<?> splitClass;
665 // For split algorithms builtin to RegionSplitter, the user can specify
666 // their simple class name instead of a fully qualified class name.
667 if(splitClassName.equals(HexStringSplit.class.getSimpleName())) {
668 splitClass = HexStringSplit.class;
669 } else if (splitClassName.equals(DecimalStringSplit.class.getSimpleName())) {
670 splitClass = DecimalStringSplit.class;
671 } else if (splitClassName.equals(UniformSplit.class.getSimpleName())) {
672 splitClass = UniformSplit.class;
673 } else {
674 try {
675 splitClass = conf.getClassByName(splitClassName);
676 } catch (ClassNotFoundException e) {
677 throw new IOException("Couldn't load split class " + splitClassName, e);
679 if(splitClass == null) {
680 throw new IOException("Failed loading split class " + splitClassName);
682 if(!SplitAlgorithm.class.isAssignableFrom(splitClass)) {
683 throw new IOException(
684 "Specified split class doesn't implement SplitAlgorithm");
687 try {
688 return splitClass.asSubclass(SplitAlgorithm.class).getDeclaredConstructor().newInstance();
689 } catch (Exception e) {
690 throw new IOException("Problem loading split algorithm: ", e);
694 static LinkedList<Pair<byte[], byte[]>> splitScan(
695 LinkedList<Pair<byte[], byte[]>> regionList,
696 final Connection connection,
697 final TableName tableName,
698 SplitAlgorithm splitAlgo)
699 throws IOException, InterruptedException {
700 LinkedList<Pair<byte[], byte[]>> finished = Lists.newLinkedList();
701 LinkedList<Pair<byte[], byte[]>> logicalSplitting = Lists.newLinkedList();
702 LinkedList<Pair<byte[], byte[]>> physicalSplitting = Lists.newLinkedList();
704 // Get table info
705 Pair<Path, Path> tableDirAndSplitFile =
706 getTableDirAndSplitFile(connection.getConfiguration(), tableName);
707 Path tableDir = tableDirAndSplitFile.getFirst();
708 FileSystem fs = tableDir.getFileSystem(connection.getConfiguration());
709 // Clear the cache to forcibly refresh region information
710 ((ClusterConnection)connection).clearRegionCache();
711 TableDescriptor htd = null;
712 try (Table table = connection.getTable(tableName)) {
713 htd = table.getDescriptor();
715 try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
717 // for every region that hasn't been verified as a finished split
718 for (Pair<byte[], byte[]> region : regionList) {
719 byte[] start = region.getFirst();
720 byte[] split = region.getSecond();
722 // see if the new split daughter region has come online
723 try {
724 HRegionInfo dri = regionLocator.getRegionLocation(split).getRegionInfo();
725 if (dri.isOffline() || !Bytes.equals(dri.getStartKey(), split)) {
726 logicalSplitting.add(region);
727 continue;
729 } catch (NoServerForRegionException nsfre) {
730 // NSFRE will occur if the old hbase:meta entry has no server assigned
731 LOG.info(nsfre.toString(), nsfre);
732 logicalSplitting.add(region);
733 continue;
736 try {
737 // when a daughter region is opened, a compaction is triggered
738 // wait until compaction completes for both daughter regions
739 LinkedList<HRegionInfo> check = Lists.newLinkedList();
740 check.add(regionLocator.getRegionLocation(start).getRegionInfo());
741 check.add(regionLocator.getRegionLocation(split).getRegionInfo());
742 for (HRegionInfo hri : check.toArray(new HRegionInfo[check.size()])) {
743 byte[] sk = hri.getStartKey();
744 if (sk.length == 0)
745 sk = splitAlgo.firstRow();
747 HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
748 connection.getConfiguration(), fs, tableDir, hri, true);
750 // Check every Column Family for that region -- check does not have references.
751 boolean refFound = false;
752 for (ColumnFamilyDescriptor c : htd.getColumnFamilies()) {
753 if ((refFound = regionFs.hasReferences(c.getNameAsString()))) {
754 break;
758 // compaction is completed when all reference files are gone
759 if (!refFound) {
760 check.remove(hri);
763 if (check.isEmpty()) {
764 finished.add(region);
765 } else {
766 physicalSplitting.add(region);
768 } catch (NoServerForRegionException nsfre) {
769 LOG.debug("No Server Exception thrown for: " + splitAlgo.rowToStr(start));
770 physicalSplitting.add(region);
771 ((ClusterConnection)connection).clearRegionCache();
775 LOG.debug("Split Scan: " + finished.size() + " finished / "
776 + logicalSplitting.size() + " split wait / "
777 + physicalSplitting.size() + " reference wait");
779 return finished;
784 * @param conf
785 * @param tableName
786 * @return A Pair where first item is table dir and second is the split file.
787 * @throws IOException if a remote or network exception occurs
789 private static Pair<Path, Path> getTableDirAndSplitFile(final Configuration conf,
790 final TableName tableName)
791 throws IOException {
792 Path hbDir = FSUtils.getRootDir(conf);
793 Path tableDir = FSUtils.getTableDir(hbDir, tableName);
794 Path splitFile = new Path(tableDir, "_balancedSplit");
795 return new Pair<>(tableDir, splitFile);
798 static LinkedList<Pair<byte[], byte[]>> getSplits(final Connection connection,
799 TableName tableName, SplitAlgorithm splitAlgo)
800 throws IOException {
801 Pair<Path, Path> tableDirAndSplitFile =
802 getTableDirAndSplitFile(connection.getConfiguration(), tableName);
803 Path tableDir = tableDirAndSplitFile.getFirst();
804 Path splitFile = tableDirAndSplitFile.getSecond();
806 FileSystem fs = tableDir.getFileSystem(connection.getConfiguration());
808 // Using strings because (new byte[]{0}).equals(new byte[]{0}) == false
809 Set<Pair<String, String>> daughterRegions = Sets.newHashSet();
811 // Does a split file exist?
812 if (!fs.exists(splitFile)) {
813 // NO = fresh start. calculate splits to make
814 LOG.debug("No " + splitFile.getName() + " file. Calculating splits ");
816 // Query meta for all regions in the table
817 Set<Pair<byte[], byte[]>> rows = Sets.newHashSet();
818 Pair<byte[][], byte[][]> tmp = null;
819 try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
820 tmp = regionLocator.getStartEndKeys();
822 Preconditions.checkArgument(tmp.getFirst().length == tmp.getSecond().length,
823 "Start and End rows should be equivalent");
824 for (int i = 0; i < tmp.getFirst().length; ++i) {
825 byte[] start = tmp.getFirst()[i], end = tmp.getSecond()[i];
826 if (start.length == 0)
827 start = splitAlgo.firstRow();
828 if (end.length == 0)
829 end = splitAlgo.lastRow();
830 rows.add(Pair.newPair(start, end));
832 LOG.debug("Table " + tableName + " has " + rows.size() + " regions that will be split.");
834 // prepare the split file
835 Path tmpFile = new Path(tableDir, "_balancedSplit_prepare");
836 FSDataOutputStream tmpOut = fs.create(tmpFile);
838 // calculate all the splits == [daughterRegions] = [(start, splitPoint)]
839 for (Pair<byte[], byte[]> r : rows) {
840 byte[] splitPoint = splitAlgo.split(r.getFirst(), r.getSecond());
841 String startStr = splitAlgo.rowToStr(r.getFirst());
842 String splitStr = splitAlgo.rowToStr(splitPoint);
843 daughterRegions.add(Pair.newPair(startStr, splitStr));
844 LOG.debug("Will Split [" + startStr + " , "
845 + splitAlgo.rowToStr(r.getSecond()) + ") at " + splitStr);
846 tmpOut.writeChars("+ " + startStr + splitAlgo.separator() + splitStr
847 + "\n");
849 tmpOut.close();
850 fs.rename(tmpFile, splitFile);
851 } else {
852 LOG.debug("_balancedSplit file found. Replay log to restore state...");
853 FSUtils.getInstance(fs, connection.getConfiguration())
854 .recoverFileLease(fs, splitFile, connection.getConfiguration(), null);
856 // parse split file and process remaining splits
857 FSDataInputStream tmpIn = fs.open(splitFile);
858 StringBuilder sb = new StringBuilder(tmpIn.available());
859 while (tmpIn.available() > 0) {
860 sb.append(tmpIn.readChar());
862 tmpIn.close();
863 for (String line : sb.toString().split("\n")) {
864 String[] cmd = line.split(splitAlgo.separator());
865 Preconditions.checkArgument(3 == cmd.length);
866 byte[] start = splitAlgo.strToRow(cmd[1]);
867 String startStr = splitAlgo.rowToStr(start);
868 byte[] splitPoint = splitAlgo.strToRow(cmd[2]);
869 String splitStr = splitAlgo.rowToStr(splitPoint);
870 Pair<String, String> r = Pair.newPair(startStr, splitStr);
871 if (cmd[0].equals("+")) {
872 LOG.debug("Adding: " + r);
873 daughterRegions.add(r);
874 } else {
875 LOG.debug("Removing: " + r);
876 Preconditions.checkArgument(cmd[0].equals("-"),
877 "Unknown option: " + cmd[0]);
878 Preconditions.checkState(daughterRegions.contains(r),
879 "Missing row: " + r);
880 daughterRegions.remove(r);
883 LOG.debug("Done reading. " + daughterRegions.size() + " regions left.");
885 LinkedList<Pair<byte[], byte[]>> ret = Lists.newLinkedList();
886 for (Pair<String, String> r : daughterRegions) {
887 ret.add(Pair.newPair(splitAlgo.strToRow(r.getFirst()), splitAlgo
888 .strToRow(r.getSecond())));
890 return ret;
894 * HexStringSplit is a well-known {@link SplitAlgorithm} for choosing region
895 * boundaries. The format of a HexStringSplit region boundary is the ASCII
896 * representation of an MD5 checksum, or any other uniformly distributed
897 * hexadecimal value. Row are hex-encoded long values in the range
898 * <b>"00000000" =&gt; "FFFFFFFF"</b> and are left-padded with zeros to keep the
899 * same order lexicographically as if they were binary.
901 * Since this split algorithm uses hex strings as keys, it is easy to read &amp;
902 * write in the shell but takes up more space and may be non-intuitive.
904 public static class HexStringSplit extends NumberStringSplit {
905 final static String DEFAULT_MIN_HEX = "00000000";
906 final static String DEFAULT_MAX_HEX = "FFFFFFFF";
907 final static int RADIX_HEX = 16;
909 public HexStringSplit() {
910 super(DEFAULT_MIN_HEX, DEFAULT_MAX_HEX, RADIX_HEX);
916 * The format of a DecimalStringSplit region boundary is the ASCII representation of
917 * reversed sequential number, or any other uniformly distributed decimal value.
918 * Row are decimal-encoded long values in the range
919 * <b>"00000000" =&gt; "99999999"</b> and are left-padded with zeros to keep the
920 * same order lexicographically as if they were binary.
922 public static class DecimalStringSplit extends NumberStringSplit {
923 final static String DEFAULT_MIN_DEC = "00000000";
924 final static String DEFAULT_MAX_DEC = "99999999";
925 final static int RADIX_DEC = 10;
927 public DecimalStringSplit() {
928 super(DEFAULT_MIN_DEC, DEFAULT_MAX_DEC, RADIX_DEC);
933 public abstract static class NumberStringSplit implements SplitAlgorithm {
935 String firstRow;
936 BigInteger firstRowInt;
937 String lastRow;
938 BigInteger lastRowInt;
939 int rowComparisonLength;
940 int radix;
942 NumberStringSplit(String minRow, String maxRow, int radix) {
943 this.firstRow = minRow;
944 this.lastRow = maxRow;
945 this.radix = radix;
946 this.firstRowInt = BigInteger.ZERO;
947 this.lastRowInt = new BigInteger(lastRow, this.radix);
948 this.rowComparisonLength = lastRow.length();
951 @Override
952 public byte[] split(byte[] start, byte[] end) {
953 BigInteger s = convertToBigInteger(start);
954 BigInteger e = convertToBigInteger(end);
955 Preconditions.checkArgument(!e.equals(BigInteger.ZERO));
956 return convertToByte(split2(s, e));
959 @Override
960 public byte[][] split(int n) {
961 Preconditions.checkArgument(lastRowInt.compareTo(firstRowInt) > 0,
962 "last row (%s) is configured less than first row (%s)", lastRow,
963 firstRow);
964 // +1 to range because the last row is inclusive
965 BigInteger range = lastRowInt.subtract(firstRowInt).add(BigInteger.ONE);
966 Preconditions.checkState(range.compareTo(BigInteger.valueOf(n)) >= 0,
967 "split granularity (%s) is greater than the range (%s)", n, range);
969 BigInteger[] splits = new BigInteger[n - 1];
970 BigInteger sizeOfEachSplit = range.divide(BigInteger.valueOf(n));
971 for (int i = 1; i < n; i++) {
972 // NOTE: this means the last region gets all the slop.
973 // This is not a big deal if we're assuming n << MAXHEX
974 splits[i - 1] = firstRowInt.add(sizeOfEachSplit.multiply(BigInteger
975 .valueOf(i)));
977 return convertToBytes(splits);
980 @Override
981 public byte[][] split(byte[] start, byte[] end, int numSplits, boolean inclusive) {
982 BigInteger s = convertToBigInteger(start);
983 BigInteger e = convertToBigInteger(end);
985 Preconditions.checkArgument(e.compareTo(s) > 0,
986 "last row (%s) is configured less than first row (%s)", rowToStr(end),
987 end);
988 // +1 to range because the last row is inclusive
989 BigInteger range = e.subtract(s).add(BigInteger.ONE);
990 Preconditions.checkState(range.compareTo(BigInteger.valueOf(numSplits)) >= 0,
991 "split granularity (%s) is greater than the range (%s)", numSplits, range);
993 BigInteger[] splits = new BigInteger[numSplits - 1];
994 BigInteger sizeOfEachSplit = range.divide(BigInteger.valueOf(numSplits));
995 for (int i = 1; i < numSplits; i++) {
996 // NOTE: this means the last region gets all the slop.
997 // This is not a big deal if we're assuming n << MAXHEX
998 splits[i - 1] = s.add(sizeOfEachSplit.multiply(BigInteger
999 .valueOf(i)));
1002 if (inclusive) {
1003 BigInteger[] inclusiveSplitPoints = new BigInteger[numSplits + 1];
1004 inclusiveSplitPoints[0] = convertToBigInteger(start);
1005 inclusiveSplitPoints[numSplits] = convertToBigInteger(end);
1006 System.arraycopy(splits, 0, inclusiveSplitPoints, 1, splits.length);
1007 return convertToBytes(inclusiveSplitPoints);
1008 } else {
1009 return convertToBytes(splits);
1013 @Override
1014 public byte[] firstRow() {
1015 return convertToByte(firstRowInt);
1018 @Override
1019 public byte[] lastRow() {
1020 return convertToByte(lastRowInt);
1023 @Override
1024 public void setFirstRow(String userInput) {
1025 firstRow = userInput;
1026 firstRowInt = new BigInteger(firstRow, radix);
1029 @Override
1030 public void setLastRow(String userInput) {
1031 lastRow = userInput;
1032 lastRowInt = new BigInteger(lastRow, radix);
1033 // Precondition: lastRow > firstRow, so last's length is the greater
1034 rowComparisonLength = lastRow.length();
1037 @Override
1038 public byte[] strToRow(String in) {
1039 return convertToByte(new BigInteger(in, radix));
1042 @Override
1043 public String rowToStr(byte[] row) {
1044 return Bytes.toStringBinary(row);
1047 @Override
1048 public String separator() {
1049 return " ";
1052 @Override
1053 public void setFirstRow(byte[] userInput) {
1054 firstRow = Bytes.toString(userInput);
1057 @Override
1058 public void setLastRow(byte[] userInput) {
1059 lastRow = Bytes.toString(userInput);
1063 * Divide 2 numbers in half (for split algorithm)
1065 * @param a number #1
1066 * @param b number #2
1067 * @return the midpoint of the 2 numbers
1069 public BigInteger split2(BigInteger a, BigInteger b) {
1070 return a.add(b).divide(BigInteger.valueOf(2)).abs();
1074 * Returns an array of bytes corresponding to an array of BigIntegers
1076 * @param bigIntegers numbers to convert
1077 * @return bytes corresponding to the bigIntegers
1079 public byte[][] convertToBytes(BigInteger[] bigIntegers) {
1080 byte[][] returnBytes = new byte[bigIntegers.length][];
1081 for (int i = 0; i < bigIntegers.length; i++) {
1082 returnBytes[i] = convertToByte(bigIntegers[i]);
1084 return returnBytes;
1088 * Returns the bytes corresponding to the BigInteger
1090 * @param bigInteger number to convert
1091 * @param pad padding length
1092 * @return byte corresponding to input BigInteger
1094 public byte[] convertToByte(BigInteger bigInteger, int pad) {
1095 String bigIntegerString = bigInteger.toString(radix);
1096 bigIntegerString = StringUtils.leftPad(bigIntegerString, pad, '0');
1097 return Bytes.toBytes(bigIntegerString);
1101 * Returns the bytes corresponding to the BigInteger
1103 * @param bigInteger number to convert
1104 * @return corresponding bytes
1106 public byte[] convertToByte(BigInteger bigInteger) {
1107 return convertToByte(bigInteger, rowComparisonLength);
1111 * Returns the BigInteger represented by the byte array
1113 * @param row byte array representing row
1114 * @return the corresponding BigInteger
1116 public BigInteger convertToBigInteger(byte[] row) {
1117 return (row.length > 0) ? new BigInteger(Bytes.toString(row), radix)
1118 : BigInteger.ZERO;
1121 @Override
1122 public String toString() {
1123 return this.getClass().getSimpleName() + " [" + rowToStr(firstRow())
1124 + "," + rowToStr(lastRow()) + "]";
1129 * A SplitAlgorithm that divides the space of possible keys evenly. Useful
1130 * when the keys are approximately uniform random bytes (e.g. hashes). Rows
1131 * are raw byte values in the range <b>00 =&gt; FF</b> and are right-padded with
1132 * zeros to keep the same memcmp() order. This is the natural algorithm to use
1133 * for a byte[] environment and saves space, but is not necessarily the
1134 * easiest for readability.
1136 public static class UniformSplit implements SplitAlgorithm {
1137 static final byte xFF = (byte) 0xFF;
1138 byte[] firstRowBytes = ArrayUtils.EMPTY_BYTE_ARRAY;
1139 byte[] lastRowBytes =
1140 new byte[] {xFF, xFF, xFF, xFF, xFF, xFF, xFF, xFF};
1141 @Override
1142 public byte[] split(byte[] start, byte[] end) {
1143 return Bytes.split(start, end, 1)[1];
1146 @Override
1147 public byte[][] split(int numRegions) {
1148 Preconditions.checkArgument(
1149 Bytes.compareTo(lastRowBytes, firstRowBytes) > 0,
1150 "last row (%s) is configured less than first row (%s)",
1151 Bytes.toStringBinary(lastRowBytes),
1152 Bytes.toStringBinary(firstRowBytes));
1154 byte[][] splits = Bytes.split(firstRowBytes, lastRowBytes, true,
1155 numRegions - 1);
1156 Preconditions.checkState(splits != null,
1157 "Could not split region with given user input: " + this);
1159 // remove endpoints, which are included in the splits list
1161 return splits == null? null: Arrays.copyOfRange(splits, 1, splits.length - 1);
1164 @Override
1165 public byte[][] split(byte[] start, byte[] end, int numSplits, boolean inclusive) {
1166 if (Arrays.equals(start, HConstants.EMPTY_BYTE_ARRAY)) {
1167 start = firstRowBytes;
1169 if (Arrays.equals(end, HConstants.EMPTY_BYTE_ARRAY)) {
1170 end = lastRowBytes;
1172 Preconditions.checkArgument(
1173 Bytes.compareTo(end, start) > 0,
1174 "last row (%s) is configured less than first row (%s)",
1175 Bytes.toStringBinary(end),
1176 Bytes.toStringBinary(start));
1178 byte[][] splits = Bytes.split(start, end, true,
1179 numSplits - 1);
1180 Preconditions.checkState(splits != null,
1181 "Could not calculate input splits with given user input: " + this);
1182 if (inclusive) {
1183 return splits;
1184 } else {
1185 // remove endpoints, which are included in the splits list
1186 return Arrays.copyOfRange(splits, 1, splits.length - 1);
1190 @Override
1191 public byte[] firstRow() {
1192 return firstRowBytes;
1195 @Override
1196 public byte[] lastRow() {
1197 return lastRowBytes;
1200 @Override
1201 public void setFirstRow(String userInput) {
1202 firstRowBytes = Bytes.toBytesBinary(userInput);
1205 @Override
1206 public void setLastRow(String userInput) {
1207 lastRowBytes = Bytes.toBytesBinary(userInput);
1211 @Override
1212 public void setFirstRow(byte[] userInput) {
1213 firstRowBytes = userInput;
1216 @Override
1217 public void setLastRow(byte[] userInput) {
1218 lastRowBytes = userInput;
1221 @Override
1222 public byte[] strToRow(String input) {
1223 return Bytes.toBytesBinary(input);
1226 @Override
1227 public String rowToStr(byte[] row) {
1228 return Bytes.toStringBinary(row);
1231 @Override
1232 public String separator() {
1233 return ",";
1236 @Override
1237 public String toString() {
1238 return this.getClass().getSimpleName() + " [" + rowToStr(firstRow())
1239 + "," + rowToStr(lastRow()) + "]";