/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.NoServerForRegionException;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.OptionBuilder;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;

/**
 * The {@link RegionSplitter} class provides several utilities to help in the
 * administration lifecycle for developers who choose to manually split regions
 * instead of having HBase handle that automatically. The most useful utilities
 * are:
 * <p>
 * <ul>
 * <li>Create a table with a specified number of pre-split regions
 * <li>Execute a rolling split of all regions on an existing table
 * </ul>
 * <p>
 * Both operations can be safely done on a live server.
 * <p>
 * <b>Question:</b> How do I turn off automatic splitting? <br>
 * <b>Answer:</b> Automatic splitting is determined by the configuration value
 * <i>HConstants.HREGION_MAX_FILESIZE</i>. It is not recommended that you set this
 * to Long.MAX_VALUE in case you forget about manual splits. A suggested setting
 * is 100GB, which would result in &gt; 1hr major compactions if reached.
 * <p>
 * <b>Question:</b> Why did the original authors decide to manually split? <br>
 * <b>Answer:</b> Specific workload characteristics of our use case allowed us
 * to benefit from a manual split system.
 * <p>
 * <ul>
 * <li>Data (~1k) that would grow instead of being replaced
 * <li>Data growth was roughly uniform across all regions
 * <li>OLTP workload. Data loss is a big deal.
 * </ul>
 * <p>
 * <b>Question:</b> Why is manual splitting good for this workload? <br>
 * <b>Answer:</b> Although automated splitting is not a bad option, there are
 * benefits to manual splitting.
 * <p>
 * <ul>
 * <li>With growing amounts of data, splits will continually be needed. Since
 * you always know exactly what regions you have, long-term debugging and
 * profiling is much easier with manual splits. It is hard to trace the logs to
 * understand region level problems if it keeps splitting and getting renamed.
 * <li>Data offlining bugs + unknown number of split regions == oh crap! If a
 * WAL or StoreFile was mistakenly unprocessed by HBase due to a weird bug and
 * you notice it a day or so later, you can be assured that the regions
 * specified in these files are the same as the current regions and you have
 * fewer headaches trying to restore/replay your data.
 * <li>You can finely tune your compaction algorithm. With roughly uniform data
 * growth, it's easy to cause split / compaction storms as the regions all
 * roughly hit the same data size at the same time. With manual splits, you can
 * let staggered, time-based major compactions spread out your network IO load;
 * a sketch follows this list.
 * </ul>
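 * <p>
 * A hedged sketch of such time-staggered major compactions (the {@code Admin}
 * calls exist in the HBase client API; the loop and the per-region delay are
 * illustrative assumptions, not part of this tool):
 * <pre>
 * try (Connection conn = ConnectionFactory.createConnection(conf);
 *     Admin admin = conn.getAdmin()) {
 *   for (RegionInfo region : admin.getRegions(tableName)) {
 *     admin.majorCompactRegion(region.getRegionName());
 *     Thread.sleep(staggerMillis); // hypothetical stagger between regions
 *   }
 * }
 * </pre>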
 * <p>
 * <b>Question:</b> What's the optimal number of pre-split regions to create? <br>
 * <b>Answer:</b> Mileage will vary depending upon your application.
 * <p>
 * The short answer for our application is that we started with 10 pre-split
 * regions / server and watched our data growth over time. It's better to err on
 * the side of too few regions and rolling split later.
 * <p>
 * The more complicated answer is that this depends upon the largest storefile
 * in your region. With a growing data size, this will get larger over time. You
 * want the largest region to be just big enough that the
 * {@link org.apache.hadoop.hbase.regionserver.HStore} compact
 * selection algorithm only compacts it due to a timed major. If you don't, your
 * cluster can be prone to compaction storms as the algorithm decides to run
 * major compactions on a large series of regions all at once. Note that
 * compaction storms are due to the uniform data growth, not the manual split
 * decision.
 * <p>
 * If you pre-split your regions too thin, you can increase the major compaction
 * interval by configuring HConstants.MAJOR_COMPACTION_PERIOD. If your data size
 * grows too large, use this script to perform a network IO safe rolling split
 * of all regions.
 */
@InterfaceAudience.Private
public class RegionSplitter {
  private static final Logger LOG = LoggerFactory.getLogger(RegionSplitter.class);

  /**
   * A generic interface for the RegionSplitter code to use for all of its
   * functionality. Note that the original authors of this code use
   * {@link HexStringSplit} to partition their table and set it as default, but
   * provided this for your custom algorithm. To use, create a new derived class
   * from this interface and call {@link RegionSplitter#createPresplitTable} or
   * RegionSplitter#rollingSplit(TableName, SplitAlgorithm, Configuration) with the
   * argument splitClassName giving the name of your class.
   */
  public interface SplitAlgorithm {
    /**
     * Split a pre-existing region into 2 regions.
     * @param start
     *          first row (inclusive)
     * @param end
     *          last row (exclusive)
     * @return the split row to use
     */
    byte[] split(byte[] start, byte[] end);

    /**
     * Split an entire table.
     * @param numRegions
     *          number of regions to split the table into
     * @throws RuntimeException
     *           user input is validated at this time. may throw a runtime
     *           exception in response to a parse failure
     * @return array of split keys for the initial regions of the table. The
     *         length of the returned array should be numRegions-1.
     */
    byte[][] split(int numRegions);

    /**
     * Some MapReduce jobs may want to run multiple mappers per region,
     * this is intended for such a use case.
     * @param start first row (inclusive)
     * @param end last row (exclusive)
     * @param numSplits number of splits to generate
     * @param inclusive whether start and end are returned as split points
     */
    byte[][] split(byte[] start, byte[] end, int numSplits, boolean inclusive);

    /**
     * In HBase, the first row is represented by an empty byte array. This might
     * cause problems with your split algorithm or row printing. All your APIs
     * will be passed firstRow() instead of empty array.
     * @return your representation of your first row
     */
    byte[] firstRow();

    /**
     * In HBase, the last row is represented by an empty byte array. This might
     * cause problems with your split algorithm or row printing. All your APIs
     * will be passed lastRow() instead of empty array.
     * @return your representation of your last row
     */
    byte[] lastRow();

    /**
     * In HBase, the first row is represented by an empty byte array. Set this
     * value to help the split code understand how to evenly divide the first
     * region.
     * @param userInput
     *          raw user input (may throw RuntimeException on parse failure)
     */
    void setFirstRow(String userInput);

    /**
     * In HBase, the last row is represented by an empty byte array. Set this
     * value to help the split code understand how to evenly divide the last
     * region. Note that this last row is inclusive for all rows sharing the
     * same prefix.
     * @param userInput
     *          raw user input (may throw RuntimeException on parse failure)
     */
    void setLastRow(String userInput);

    /**
     * @param input
     *          user or file input for row
     * @return byte array representation of this row for HBase
     */
    byte[] strToRow(String input);

    /**
     * @param row
     *          byte array representing a row in HBase
     * @return String to use for debug &amp; file printing
     */
    String rowToStr(byte[] row);

    /**
     * @return the separator character to use when storing / printing the row
     */
    String separator();

    /**
     * Set the first row
     * @param userInput byte array of the row key.
     */
    void setFirstRow(byte[] userInput);

    /**
     * Set the last row
     * @param userInput byte array of the row key.
     */
    void setLastRow(byte[] userInput);
  }
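
  // A minimal programmatic usage sketch (hypothetical caller code, grounded in
  // the methods defined below; not part of this class's public contract):
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   SplitAlgorithm algo = RegionSplitter.newSplitAlgoInstance(conf, "HexStringSplit");
  //   byte[][] splitKeys = algo.split(10); // numRegions - 1 = 9 boundary keys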
  /**
   * The main function for the RegionSplitter application. Common uses:
   * <p>
   * <ul>
   * <li>create a table named 'myTable' with 60 pre-split regions containing 2
   * column families 'test' &amp; 'rs', assuming the keys are hex-encoded ASCII:
   * <ul>
   * <li>bin/hbase org.apache.hadoop.hbase.util.RegionSplitter -c 60 -f test:rs
   * myTable HexStringSplit
   * </ul>
   * <li>create a table named 'myTable' with 50 pre-split regions,
   * assuming the keys are decimal-encoded ASCII:
   * <ul>
   * <li>bin/hbase org.apache.hadoop.hbase.util.RegionSplitter -c 50
   * myTable DecimalStringSplit
   * </ul>
   * <li>perform a rolling split of 'myTable' (i.e. 60 =&gt; 120 regions), # 2
   * outstanding splits at a time, assuming keys are uniformly distributed
   * bytes:
   * <ul>
   * <li>bin/hbase org.apache.hadoop.hbase.util.RegionSplitter -r -o 2 myTable
   * UniformSplit
   * </ul>
   * </ul>
   * <p>
   * There are three SplitAlgorithms built into RegionSplitter: HexStringSplit,
   * DecimalStringSplit, and UniformSplit. These are different strategies for
   * choosing region boundaries. See their source code for details.
   *
   * @param args
   *          Usage: RegionSplitter &lt;TABLE&gt; &lt;SPLITALGORITHM&gt;
   *          &lt;-c &lt;# regions&gt; -f &lt;family:family:...&gt; | -r
   *          [-o &lt;# outstanding splits&gt;]&gt;
   *          [-D &lt;conf.param=value&gt;]
   * @throws IOException
   *           HBase IO problem
   * @throws InterruptedException
   *           user requested exit
   * @throws ParseException
   *           problem parsing user input
   */
  @SuppressWarnings("static-access")
  public static void main(String[] args) throws IOException,
      InterruptedException, ParseException {
    Configuration conf = HBaseConfiguration.create();

    Options opt = new Options();
    opt.addOption(OptionBuilder.withArgName("property=value").hasArg()
        .withDescription("Override HBase Configuration Settings").create("D"));
    opt.addOption(OptionBuilder.withArgName("region count").hasArg()
        .withDescription(
            "Create a new table with a pre-split number of regions")
        .create("c"));
    opt.addOption(OptionBuilder.withArgName("family:family:...").hasArg()
        .withDescription(
            "Column Families to create with new table. Required with -c")
        .create("f"));
    opt.addOption("h", false, "Print this usage help");
    opt.addOption("r", false, "Perform a rolling split of an existing region");
    opt.addOption(OptionBuilder.withArgName("count").hasArg().withDescription(
        "Max outstanding splits that have unfinished major compactions")
        .create("o"));
    opt.addOption(null, "firstrow", true,
        "First Row in Table for Split Algorithm");
    opt.addOption(null, "lastrow", true,
        "Last Row in Table for Split Algorithm");
    opt.addOption(null, "risky", false,
        "Skip verification steps to complete quickly. "
            + "STRONGLY DISCOURAGED for production systems. ");
    CommandLine cmd = new GnuParser().parse(opt, args);

    if (cmd.hasOption("D")) {
      for (String confOpt : cmd.getOptionValues("D")) {
        String[] kv = confOpt.split("=", 2);
        if (kv.length == 2) {
          conf.set(kv[0], kv[1]);
          LOG.debug("-D configuration override: " + kv[0] + "=" + kv[1]);
        } else {
          throw new ParseException("-D option format invalid: " + confOpt);
        }
      }
    }

    if (cmd.hasOption("risky")) {
      conf.setBoolean("split.verify", false);
    }

    boolean createTable = cmd.hasOption("c") && cmd.hasOption("f");
    boolean rollingSplit = cmd.hasOption("r");
    boolean oneOperOnly = createTable ^ rollingSplit;

    if (2 != cmd.getArgList().size() || !oneOperOnly || cmd.hasOption("h")) {
      new HelpFormatter().printHelp("bin/hbase regionsplitter <TABLE> <SPLITALGORITHM>\n" +
          "SPLITALGORITHM is the java class name of a class implementing " +
          "SplitAlgorithm, or one of the special strings HexStringSplit or " +
          "DecimalStringSplit or UniformSplit, which are built-in split algorithms. " +
          "HexStringSplit treats keys as hexadecimal ASCII, and " +
          "DecimalStringSplit treats keys as decimal ASCII, and " +
          "UniformSplit treats keys as arbitrary bytes.", opt);
      return;
    }
    TableName tableName = TableName.valueOf(cmd.getArgs()[0]);
    String splitClass = cmd.getArgs()[1];
    SplitAlgorithm splitAlgo = newSplitAlgoInstance(conf, splitClass);

    if (cmd.hasOption("firstrow")) {
      splitAlgo.setFirstRow(cmd.getOptionValue("firstrow"));
    }
    if (cmd.hasOption("lastrow")) {
      splitAlgo.setLastRow(cmd.getOptionValue("lastrow"));
    }

    if (createTable) {
      conf.set("split.count", cmd.getOptionValue("c"));
      createPresplitTable(tableName, splitAlgo, cmd.getOptionValue("f").split(":"), conf);
    }

    if (rollingSplit) {
      if (cmd.hasOption("o")) {
        conf.set("split.outstanding", cmd.getOptionValue("o"));
      }
      rollingSplit(tableName, splitAlgo, conf);
    }
  }
  static void createPresplitTable(TableName tableName, SplitAlgorithm splitAlgo,
      String[] columnFamilies, Configuration conf)
      throws IOException, InterruptedException {
    final int splitCount = conf.getInt("split.count", 0);
    Preconditions.checkArgument(splitCount > 1, "Split count must be > 1");

    Preconditions.checkArgument(columnFamilies.length > 0,
        "Must specify at least one column family. ");
    LOG.debug("Creating table " + tableName + " with " + columnFamilies.length
        + " column families. Presplitting to " + splitCount + " regions");

    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    for (String cf : columnFamilies) {
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf));
    }
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      Admin admin = connection.getAdmin();
      try {
        Preconditions.checkArgument(!admin.tableExists(tableName),
            "Table already exists: " + tableName);
        admin.createTable(builder.build(), splitAlgo.split(splitCount));
      } finally {
        admin.close();
      }
      LOG.debug("Table created! Waiting for regions to show online in META...");
      if (!conf.getBoolean("split.verify", true)) {
        // NOTE: createTable is synchronous on the table, but not on the regions
        int onlineRegions = 0;
        while (onlineRegions < splitCount) {
          onlineRegions = MetaTableAccessor.getRegionCount(connection, tableName);
          LOG.debug(onlineRegions + " of " + splitCount + " regions online...");
          if (onlineRegions < splitCount) {
            Thread.sleep(10 * 1000); // sleep
          }
        }
      }
      LOG.debug("Finished creating table with " + splitCount + " regions");
    }
  }
  /**
   * Alternative getCurrentNrHRS which is no longer available.
   * @param connection the connection to use
   * @return Rough count of regionservers out on cluster.
   * @throws IOException if a remote or network exception occurs
   */
  private static int getRegionServerCount(final Connection connection) throws IOException {
    try (Admin admin = connection.getAdmin()) {
      ClusterMetrics status = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
      Collection<ServerName> servers = status.getLiveServerMetrics().keySet();
      return servers == null || servers.isEmpty() ? 0 : servers.size();
    }
  }
  private static byte[] readFile(final FileSystem fs, final Path path) throws IOException {
    FSDataInputStream tmpIn = fs.open(path);
    try {
      byte[] rawData = new byte[tmpIn.available()];
      tmpIn.readFully(rawData);
      return rawData;
    } finally {
      tmpIn.close();
    }
  }
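
  // Note: sizing the read buffer with available() is adequate here only because
  // the _balancedSplit bookkeeping file is small; it is not a general pattern
  // for streaming large files.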
  static void rollingSplit(TableName tableName, SplitAlgorithm splitAlgo, Configuration conf)
      throws IOException, InterruptedException {
    final int minOS = conf.getInt("split.outstanding", 2);
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      // Max outstanding splits. default == 50% of servers
      final int MAX_OUTSTANDING = Math.max(getRegionServerCount(connection) / 2, minOS);

      Path hbDir = FSUtils.getRootDir(conf);
      Path tableDir = FSUtils.getTableDir(hbDir, tableName);
      Path splitFile = new Path(tableDir, "_balancedSplit");
      FileSystem fs = FileSystem.get(conf);

      // Get a list of daughter regions to create
      LinkedList<Pair<byte[], byte[]>> tmpRegionSet = null;
      try (Table table = connection.getTable(tableName)) {
        tmpRegionSet = getSplits(connection, tableName, splitAlgo);
      }
      LinkedList<Pair<byte[], byte[]>> outstanding = Lists.newLinkedList();
      int splitCount = 0;
      final int origCount = tmpRegionSet.size();

      // all splits must compact & we have 1 compact thread, so 2 split
      // requests to the same RS can stall the outstanding split queue.
      // To fix, group the regions into an RS pool and round-robin through it
      LOG.debug("Bucketing regions by regionserver...");
      TreeMap<ServerName, LinkedList<Pair<byte[], byte[]>>> daughterRegions =
          Maps.newTreeMap();
      // Get a regionLocator. Need it in below.
      try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
        for (Pair<byte[], byte[]> dr : tmpRegionSet) {
          ServerName rsLocation = regionLocator.getRegionLocation(dr.getSecond()).getServerName();
          if (!daughterRegions.containsKey(rsLocation)) {
            LinkedList<Pair<byte[], byte[]>> entry = Lists.newLinkedList();
            daughterRegions.put(rsLocation, entry);
          }
          daughterRegions.get(rsLocation).add(dr);
        }
        LOG.debug("Done with bucketing. Split time!");
        long startTime = System.currentTimeMillis();

        // Open the split file and modify it as splits finish
        byte[] rawData = readFile(fs, splitFile);
        FSDataOutputStream splitOut = fs.create(splitFile);
        try {
          splitOut.write(rawData);

          try {
            // *** split code ***
            while (!daughterRegions.isEmpty()) {
              LOG.debug(daughterRegions.size() + " RS have regions to split.");

              // Get ServerName to region count mapping
              final TreeMap<ServerName, Integer> rsSizes = Maps.newTreeMap();
              List<HRegionLocation> hrls = regionLocator.getAllRegionLocations();
              for (HRegionLocation hrl : hrls) {
                ServerName sn = hrl.getServerName();
                if (rsSizes.containsKey(sn)) {
                  rsSizes.put(sn, rsSizes.get(sn) + 1);
                } else {
                  rsSizes.put(sn, 1);
                }
              }

              // Round-robin through the ServerName list. Choose the lightest-loaded servers
              // first to keep the master from load-balancing regions as we split.
              for (Map.Entry<ServerName, LinkedList<Pair<byte[], byte[]>>> daughterRegion :
                  daughterRegions.entrySet()) {
                Pair<byte[], byte[]> dr = null;
                ServerName rsLoc = daughterRegion.getKey();
                LinkedList<Pair<byte[], byte[]>> regionList = daughterRegion.getValue();

                // Find a region in the ServerName list that hasn't been moved
                LOG.debug("Finding a region on " + rsLoc);
                while (!regionList.isEmpty()) {
                  dr = regionList.pop();

                  // get current region info
                  byte[] split = dr.getSecond();
                  HRegionLocation regionLoc = regionLocator.getRegionLocation(split);

                  // if this region moved locations
                  ServerName newRs = regionLoc.getServerName();
                  if (newRs.compareTo(rsLoc) != 0) {
                    LOG.debug("Region with " + splitAlgo.rowToStr(split)
                        + " moved to " + newRs + ". Relocating...");
                    // relocate it, don't use it right now
                    if (!daughterRegions.containsKey(newRs)) {
                      LinkedList<Pair<byte[], byte[]>> entry = Lists.newLinkedList();
                      daughterRegions.put(newRs, entry);
                    }
                    daughterRegions.get(newRs).add(dr);
                    dr = null;
                    continue;
                  }

                  // make sure this region wasn't already split
                  byte[] sk = regionLoc.getRegionInfo().getStartKey();
                  if (sk.length != 0) {
                    if (Bytes.equals(split, sk)) {
                      LOG.debug("Region already split on "
                          + splitAlgo.rowToStr(split) + ". Skipping this region...");
                      ++splitCount;
                      dr = null;
                      continue;
                    }

                    byte[] start = dr.getFirst();
                    Preconditions.checkArgument(Bytes.equals(start, sk), splitAlgo
                        .rowToStr(start) + " != " + splitAlgo.rowToStr(sk));
                  }

                  // passed all checks! found a good region
                  break;
                }
                if (regionList.isEmpty()) {
                  daughterRegions.remove(rsLoc);
                }
                if (dr == null) {
                  continue;
                }

                // we have a good region, time to split!
                byte[] split = dr.getSecond();
                LOG.debug("Splitting at " + splitAlgo.rowToStr(split));
                try (Admin admin = connection.getAdmin()) {
                  admin.split(tableName, split);
                }

                LinkedList<Pair<byte[], byte[]>> finished = Lists.newLinkedList();
                LinkedList<Pair<byte[], byte[]>> local_finished = Lists.newLinkedList();
                if (conf.getBoolean("split.verify", true)) {
                  // we need to verify and rate-limit our splits
                  outstanding.addLast(dr);
                  // with too many outstanding splits, wait for some to finish
                  while (outstanding.size() >= MAX_OUTSTANDING) {
                    LOG.debug("Wait for outstanding splits " + outstanding.size());
                    local_finished = splitScan(outstanding, connection, tableName, splitAlgo);
                    if (local_finished.isEmpty()) {
                      Thread.sleep(30 * 1000);
                    } else {
                      finished.addAll(local_finished);
                      outstanding.removeAll(local_finished);
                      LOG.debug(local_finished.size() + " outstanding splits finished");
                    }
                  }
                } else {
                  finished.add(dr);
                }

                // mark each finished region as successfully split.
                for (Pair<byte[], byte[]> region : finished) {
                  splitOut.writeChars("- " + splitAlgo.rowToStr(region.getFirst())
                      + " " + splitAlgo.rowToStr(region.getSecond()) + "\n");
                  splitCount++;
                  if (splitCount % 10 == 0) {
                    long tDiff = (System.currentTimeMillis() - startTime)
                        / splitCount;
                    LOG.debug("STATUS UPDATE: " + splitCount + " / " + origCount
                        + ". Avg Time / Split = "
                        + org.apache.hadoop.util.StringUtils.formatTime(tDiff));
                  }
                }
              }
            }
            if (conf.getBoolean("split.verify", true)) {
              while (!outstanding.isEmpty()) {
                LOG.debug("Finally Wait for outstanding splits " + outstanding.size());
                LinkedList<Pair<byte[], byte[]>> finished = splitScan(outstanding,
                    connection, tableName, splitAlgo);
                if (finished.isEmpty()) {
                  Thread.sleep(30 * 1000);
                } else {
                  outstanding.removeAll(finished);
                  for (Pair<byte[], byte[]> region : finished) {
                    splitOut.writeChars("- " + splitAlgo.rowToStr(region.getFirst())
                        + " " + splitAlgo.rowToStr(region.getSecond()) + "\n");
                    splitCount++;
                  }
                  LOG.debug("Finally " + finished.size() + " outstanding splits finished");
                }
              }
            }
            LOG.debug("All regions have been successfully split!");
          } finally {
            long tDiff = System.currentTimeMillis() - startTime;
            LOG.debug("TOTAL TIME = "
                + org.apache.hadoop.util.StringUtils.formatTime(tDiff));
            LOG.debug("Splits = " + splitCount);
            if (0 < splitCount) {
              LOG.debug("Avg Time / Split = "
                  + org.apache.hadoop.util.StringUtils.formatTime(tDiff / splitCount));
            }
          }
        } finally {
          splitOut.close();
          fs.delete(splitFile, false);
        }
      }
    }
  }
  /**
   * @throws IOException if the specified SplitAlgorithm class couldn't be
   *           instantiated
   */
  public static SplitAlgorithm newSplitAlgoInstance(Configuration conf,
      String splitClassName) throws IOException {
    Class<?> splitClass;

    // For split algorithms builtin to RegionSplitter, the user can specify
    // their simple class name instead of a fully qualified class name.
    if (splitClassName.equals(HexStringSplit.class.getSimpleName())) {
      splitClass = HexStringSplit.class;
    } else if (splitClassName.equals(DecimalStringSplit.class.getSimpleName())) {
      splitClass = DecimalStringSplit.class;
    } else if (splitClassName.equals(UniformSplit.class.getSimpleName())) {
      splitClass = UniformSplit.class;
    } else {
      try {
        splitClass = conf.getClassByName(splitClassName);
      } catch (ClassNotFoundException e) {
        throw new IOException("Couldn't load split class " + splitClassName, e);
      }
      if (splitClass == null) {
        throw new IOException("Failed loading split class " + splitClassName);
      }
      if (!SplitAlgorithm.class.isAssignableFrom(splitClass)) {
        throw new IOException(
            "Specified split class doesn't implement SplitAlgorithm");
      }
    }
    try {
      return splitClass.asSubclass(SplitAlgorithm.class).getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IOException("Problem loading split algorithm: ", e);
    }
  }
  static LinkedList<Pair<byte[], byte[]>> splitScan(
      LinkedList<Pair<byte[], byte[]>> regionList,
      final Connection connection,
      final TableName tableName,
      SplitAlgorithm splitAlgo)
      throws IOException, InterruptedException {
    LinkedList<Pair<byte[], byte[]>> finished = Lists.newLinkedList();
    LinkedList<Pair<byte[], byte[]>> logicalSplitting = Lists.newLinkedList();
    LinkedList<Pair<byte[], byte[]>> physicalSplitting = Lists.newLinkedList();

    // Get table info
    Pair<Path, Path> tableDirAndSplitFile =
        getTableDirAndSplitFile(connection.getConfiguration(), tableName);
    Path tableDir = tableDirAndSplitFile.getFirst();
    FileSystem fs = tableDir.getFileSystem(connection.getConfiguration());
    // Clear the cache to forcibly refresh region information
    ((ClusterConnection) connection).clearRegionCache();
    TableDescriptor htd = null;
    try (Table table = connection.getTable(tableName)) {
      htd = table.getDescriptor();
    }
    try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {

      // for every region that hasn't been verified as a finished split
      for (Pair<byte[], byte[]> region : regionList) {
        byte[] start = region.getFirst();
        byte[] split = region.getSecond();

        // see if the new split daughter region has come online
        try {
          HRegionInfo dri = regionLocator.getRegionLocation(split).getRegionInfo();
          if (dri.isOffline() || !Bytes.equals(dri.getStartKey(), split)) {
            logicalSplitting.add(region);
            continue;
          }
        } catch (NoServerForRegionException nsfre) {
          // NSFRE will occur if the old hbase:meta entry has no server assigned
          LOG.info(nsfre.toString(), nsfre);
          logicalSplitting.add(region);
          continue;
        }

        try {
          // when a daughter region is opened, a compaction is triggered
          // wait until compaction completes for both daughter regions
          LinkedList<HRegionInfo> check = Lists.newLinkedList();
          check.add(regionLocator.getRegionLocation(start).getRegionInfo());
          check.add(regionLocator.getRegionLocation(split).getRegionInfo());
          for (HRegionInfo hri : check.toArray(new HRegionInfo[check.size()])) {
            byte[] sk = hri.getStartKey();
            if (sk.length == 0) {
              sk = splitAlgo.firstRow();
            }

            HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
                connection.getConfiguration(), fs, tableDir, hri, true);

            // Check every Column Family for that region -- check does not have references.
            boolean refFound = false;
            for (ColumnFamilyDescriptor c : htd.getColumnFamilies()) {
              if ((refFound = regionFs.hasReferences(c.getNameAsString()))) {
                break;
              }
            }

            // compaction is completed when all reference files are gone
            if (!refFound) {
              check.remove(hri);
            }
          }
          if (check.isEmpty()) {
            finished.add(region);
          } else {
            physicalSplitting.add(region);
          }
        } catch (NoServerForRegionException nsfre) {
          LOG.debug("No Server Exception thrown for: " + splitAlgo.rowToStr(start));
          physicalSplitting.add(region);
          ((ClusterConnection) connection).clearRegionCache();
        }
      }

      LOG.debug("Split Scan: " + finished.size() + " finished / "
          + logicalSplitting.size() + " split wait / "
          + physicalSplitting.size() + " reference wait");

      return finished;
    }
  }
  /**
   * @return A Pair where first item is table dir and second is the split file.
   * @throws IOException if a remote or network exception occurs
   */
  private static Pair<Path, Path> getTableDirAndSplitFile(final Configuration conf,
      final TableName tableName) throws IOException {
    Path hbDir = FSUtils.getRootDir(conf);
    Path tableDir = FSUtils.getTableDir(hbDir, tableName);
    Path splitFile = new Path(tableDir, "_balancedSplit");
    return new Pair<>(tableDir, splitFile);
  }
  static LinkedList<Pair<byte[], byte[]>> getSplits(final Connection connection,
      TableName tableName, SplitAlgorithm splitAlgo) throws IOException {
    Pair<Path, Path> tableDirAndSplitFile =
        getTableDirAndSplitFile(connection.getConfiguration(), tableName);
    Path tableDir = tableDirAndSplitFile.getFirst();
    Path splitFile = tableDirAndSplitFile.getSecond();

    FileSystem fs = tableDir.getFileSystem(connection.getConfiguration());

    // Using strings because (new byte[]{0}).equals(new byte[]{0}) == false
    Set<Pair<String, String>> daughterRegions = Sets.newHashSet();

    // Does a split file exist?
    if (!fs.exists(splitFile)) {
      // NO = fresh start. calculate splits to make
      LOG.debug("No " + splitFile.getName() + " file. Calculating splits ");

      // Query meta for all regions in the table
      Set<Pair<byte[], byte[]>> rows = Sets.newHashSet();
      Pair<byte[][], byte[][]> tmp = null;
      try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
        tmp = regionLocator.getStartEndKeys();
      }
      Preconditions.checkArgument(tmp.getFirst().length == tmp.getSecond().length,
          "Start and End rows should be equivalent");
      for (int i = 0; i < tmp.getFirst().length; ++i) {
        byte[] start = tmp.getFirst()[i], end = tmp.getSecond()[i];
        if (start.length == 0) {
          start = splitAlgo.firstRow();
        }
        if (end.length == 0) {
          end = splitAlgo.lastRow();
        }
        rows.add(Pair.newPair(start, end));
      }
      LOG.debug("Table " + tableName + " has " + rows.size() + " regions that will be split.");

      // prepare the split file
      Path tmpFile = new Path(tableDir, "_balancedSplit_prepare");
      FSDataOutputStream tmpOut = fs.create(tmpFile);

      // calculate all the splits == [daughterRegions] = [(start, splitPoint)]
      for (Pair<byte[], byte[]> r : rows) {
        byte[] splitPoint = splitAlgo.split(r.getFirst(), r.getSecond());
        String startStr = splitAlgo.rowToStr(r.getFirst());
        String splitStr = splitAlgo.rowToStr(splitPoint);
        daughterRegions.add(Pair.newPair(startStr, splitStr));
        LOG.debug("Will Split [" + startStr + " , "
            + splitAlgo.rowToStr(r.getSecond()) + ") at " + splitStr);
        tmpOut.writeChars("+ " + startStr + splitAlgo.separator() + splitStr
            + "\n");
      }
      tmpOut.close();
      fs.rename(tmpFile, splitFile);
    } else {
      LOG.debug("_balancedSplit file found. Replay log to restore state...");
      FSUtils.getInstance(fs, connection.getConfiguration())
          .recoverFileLease(fs, splitFile, connection.getConfiguration(), null);

      // parse split file and process remaining splits
      FSDataInputStream tmpIn = fs.open(splitFile);
      StringBuilder sb = new StringBuilder(tmpIn.available());
      while (tmpIn.available() > 0) {
        sb.append(tmpIn.readChar());
      }
      tmpIn.close();
      for (String line : sb.toString().split("\n")) {
        String[] cmd = line.split(splitAlgo.separator());
        Preconditions.checkArgument(3 == cmd.length);
        byte[] start = splitAlgo.strToRow(cmd[1]);
        String startStr = splitAlgo.rowToStr(start);
        byte[] splitPoint = splitAlgo.strToRow(cmd[2]);
        String splitStr = splitAlgo.rowToStr(splitPoint);
        Pair<String, String> r = Pair.newPair(startStr, splitStr);
        if (cmd[0].equals("+")) {
          LOG.debug("Adding: " + r);
          daughterRegions.add(r);
        } else {
          LOG.debug("Removing: " + r);
          Preconditions.checkArgument(cmd[0].equals("-"),
              "Unknown option: " + cmd[0]);
          Preconditions.checkState(daughterRegions.contains(r),
              "Missing row: " + r);
          daughterRegions.remove(r);
        }
      }
      LOG.debug("Done reading. " + daughterRegions.size() + " regions left.");
    }
    LinkedList<Pair<byte[], byte[]>> ret = Lists.newLinkedList();
    for (Pair<String, String> r : daughterRegions) {
      ret.add(Pair.newPair(splitAlgo.strToRow(r.getFirst()), splitAlgo
          .strToRow(r.getSecond())));
    }
    return ret;
  }
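
  // Illustrative _balancedSplit contents (hedged; the layout follows the
  // writeChars() calls above, one "<op> <start> <split>" record per line,
  // joined with the algorithm's separator):
  //   + 00000000 40000000   (split still pending)
  //   - 40000000 60000000   (split already marked done by rollingSplit)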
  /**
   * HexStringSplit is a well-known {@link SplitAlgorithm} for choosing region
   * boundaries. The format of a HexStringSplit region boundary is the ASCII
   * representation of an MD5 checksum, or any other uniformly distributed
   * hexadecimal value. Rows are hex-encoded long values in the range
   * <b>"00000000" =&gt; "FFFFFFFF"</b> and are left-padded with zeros to keep the
   * same order lexicographically as if they were binary.
   * <p>
   * Since this split algorithm uses hex strings as keys, it is easy to read &amp;
   * write in the shell but takes up more space and may be non-intuitive.
   */
  public static class HexStringSplit extends NumberStringSplit {
    final static String DEFAULT_MIN_HEX = "00000000";
    final static String DEFAULT_MAX_HEX = "FFFFFFFF";
    final static int RADIX_HEX = 16;

    public HexStringSplit() {
      super(DEFAULT_MIN_HEX, DEFAULT_MAX_HEX, RADIX_HEX);
    }
  }
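
  // Worked example (hedged arithmetic): new HexStringSplit().split(4) divides
  // the 2^32-value range ["00000000", "FFFFFFFF"] into quarters of 0x40000000,
  // giving boundary keys "40000000", "80000000", and "c0000000" (lowercase,
  // since BigInteger.toString(16) emits lowercase digits).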
  /**
   * The format of a DecimalStringSplit region boundary is the ASCII representation of
   * a reversed sequential number, or any other uniformly distributed decimal value.
   * Rows are decimal-encoded long values in the range
   * <b>"00000000" =&gt; "99999999"</b> and are left-padded with zeros to keep the
   * same order lexicographically as if they were binary.
   */
  public static class DecimalStringSplit extends NumberStringSplit {
    final static String DEFAULT_MIN_DEC = "00000000";
    final static String DEFAULT_MAX_DEC = "99999999";
    final static int RADIX_DEC = 10;

    public DecimalStringSplit() {
      super(DEFAULT_MIN_DEC, DEFAULT_MAX_DEC, RADIX_DEC);
    }
  }
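
  // Worked example (hedged arithmetic): new DecimalStringSplit().split(4)
  // divides the 10^8-value range ["00000000", "99999999"] into steps of
  // 25000000, giving boundary keys "25000000", "50000000", and "75000000".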
  public abstract static class NumberStringSplit implements SplitAlgorithm {

    String firstRow;
    BigInteger firstRowInt;
    String lastRow;
    BigInteger lastRowInt;
    int rowComparisonLength;
    int radix;

    NumberStringSplit(String minRow, String maxRow, int radix) {
      this.firstRow = minRow;
      this.lastRow = maxRow;
      this.radix = radix;
      this.firstRowInt = BigInteger.ZERO;
      this.lastRowInt = new BigInteger(lastRow, this.radix);
      this.rowComparisonLength = lastRow.length();
    }
    @Override
    public byte[] split(byte[] start, byte[] end) {
      BigInteger s = convertToBigInteger(start);
      BigInteger e = convertToBigInteger(end);
      Preconditions.checkArgument(!e.equals(BigInteger.ZERO));
      return convertToByte(split2(s, e));
    }
    @Override
    public byte[][] split(int n) {
      Preconditions.checkArgument(lastRowInt.compareTo(firstRowInt) > 0,
          "last row (%s) is configured less than first row (%s)", lastRow,
          firstRow);
      // +1 to range because the last row is inclusive
      BigInteger range = lastRowInt.subtract(firstRowInt).add(BigInteger.ONE);
      Preconditions.checkState(range.compareTo(BigInteger.valueOf(n)) >= 0,
          "split granularity (%s) is greater than the range (%s)", n, range);

      BigInteger[] splits = new BigInteger[n - 1];
      BigInteger sizeOfEachSplit = range.divide(BigInteger.valueOf(n));
      for (int i = 1; i < n; i++) {
        // NOTE: this means the last region gets all the slop.
        // This is not a big deal if we're assuming n << MAXHEX
        splits[i - 1] = firstRowInt.add(sizeOfEachSplit.multiply(BigInteger
            .valueOf(i)));
      }
      return convertToBytes(splits);
    }
    @Override
    public byte[][] split(byte[] start, byte[] end, int numSplits, boolean inclusive) {
      BigInteger s = convertToBigInteger(start);
      BigInteger e = convertToBigInteger(end);

      Preconditions.checkArgument(e.compareTo(s) > 0,
          "last row (%s) is configured less than first row (%s)", rowToStr(end),
          end);
      // +1 to range because the last row is inclusive
      BigInteger range = e.subtract(s).add(BigInteger.ONE);
      Preconditions.checkState(range.compareTo(BigInteger.valueOf(numSplits)) >= 0,
          "split granularity (%s) is greater than the range (%s)", numSplits, range);

      BigInteger[] splits = new BigInteger[numSplits - 1];
      BigInteger sizeOfEachSplit = range.divide(BigInteger.valueOf(numSplits));
      for (int i = 1; i < numSplits; i++) {
        // NOTE: this means the last region gets all the slop.
        // This is not a big deal if we're assuming n << MAXHEX
        splits[i - 1] = s.add(sizeOfEachSplit.multiply(BigInteger
            .valueOf(i)));
      }

      if (inclusive) {
        BigInteger[] inclusiveSplitPoints = new BigInteger[numSplits + 1];
        inclusiveSplitPoints[0] = convertToBigInteger(start);
        inclusiveSplitPoints[numSplits] = convertToBigInteger(end);
        System.arraycopy(splits, 0, inclusiveSplitPoints, 1, splits.length);
        return convertToBytes(inclusiveSplitPoints);
      }

      return convertToBytes(splits);
    }
    @Override
    public byte[] firstRow() {
      return convertToByte(firstRowInt);
    }

    @Override
    public byte[] lastRow() {
      return convertToByte(lastRowInt);
    }
    @Override
    public void setFirstRow(String userInput) {
      firstRow = userInput;
      firstRowInt = new BigInteger(firstRow, radix);
    }

    @Override
    public void setLastRow(String userInput) {
      lastRow = userInput;
      lastRowInt = new BigInteger(lastRow, radix);
      // Precondition: lastRow > firstRow, so last's length is the greater
      rowComparisonLength = lastRow.length();
    }
    @Override
    public byte[] strToRow(String in) {
      return convertToByte(new BigInteger(in, radix));
    }

    @Override
    public String rowToStr(byte[] row) {
      return Bytes.toStringBinary(row);
    }

    @Override
    public String separator() {
      return " ";
    }

    @Override
    public void setFirstRow(byte[] userInput) {
      firstRow = Bytes.toString(userInput);
    }

    @Override
    public void setLastRow(byte[] userInput) {
      lastRow = Bytes.toString(userInput);
    }
    /**
     * Divide 2 numbers in half (for split algorithm)
     * @param a number #1
     * @param b number #2
     * @return the midpoint of the 2 numbers
     */
    public BigInteger split2(BigInteger a, BigInteger b) {
      return a.add(b).divide(BigInteger.valueOf(2)).abs();
    }
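
    // Example (hedged): split2 is the truncated midpoint (a + b) / 2, so
    // split2(BigInteger.ZERO, BigInteger.valueOf(32)) yields 16 and
    // split2(BigInteger.ZERO, BigInteger.valueOf(15)) yields 7.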
    /**
     * Returns an array of bytes corresponding to an array of BigIntegers
     * @param bigIntegers numbers to convert
     * @return bytes corresponding to the bigIntegers
     */
    public byte[][] convertToBytes(BigInteger[] bigIntegers) {
      byte[][] returnBytes = new byte[bigIntegers.length][];
      for (int i = 0; i < bigIntegers.length; i++) {
        returnBytes[i] = convertToByte(bigIntegers[i]);
      }
      return returnBytes;
    }
    /**
     * Returns the bytes corresponding to the BigInteger
     * @param bigInteger number to convert
     * @param pad padding length
     * @return byte corresponding to input BigInteger
     */
    public byte[] convertToByte(BigInteger bigInteger, int pad) {
      String bigIntegerString = bigInteger.toString(radix);
      bigIntegerString = StringUtils.leftPad(bigIntegerString, pad, '0');
      return Bytes.toBytes(bigIntegerString);
    }
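
    // For instance (hedged): with radix 16 and pad 8, BigInteger.valueOf(255)
    // renders as "ff" and is left-padded to "000000ff", which preserves
    // lexicographic order across equal-width keys.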
    /**
     * Returns the bytes corresponding to the BigInteger
     * @param bigInteger number to convert
     * @return corresponding bytes
     */
    public byte[] convertToByte(BigInteger bigInteger) {
      return convertToByte(bigInteger, rowComparisonLength);
    }
    /**
     * Returns the BigInteger represented by the byte array
     * @param row byte array representing row
     * @return the corresponding BigInteger
     */
    public BigInteger convertToBigInteger(byte[] row) {
      return (row.length > 0) ? new BigInteger(Bytes.toString(row), radix)
          : BigInteger.ZERO;
    }

    @Override
    public String toString() {
      return this.getClass().getSimpleName() + " [" + rowToStr(firstRow())
          + "," + rowToStr(lastRow()) + "]";
    }
  }
  /**
   * A SplitAlgorithm that divides the space of possible keys evenly. Useful
   * when the keys are approximately uniform random bytes (e.g. hashes). Rows
   * are raw byte values in the range <b>00 =&gt; FF</b> and are right-padded with
   * zeros to keep the same memcmp() order. This is the natural algorithm to use
   * for a byte[] environment and saves space, but is not necessarily the
   * easiest for readability.
   */
  public static class UniformSplit implements SplitAlgorithm {
    static final byte xFF = (byte) 0xFF;
    byte[] firstRowBytes = ArrayUtils.EMPTY_BYTE_ARRAY;
    byte[] lastRowBytes =
        new byte[] {xFF, xFF, xFF, xFF, xFF, xFF, xFF, xFF};
    @Override
    public byte[] split(byte[] start, byte[] end) {
      return Bytes.split(start, end, 1)[1];
    }
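
    // Hedged reading: Bytes.split(start, end, 1) returns {start, midpoint, end},
    // so index [1] selects the single midpoint key between the two endpoints.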
    @Override
    public byte[][] split(int numRegions) {
      Preconditions.checkArgument(
          Bytes.compareTo(lastRowBytes, firstRowBytes) > 0,
          "last row (%s) is configured less than first row (%s)",
          Bytes.toStringBinary(lastRowBytes),
          Bytes.toStringBinary(firstRowBytes));

      byte[][] splits = Bytes.split(firstRowBytes, lastRowBytes, true,
          numRegions - 1);
      Preconditions.checkState(splits != null,
          "Could not split region with given user input: " + this);

      // remove endpoints, which are included in the splits list
      return splits == null ? null : Arrays.copyOfRange(splits, 1, splits.length - 1);
    }
    @Override
    public byte[][] split(byte[] start, byte[] end, int numSplits, boolean inclusive) {
      if (Arrays.equals(start, HConstants.EMPTY_BYTE_ARRAY)) {
        start = firstRowBytes;
      }
      if (Arrays.equals(end, HConstants.EMPTY_BYTE_ARRAY)) {
        end = lastRowBytes;
      }
      Preconditions.checkArgument(
          Bytes.compareTo(end, start) > 0,
          "last row (%s) is configured less than first row (%s)",
          Bytes.toStringBinary(end),
          Bytes.toStringBinary(start));

      byte[][] splits = Bytes.split(start, end, true,
          numSplits - 1);
      Preconditions.checkState(splits != null,
          "Could not calculate input splits with given user input: " + this);
      if (inclusive) {
        return splits;
      }
      // remove endpoints, which are included in the splits list
      return Arrays.copyOfRange(splits, 1, splits.length - 1);
    }
    @Override
    public byte[] firstRow() {
      return firstRowBytes;
    }

    @Override
    public byte[] lastRow() {
      return lastRowBytes;
    }

    @Override
    public void setFirstRow(String userInput) {
      firstRowBytes = Bytes.toBytesBinary(userInput);
    }

    @Override
    public void setLastRow(String userInput) {
      lastRowBytes = Bytes.toBytesBinary(userInput);
    }

    @Override
    public void setFirstRow(byte[] userInput) {
      firstRowBytes = userInput;
    }

    @Override
    public void setLastRow(byte[] userInput) {
      lastRowBytes = userInput;
    }

    @Override
    public byte[] strToRow(String input) {
      return Bytes.toBytesBinary(input);
    }

    @Override
    public String rowToStr(byte[] row) {
      return Bytes.toStringBinary(row);
    }

    @Override
    public String separator() {
      return ",";
    }

    @Override
    public String toString() {
      return this.getClass().getSimpleName() + " [" + rowToStr(firstRow())
          + "," + rowToStr(lastRow()) + "]";
    }
  }
}