/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.tool;

import static java.lang.String.format;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSHDFSUtils;
import org.apache.hadoop.hbase.util.FSVisitor;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Tool to load the output of HFileOutputFormat into an existing table.
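 * <p>
 * A minimal usage sketch (illustrative; the output path and table name are hypothetical):
 *
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
 * Map<LoadQueueItem, ByteBuffer> loaded =
 *     loader.run("/tmp/hfileoutputformat-output", TableName.valueOf("mytable"));
 * }</pre>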
 * @deprecated since 2.2.0, will be removed in 3.0.0. Use {@link BulkLoadHFiles} instead. Please
 *             rewrite your code if you rely on methods other than the {@link #run(Map, TableName)}
 *             and {@link #run(String, TableName)}, as all the methods other than them will be
 *             removed with no replacement.
 */
@Deprecated
@InterfaceAudience.Public
public class LoadIncrementalHFiles extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(LoadIncrementalHFiles.class);

  /**
   * @deprecated since 2.2.0, will be removed in 3.0.0, with no replacement. End user should not
   *             depend on this value.
   */
  @Deprecated
  public static final String NAME = BulkLoadHFilesTool.NAME;
  static final String RETRY_ON_IO_EXCEPTION = BulkLoadHFiles.RETRY_ON_IO_EXCEPTION;
  public static final String MAX_FILES_PER_REGION_PER_FAMILY =
      BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY;
  private static final String ASSIGN_SEQ_IDS = BulkLoadHFiles.ASSIGN_SEQ_IDS;
  public final static String CREATE_TABLE_CONF_KEY = BulkLoadHFiles.CREATE_TABLE_CONF_KEY;
  public final static String IGNORE_UNMATCHED_CF_CONF_KEY =
      BulkLoadHFiles.IGNORE_UNMATCHED_CF_CONF_KEY;
  public final static String ALWAYS_COPY_FILES = BulkLoadHFiles.ALWAYS_COPY_FILES;

  // We use a '.' prefix which is ignored when walking directory trees
  // above. It is an invalid family name.
  static final String TMP_DIR = ".tmp";

  private final int maxFilesPerRegionPerFamily;
  private final boolean assignSeqIds;

  // Source delegation token
  private final FsDelegationToken fsDelegationToken;
  private final UserProvider userProvider;
  private final int nrThreads;
  private AtomicInteger numRetries;
  private final RpcControllerFactory rpcControllerFactory;

  private String bulkToken;

  /**
   * Represents an HFile waiting to be loaded. A queue is used in this class in order to support
   * the case where a region has split during the process of the load. When this happens, the HFile
   * is split into two physical parts across the new region boundary, and each part is added back
   * into the queue. The import process finishes when the queue is empty.
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use {@link BulkLoadHFiles} instead.
   * @see BulkLoadHFiles
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21782">HBASE-21782</a>
   */
  @InterfaceAudience.Public
  @Deprecated
  public static class LoadQueueItem extends BulkLoadHFiles.LoadQueueItem {

    public LoadQueueItem(byte[] family, Path hfilePath) {
      super(family, hfilePath);
    }
  }

  public LoadIncrementalHFiles(Configuration conf) {
    // make a copy, just to be sure we're not overriding someone else's config
    super(HBaseConfiguration.create(conf));
    conf = getConf();
    // disable blockcache for tool invocation, see HBASE-10500
    conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
    userProvider = UserProvider.instantiate(conf);
    fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
    assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
    maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
    nrThreads = conf.getInt("hbase.loadincremental.threads.max",
        Runtime.getRuntime().availableProcessors());
    numRetries = new AtomicInteger(0);
    rpcControllerFactory = new RpcControllerFactory(conf);
  }
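
  /*
   * A configuration sketch for the knobs read in the constructor above (values are illustrative,
   * not recommendations):
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, 64);
   *   conf.setInt("hbase.loadincremental.threads.max", 8);
   *   LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
   */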

  private void usage() {
    System.err.println("usage: " + "bin/hbase completebulkload <-Dargs> "
        + "</path/to/hfileoutputformat-output> <tablename>\n"
        + "\t-D" + CREATE_TABLE_CONF_KEY + "=no can be used to avoid creation "
        + "of a table by this tool.\n"
        + "\t Note: if you set this to 'no', then target table must already exist.\n"
        + "\t-D" + IGNORE_UNMATCHED_CF_CONF_KEY + "=yes can be used to ignore "
        + "unmatched column families.\n"
        + "\t-loadTable switch implies your baseDirectory to store file has a "
        + "depth of 3, table must exist\n"
        + "\t and -loadTable switch is the last option on the command line.\n\n");
  }

  /**
   * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
   * passed directory and validates whether the prepared queue has all the valid table column
   * families in it.
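   * <p>
   * A usage sketch (illustrative; assumes a {@code LoadIncrementalHFiles} instance {@code loader}
   * and an open {@code Table}; the directory is hypothetical):
   *
   * <pre>{@code
   * Deque<LoadQueueItem> queue = new ArrayDeque<>();
   * loader.prepareHFileQueue(new Path("/tmp/hfileoutputformat-output"), table, queue, true);
   * }</pre>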
   * @param hfilesDir directory containing list of hfiles to be loaded into the table
   * @param table table to which hfiles should be loaded
   * @param queue queue which needs to be loaded into the table
   * @param validateHFile if true hfiles will be validated for their format
   * @throws IOException If any I/O or network error occurred
   */
  public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
      boolean validateHFile) throws IOException {
    prepareHFileQueue(hfilesDir, table, queue, validateHFile, false);
  }

  /**
   * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
   * passed directory and validates whether the prepared queue has all the valid table column
   * families in it.
   * @param hfilesDir directory containing list of hfiles to be loaded into the table
   * @param table table to which hfiles should be loaded
   * @param queue queue which needs to be loaded into the table
   * @param validateHFile if true hfiles will be validated for their format
   * @param silence true to ignore unmatched column families
   * @throws IOException If any I/O or network error occurred
   */
  public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
      boolean validateHFile, boolean silence) throws IOException {
    discoverLoadQueue(queue, hfilesDir, validateHFile);
    validateFamiliesInHFiles(table, queue, silence);
  }

  /**
   * Prepare a collection of {@link LoadQueueItem} from the given map of family to hfiles and
   * validates whether the prepared queue has all the valid table column families in it.
   * @param map map of family to List of hfiles
   * @param table table to which hfiles should be loaded
   * @param queue queue which needs to be loaded into the table
   * @param silence true to ignore unmatched column families
   * @throws IOException If any I/O or network error occurred
   */
  public void prepareHFileQueue(Map<byte[], List<Path>> map, Table table,
      Deque<LoadQueueItem> queue, boolean silence) throws IOException {
    populateLoadQueue(queue, map);
    validateFamiliesInHFiles(table, queue, silence);
  }

  /**
   * Perform a bulk load of the given directory into the given pre-existing table. This method is
   * not threadsafe.
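   * <p>
   * A usage sketch (illustrative; resource handling simplified; assumes a
   * {@code LoadIncrementalHFiles} instance {@code loader}):
   *
   * <pre>{@code
   * try (Connection conn = ConnectionFactory.createConnection(conf);
   *     Admin admin = conn.getAdmin();
   *     Table table = conn.getTable(tableName);
   *     RegionLocator locator = conn.getRegionLocator(tableName)) {
   *   loader.doBulkLoad(new Path("/tmp/hfileoutputformat-output"), admin, table, locator);
   * }
   * }</pre>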
   * @param hfofDir the directory that was provided as the output path of a job using
   *          HFileOutputFormat
   * @param admin the Admin
   * @param table the table to load into
   * @param regionLocator region locator
   * @throws TableNotFoundException if table does not yet exist
   */
  public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, final Admin admin, Table table,
      RegionLocator regionLocator) throws TableNotFoundException, IOException {
    return doBulkLoad(hfofDir, admin, table, regionLocator, false, false);
  }

  /**
   * Perform a bulk load of the given map of families to hfiles into the given pre-existing table.
   * This method is not threadsafe.
   * @param map map of family to List of hfiles
   * @param admin the Admin
   * @param table the table to load into
   * @param regionLocator region locator
   * @param silence true to ignore unmatched column families
   * @param copyFile always copy hfiles if true
   * @throws TableNotFoundException if table does not yet exist
   */
  public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Map<byte[], List<Path>> map, final Admin admin,
      Table table, RegionLocator regionLocator, boolean silence, boolean copyFile)
      throws TableNotFoundException, IOException {
    if (!admin.isTableAvailable(regionLocator.getName())) {
      throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
    }
    // LQI queue does not need to be threadsafe -- all operations on this queue
    // happen in this thread
    Deque<LoadQueueItem> queue = new ArrayDeque<>();
    ExecutorService pool = null;
    SecureBulkLoadClient secureClient = null;
    try {
      prepareHFileQueue(map, table, queue, silence);
      if (queue.isEmpty()) {
        LOG.warn("Bulk load operation did not get any files to load");
        return Collections.emptyMap();
      }
      pool = createExecutorService();
      secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
      return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
    } finally {
      cleanup(admin, queue, pool, secureClient);
    }
  }

  /**
   * Perform a bulk load of the given directory into the given pre-existing table. This method is
   * not threadsafe.
   * @param hfofDir the directory that was provided as the output path of a job using
   *          HFileOutputFormat
   * @param admin the Admin
   * @param table the table to load into
   * @param regionLocator region locator
   * @param silence true to ignore unmatched column families
   * @param copyFile always copy hfiles if true
   * @throws TableNotFoundException if table does not yet exist
   */
  public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, final Admin admin, Table table,
      RegionLocator regionLocator, boolean silence, boolean copyFile)
      throws TableNotFoundException, IOException {
    if (!admin.isTableAvailable(regionLocator.getName())) {
      throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
    }

    /*
     * Checking hfile format is a time-consuming operation, we should have an option to skip this
     * step when bulkloading millions of HFiles. See HBASE-13985.
     */
    boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
    if (!validateHFile) {
      LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " +
          "are not correct. If you fail to read data from your table after using this " +
          "option, consider removing the files and bulkload again without this option. " +
          "See HBASE-13985");
    }
    // LQI queue does not need to be threadsafe -- all operations on this queue
    // happen in this thread
    Deque<LoadQueueItem> queue = new ArrayDeque<>();
    ExecutorService pool = null;
    SecureBulkLoadClient secureClient = null;
    try {
      prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);

      if (queue.isEmpty()) {
        LOG.warn(
          "Bulk load operation did not find any files to load in directory {}. " +
          "Does it contain files in subdirectories that correspond to column family names?",
          (hfofDir != null ? hfofDir.toUri().toString() : ""));
        return Collections.emptyMap();
      }
      pool = createExecutorService();
      secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
      return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
    } finally {
      cleanup(admin, queue, pool, secureClient);
    }
  }

  /**
   * Used by the replication sink to load the hfiles from the source cluster. It does the
   * following:
   * <ol>
   * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)</li>
   * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
   * </li>
   * </ol>
   * @param table Table to which these hfiles should be loaded to
   * @param conn Connection to use
   * @param queue {@link LoadQueueItem} has hfiles yet to be loaded
   * @param startEndKeys starting and ending row keys of the region
   */
  public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> queue,
      Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    loadHFileQueue(table, conn, queue, startEndKeys, false);
  }

  /**
   * Used by the replication sink to load the hfiles from the source cluster. It does the
   * following:
   * <ol>
   * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)</li>
   * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
   * </li>
   * </ol>
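   * <p>
   * A usage sketch (illustrative; assumes a queue already prepared via
   * {@link #prepareHFileQueue(Path, Table, Deque, boolean)} and an open {@code Connection}):
   *
   * <pre>{@code
   * Pair<byte[][], byte[][]> startEndKeys = locator.getStartEndKeys();
   * loader.loadHFileQueue(table, conn, queue, startEndKeys, false);
   * }</pre>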
   * @param table Table to which these hfiles should be loaded to
   * @param conn Connection to use
   * @param queue {@link LoadQueueItem} has hfiles yet to be loaded
   * @param startEndKeys starting and ending row keys of the region
   * @param copyFile always copy hfiles if true
   */
  public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> queue,
      Pair<byte[][], byte[][]> startEndKeys, boolean copyFile) throws IOException {
    ExecutorService pool = null;
    try {
      pool = createExecutorService();
      Multimap<ByteBuffer, LoadQueueItem> regionGroups =
          groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst();
      bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile, null);
    } finally {
      if (pool != null) {
        pool.shutdown();
      }
    }
  }

  private Map<LoadQueueItem, ByteBuffer> performBulkLoad(Admin admin, Table table,
      RegionLocator regionLocator, Deque<LoadQueueItem> queue, ExecutorService pool,
      SecureBulkLoadClient secureClient, boolean copyFile) throws IOException {
    int count = 0;

    fsDelegationToken.acquireDelegationToken(queue.peek().getFilePath().getFileSystem(getConf()));
    bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null;

    Map<LoadQueueItem, ByteBuffer> item2RegionMap = new HashMap<>();
    // Assumes that region splits can happen while this occurs.
    while (!queue.isEmpty()) {
      // need to reload split keys each iteration.
      final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
      if (count != 0) {
        LOG.info("Split occurred while grouping HFiles, retry attempt " + count + " with " +
            queue.size() + " files remaining to group or split");
      }

      int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
      maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
      if (maxRetries != 0 && count >= maxRetries) {
        throw new IOException(
            "Retry attempted " + count + " times without completing, bailing out");
      }
      count++;

      // Using ByteBuffer for byte[] equality semantics
      pair = groupOrSplitPhase(table, pool, queue, startEndKeys);
      Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst();

      if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
        // Error is logged inside checkHFilesCountPerRegionPerFamily.
        throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily +
            " hfiles to one family of one region");
      }

      bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile,
          item2RegionMap);

      // NOTE: The next iteration's split / group could happen in parallel to
      // atomic bulkloads assuming that there are splits and no merges, and
      // that we can atomically pull out the groups we want to retry.
    }

    if (!queue.isEmpty()) {
      throw new RuntimeException("Bulk load aborted with some files not yet loaded. " +
          "Please check log for more details.");
    }
    return item2RegionMap;
  }

  /**
   * This takes the LQI's grouped by likely regions and attempts to bulk load them. Any failures
   * are re-queued for another pass with the groupOrSplitPhase.
   * <p>
   * protected for testing.
   */
  @VisibleForTesting
  protected void bulkLoadPhase(Table table, Connection conn, ExecutorService pool,
      Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
      boolean copyFile, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
    // atomically bulk load the groups.
    Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<>();
    for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e : regionGroups.asMap()
        .entrySet()) {
      byte[] first = e.getKey().array();
      Collection<LoadQueueItem> lqis = e.getValue();

      ClientServiceCallable<byte[]> serviceCallable =
          buildClientServiceCallable(conn, table.getName(), first, lqis, copyFile);

      Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
        @Override
        public List<LoadQueueItem> call() throws Exception {
          List<LoadQueueItem> toRetry =
              tryAtomicRegionLoad(serviceCallable, table.getName(), first, lqis);
          return toRetry;
        }
      };
      if (item2RegionMap != null) {
        for (LoadQueueItem lqi : lqis) {
          item2RegionMap.put(lqi, e.getKey());
        }
      }
      loadingFutures.add(pool.submit(call));
    }

    // get all the results.
    for (Future<List<LoadQueueItem>> future : loadingFutures) {
      try {
        List<LoadQueueItem> toRetry = future.get();

        if (item2RegionMap != null) {
          for (LoadQueueItem lqi : toRetry) {
            item2RegionMap.remove(lqi);
          }
        }
        // LQIs that are requeued to be regrouped.
        queue.addAll(toRetry);

      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          // At this point something unrecoverable has happened.
          // TODO Implement bulk load recovery
          throw new IOException("BulkLoad encountered an unrecoverable problem", t);
        }
        LOG.error("Unexpected execution exception during bulk load", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during bulk load", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
  }

  @VisibleForTesting
  protected ClientServiceCallable<byte[]> buildClientServiceCallable(Connection conn,
      TableName tableName, byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) {
    List<Pair<byte[], String>> famPaths =
        lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString()))
            .collect(Collectors.toList());
    return new ClientServiceCallable<byte[]>(conn, tableName, first,
        rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
      @Override
      protected byte[] rpcCall() throws Exception {
        SecureBulkLoadClient secureClient = null;
        boolean success = false;
        try {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Going to connect to server " + getLocation() + " for row " +
                Bytes.toStringBinary(getRow()) + " with hfile group " +
                LoadIncrementalHFiles.this.toString(famPaths));
          }
          byte[] regionName = getLocation().getRegion().getRegionName();
          try (Table table = conn.getTable(getTableName())) {
            secureClient = new SecureBulkLoadClient(getConf(), table);
            success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
                assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile);
          }
          return success ? regionName : null;
        } finally {
          // Best effort copying of files that might not have been imported
          // from the staging directory back to original location
          // in user directory
          if (secureClient != null && !success) {
            FileSystem targetFs = FileSystem.get(getConf());
            FileSystem sourceFs = lqis.iterator().next().getFilePath().getFileSystem(getConf());
            // Check to see if the source and target filesystems are the same
            // If they are the same filesystem, we will try move the files back
            // because previously we moved them to the staging directory.
            if (FSHDFSUtils.isSameHdfs(getConf(), sourceFs, targetFs)) {
              for (Pair<byte[], String> el : famPaths) {
                Path hfileStagingPath = null;
                Path hfileOrigPath = new Path(el.getSecond());
                try {
                  hfileStagingPath = new Path(new Path(bulkToken, Bytes.toString(el.getFirst())),
                      hfileOrigPath.getName());
                  if (targetFs.rename(hfileStagingPath, hfileOrigPath)) {
                    LOG.debug("Moved back file " + hfileOrigPath + " from " + hfileStagingPath);
                  } else if (targetFs.exists(hfileStagingPath)) {
                    LOG.debug(
                        "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath);
                  }
                } catch (Exception ex) {
                  LOG.debug(
                      "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath,
                      ex);
                }
              }
            }
          }
        }
      }
    };
  }

  private boolean checkHFilesCountPerRegionPerFamily(
      final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
    for (Map.Entry<ByteBuffer, Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) {
      Map<byte[], MutableInt> filesMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (LoadQueueItem lqi : e.getValue()) {
        MutableInt count = filesMap.computeIfAbsent(lqi.getFamily(), k -> new MutableInt());
        count.increment();
        if (count.intValue() > maxFilesPerRegionPerFamily) {
          LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily +
              " hfiles to family " + Bytes.toStringBinary(lqi.getFamily()) +
              " of region with start key " + Bytes.toStringBinary(e.getKey()));
          return false;
        }
      }
    }
    return true;
  }

  /**
   * @param table the table to load into
   * @param pool the ExecutorService
   * @param queue the queue for LoadQueueItem
   * @param startEndKeys start and end keys
   * @return A map that groups LQI by likely bulk load region targets and Set of missing hfiles.
   */
  private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase(
      final Table table, ExecutorService pool, Deque<LoadQueueItem> queue,
      final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    // <region start key, LQI> need synchronized only within this scope of this
    // phase because of the puts that happen in futures.
    Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
    final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
    Set<String> missingHFiles = new HashSet<>();
    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair =
        new Pair<>(regionGroups, missingHFiles);

    // drain LQIs and figure out bulk load groups
    Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
    while (!queue.isEmpty()) {
      final LoadQueueItem item = queue.remove();

      final Callable<Pair<List<LoadQueueItem>, String>> call =
          new Callable<Pair<List<LoadQueueItem>, String>>() {
            @Override
            public Pair<List<LoadQueueItem>, String> call() throws Exception {
              Pair<List<LoadQueueItem>, String> splits =
                  groupOrSplit(regionGroups, item, table, startEndKeys);
              return splits;
            }
          };
      splittingFutures.add(pool.submit(call));
    }
    // get all the results. All grouping and splitting must finish before
    // we can attempt the atomic loads.
    for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) {
      try {
        Pair<List<LoadQueueItem>, String> splits = lqis.get();
        if (splits != null) {
          if (splits.getFirst() != null) {
            queue.addAll(splits.getFirst());
          } else {
            missingHFiles.add(splits.getSecond());
          }
        }
      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          LOG.error("IOException during splitting", e1);
          throw (IOException) t; // would have been thrown if not parallelized
        }
        LOG.error("Unexpected execution exception during splitting", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during splitting", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
    return pair;
  }

  private List<LoadQueueItem> splitStoreFile(final LoadQueueItem item, final Table table,
      byte[] startKey, byte[] splitKey) throws IOException {
    Path hfilePath = item.getFilePath();
    byte[] family = item.getFamily();
    Path tmpDir = hfilePath.getParent();
    if (!tmpDir.getName().equals(TMP_DIR)) {
      tmpDir = new Path(tmpDir, TMP_DIR);
    }

    LOG.info("HFile at " + hfilePath + " no longer fits inside a single " + "region. Splitting...");

    String uniqueName = getUniqueName();
    ColumnFamilyDescriptor familyDesc = table.getDescriptor().getColumnFamily(family);

    Path botOut = new Path(tmpDir, uniqueName + ".bottom");
    Path topOut = new Path(tmpDir, uniqueName + ".top");
    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);

    FileSystem fs = tmpDir.getFileSystem(getConf());
    fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx"));

    // Add these back at the *front* of the queue, so there's a lower
    // chance that the region will just split again before we get there.
    List<LoadQueueItem> lqis = new ArrayList<>(2);
    lqis.add(new LoadQueueItem(family, botOut));
    lqis.add(new LoadQueueItem(family, topOut));

    // If the current item is already the result of previous splits,
    // we don't need it anymore. Clean up to save space.
    // It is not part of the original input files.
    try {
      if (tmpDir.getName().equals(TMP_DIR)) {
        fs.delete(hfilePath, false);
      }
    } catch (IOException e) {
      LOG.warn("Unable to delete temporary split file " + hfilePath);
    }
    LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
    return lqis;
  }

  /**
   * Attempt to assign the given load queue item into its target region group. If the hfile
   * boundary no longer fits into a region, physically splits the hfile such that the new bottom
   * half will fit and returns the list of LQI's corresponding to the resultant hfiles.
   * <p>
   * protected for testing
   * @throws IOException if an IO failure is encountered
   */
  @VisibleForTesting
  protected Pair<List<LoadQueueItem>, String> groupOrSplit(
      Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, final Table table,
      final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    Path hfilePath = item.getFilePath();
    Optional<byte[]> first, last;
    try (HFile.Reader hfr = HFile.createReader(hfilePath.getFileSystem(getConf()), hfilePath,
        CacheConfig.DISABLED, true, getConf())) {
      hfr.loadFileInfo();
      first = hfr.getFirstRowKey();
      last = hfr.getLastRowKey();
    } catch (FileNotFoundException fnfe) {
      LOG.debug("encountered", fnfe);
      return new Pair<>(null, hfilePath.getName());
    }

    LOG.info("Trying to load hfile=" + hfilePath + " first=" + first.map(Bytes::toStringBinary) +
        " last=" + last.map(Bytes::toStringBinary));
    if (!first.isPresent() || !last.isPresent()) {
      assert !first.isPresent() && !last.isPresent();
      // TODO what if this is due to a bad HFile?
      LOG.info("hfile " + hfilePath + " has no entries, skipping");
      return null;
    }
    if (Bytes.compareTo(first.get(), last.get()) > 0) {
      throw new IllegalArgumentException("Invalid range: " + Bytes.toStringBinary(first.get()) +
          " > " + Bytes.toStringBinary(last.get()));
    }
    int idx = Arrays.binarySearch(startEndKeys.getFirst(), first.get(), Bytes.BYTES_COMPARATOR);
    if (idx < 0) {
      // not on boundary, returns -(insertion index). Calculate region it
      // would be in.
      idx = -(idx + 1) - 1;
    }
    int indexForCallable = idx;

    /*
     * we can consider there is a region hole in following conditions. 1) if idx < 0, then first
     * region info is lost. 2) if the endkey of a region is not equal to the startkey of the next
     * region. 3) if the endkey of the last region is not empty.
     */
    if (indexForCallable < 0) {
      throw new IOException("The first region info for table " + table.getName() +
          " can't be found in hbase:meta. Please use hbck tool to fix it first.");
    } else if ((indexForCallable == startEndKeys.getFirst().length - 1) &&
        !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IOException("The last region info for table " + table.getName() +
          " can't be found in hbase:meta. Please use hbck tool to fix it first.");
    } else if (indexForCallable + 1 < startEndKeys.getFirst().length &&
        !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable],
          startEndKeys.getFirst()[indexForCallable + 1]) == 0)) {
      throw new IOException("The endkey of one region for table " + table.getName() +
          " is not equal to the startkey of the next region in hbase:meta. " +
          "Please use hbck tool to fix it first.");
    }

    boolean lastKeyInRange = Bytes.compareTo(last.get(), startEndKeys.getSecond()[idx]) < 0 ||
        Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
    if (!lastKeyInRange) {
      List<LoadQueueItem> lqis = splitStoreFile(item, table,
          startEndKeys.getFirst()[indexForCallable], startEndKeys.getSecond()[indexForCallable]);
      return new Pair<>(lqis, null);
    }

    // group regions.
    regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
    return null;
  }

  /**
   * Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list of
   * hfiles that need to be retried. If it is successful it will return an empty list.
   * <p>
   * NOTE: To maintain row atomicity guarantees, the region server callable should succeed
   * atomically and fail atomically.
   * <p>
   * Protected for testing.
   * @return empty list if success, list of items to retry on recoverable failure
   */
  @VisibleForTesting
  protected List<LoadQueueItem> tryAtomicRegionLoad(ClientServiceCallable<byte[]> serviceCallable,
      final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
      throws IOException {
    List<LoadQueueItem> toRetry = new ArrayList<>();
    try {
      Configuration conf = getConf();
      byte[] region = RpcRetryingCallerFactory.instantiate(conf, null).<byte[]> newCaller()
          .callWithRetries(serviceCallable, Integer.MAX_VALUE);
      if (region == null) {
        LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first) +
            " into table " + tableName + " with files " + lqis +
            " failed. This is recoverable and they will be retried.");
        toRetry.addAll(lqis); // return lqi's to retry
      }
      // success
      return toRetry;
    } catch (IOException e) {
      LOG.error("Encountered unrecoverable error from region server, additional details: " +
          serviceCallable.getExceptionMessageAdditionalDetail(), e);
      LOG.warn(
          "Received a " + e.getClass().getSimpleName()
              + " from region server: "
              + serviceCallable.getExceptionMessageAdditionalDetail(), e);
      if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false)
          && numRetries.get() < getConf().getInt(
            HConstants.HBASE_CLIENT_RETRIES_NUMBER,
            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
        LOG.warn("Will attempt to retry loading failed HFiles. Retry #"
            + numRetries.incrementAndGet());
        toRetry.addAll(lqis);
        return toRetry;
      }
      LOG.error(RETRY_ON_IO_EXCEPTION + " is disabled. Unable to recover");
      throw e;
    }
  }

  /**
   * If the table is created for the first time, then "completebulkload" reads the files twice.
   * More modifications necessary if we want to avoid doing it.
   */
  private void createTable(TableName tableName, Path hfofDir, Admin admin) throws IOException {
    final FileSystem fs = hfofDir.getFileSystem(getConf());

    // Add column families
    // Build a set of keys
    List<ColumnFamilyDescriptorBuilder> familyBuilders = new ArrayList<>();
    SortedMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<ColumnFamilyDescriptorBuilder>() {
      @Override
      public ColumnFamilyDescriptorBuilder bulkFamily(byte[] familyName) {
        ColumnFamilyDescriptorBuilder builder =
            ColumnFamilyDescriptorBuilder.newBuilder(familyName);
        familyBuilders.add(builder);
        return builder;
      }

      @Override
      public void bulkHFile(ColumnFamilyDescriptorBuilder builder, FileStatus hfileStatus)
          throws IOException {
        Path hfile = hfileStatus.getPath();
        try (HFile.Reader reader =
            HFile.createReader(fs, hfile, CacheConfig.DISABLED, true, getConf())) {
          if (builder.getCompressionType() != reader.getFileContext().getCompression()) {
            builder.setCompressionType(reader.getFileContext().getCompression());
            LOG.info("Setting compression " + reader.getFileContext().getCompression().name() +
                " for family " + builder.getNameAsString());
          }
          reader.loadFileInfo();
          byte[] first = reader.getFirstRowKey().get();
          byte[] last = reader.getLastRowKey().get();

          LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first=" +
              Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));

          // To eventually infer start key-end key boundaries
          Integer value = map.containsKey(first) ? map.get(first) : 0;
          map.put(first, value + 1);

          value = map.containsKey(last) ? map.get(last) : 0;
          map.put(last, value - 1);
        }
      }
    });

    byte[][] keys = inferBoundaries(map);
    TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
    familyBuilders.stream().map(ColumnFamilyDescriptorBuilder::build)
        .forEachOrdered(tdBuilder::setColumnFamily);
    admin.createTable(tdBuilder.build(), keys);

    LOG.info("Table " + tableName + " is available!!");
  }

  private void cleanup(Admin admin, Deque<LoadQueueItem> queue, ExecutorService pool,
      SecureBulkLoadClient secureClient) throws IOException {
    fsDelegationToken.releaseDelegationToken();
    if (bulkToken != null && secureClient != null) {
      secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken);
    }
    if (pool != null) {
      pool.shutdown();
    }
    if (!queue.isEmpty()) {
      StringBuilder err = new StringBuilder();
      err.append("-------------------------------------------------\n");
      err.append("Bulk load aborted with some files not yet loaded:\n");
      err.append("-------------------------------------------------\n");
      for (LoadQueueItem q : queue) {
        err.append("  ").append(q.getFilePath()).append('\n');
      }
      LOG.error(err.toString());
    }
  }

  // unique file name for the table
  private String getUniqueName() {
    return UUID.randomUUID().toString().replaceAll("-", "");
  }

  /**
   * Checks whether there is any invalid family name in HFiles to be bulk loaded.
   */
  private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue, boolean silence)
      throws IOException {
    Set<String> familyNames = Arrays.asList(table.getDescriptor().getColumnFamilies()).stream()
        .map(f -> f.getNameAsString()).collect(Collectors.toSet());
    List<String> unmatchedFamilies = queue.stream().map(item -> Bytes.toString(item.getFamily()))
        .filter(fn -> !familyNames.contains(fn)).distinct().collect(Collectors.toList());
    if (unmatchedFamilies.size() > 0) {
      String msg =
          "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: " +
              unmatchedFamilies + "; valid family names of table " + table.getName() + " are: " +
              familyNames;
      LOG.error(msg);
      if (!silence) {
        throw new IOException(msg);
      }
    }
  }

  /**
   * Populate the Queue with given HFiles
   */
  private void populateLoadQueue(Deque<LoadQueueItem> ret, Map<byte[], List<Path>> map) {
    map.forEach((k, v) -> v.stream().map(p -> new LoadQueueItem(k, p)).forEachOrdered(ret::add));
  }

  /**
   * Walk the given directory for all HFiles, and return a Queue containing all such files.
   */
  private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
      final boolean validateHFile) throws IOException {
    visitBulkHFiles(hfofDir.getFileSystem(getConf()), hfofDir, new BulkHFileVisitor<byte[]>() {
      @Override
      public byte[] bulkFamily(final byte[] familyName) {
        return familyName;
      }

      @Override
      public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
        long length = hfile.getLen();
        if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
          HConstants.DEFAULT_MAX_FILE_SIZE)) {
          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " + length +
              " bytes can be problematic as it may lead to oversplitting.");
        }
        ret.add(new LoadQueueItem(family, hfile.getPath()));
      }
    }, validateHFile);
  }

  private interface BulkHFileVisitor<TFamily> {

    TFamily bulkFamily(byte[] familyName) throws IOException;

    void bulkHFile(TFamily family, FileStatus hfileStatus) throws IOException;
  }

  /**
   * Iterate over the bulkDir hfiles. Skip reference, HFileLink, files starting with "_" and
   * non-valid hfiles.
   */
  private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
      final BulkHFileVisitor<TFamily> visitor) throws IOException {
    visitBulkHFiles(fs, bulkDir, visitor, true);
  }

  /**
   * Iterate over the bulkDir hfiles. Skip reference, HFileLink, files starting with "_". Check and
   * skip non-valid hfiles by default, or skip this validation by setting
   * 'hbase.loadincremental.validate.hfile' to false.
   */
  private static <TFamily> void visitBulkHFiles(FileSystem fs, Path bulkDir,
      BulkHFileVisitor<TFamily> visitor, boolean validateHFile) throws IOException {
    FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
    for (FileStatus familyStat : familyDirStatuses) {
      if (!familyStat.isDirectory()) {
        LOG.warn("Skipping non-directory " + familyStat.getPath());
        continue;
      }
      Path familyDir = familyStat.getPath();
      byte[] familyName = Bytes.toBytes(familyDir.getName());
      // Skip invalid family
      try {
        ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(familyName);
      } catch (IllegalArgumentException e) {
        LOG.warn("Skipping invalid " + familyStat.getPath());
        continue;
      }
      TFamily family = visitor.bulkFamily(familyName);

      FileStatus[] hfileStatuses = fs.listStatus(familyDir);
      for (FileStatus hfileStatus : hfileStatuses) {
        if (!fs.isFile(hfileStatus.getPath())) {
          LOG.warn("Skipping non-file " + hfileStatus);
          continue;
        }

        Path hfile = hfileStatus.getPath();
        // Skip "_", reference, HFileLink
        String fileName = hfile.getName();
        if (fileName.startsWith("_")) {
          continue;
        }
        if (StoreFileInfo.isReference(fileName)) {
          LOG.warn("Skipping reference " + fileName);
          continue;
        }
        if (HFileLink.isHFileLink(fileName)) {
          LOG.warn("Skipping HFileLink " + fileName);
          continue;
        }

        // Validate HFile Format if needed
        if (validateHFile) {
          try {
            if (!HFile.isHFileFormat(fs, hfile)) {
              LOG.warn("The file " + hfile + " doesn't seem to be an hfile, skipping");
              continue;
            }
          } catch (FileNotFoundException e) {
            LOG.warn("The file " + hfile + " was removed");
            continue;
          }
        }

        visitor.bulkHFile(family, hfileStatus);
      }
    }
  }

  // Initialize a thread pool
  private ExecutorService createExecutorService() {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(),
        new ThreadFactoryBuilder().setNameFormat("LoadIncrementalHFiles-%1$d").build());
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }

  private final String toString(List<Pair<byte[], String>> list) {
    StringBuilder sb = new StringBuilder();
    sb.append('[');
    list.forEach(p -> {
      sb.append('{').append(Bytes.toStringBinary(p.getFirst())).append(',').append(p.getSecond())
          .append('}');
    });
    sb.append(']');
    return sb.toString();
  }

  /**
   * Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom
   * filters, etc.
   */
  @VisibleForTesting
  static void splitStoreFile(Configuration conf, Path inFile, ColumnFamilyDescriptor familyDesc,
      byte[] splitKey, Path bottomOut, Path topOut) throws IOException {
    // Open reader with no block cache, and not in-memory
    Reference topReference = Reference.createTopReference(splitKey);
    Reference bottomReference = Reference.createBottomReference(splitKey);

    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
  }

  /**
   * Copy half of an HFile into a new HFile.
   */
  private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
      Reference reference, ColumnFamilyDescriptor familyDescriptor) throws IOException {
    FileSystem fs = inFile.getFileSystem(conf);
    CacheConfig cacheConf = CacheConfig.DISABLED;
    HalfStoreFileReader halfReader = null;
    StoreFileWriter halfWriter = null;
    try {
      halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, true,
          new AtomicInteger(0), true, conf);
      Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();

      int blocksize = familyDescriptor.getBlocksize();
      Algorithm compression = familyDescriptor.getCompressionType();
      BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
      HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
          .withChecksumType(HStore.getChecksumType(conf))
          .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blocksize)
          .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
          .build();
      halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
          .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
      HFileScanner scanner = halfReader.getScanner(false, false, false);
      scanner.seekTo();
      do {
        halfWriter.append(scanner.getCell());
      } while (scanner.next());

      for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
        if (shouldCopyHFileMetaKey(entry.getKey())) {
          halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
        }
      }
    } finally {
      if (halfReader != null) {
        try {
          halfReader.close(cacheConf.shouldEvictOnClose());
        } catch (IOException e) {
          LOG.warn("failed to close hfile reader for " + inFile, e);
        }
      }
      if (halfWriter != null) {
        halfWriter.close();
      }
    }
  }

  private static boolean shouldCopyHFileMetaKey(byte[] key) {
    // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085
    if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) {
      return false;
    }

    return !HFile.isReservedFileInfoKey(key);
  }

  private boolean isCreateTable() {
    return "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"));
  }

  private boolean isSilence() {
    return "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
  }

  private boolean isAlwaysCopyFiles() {
    return getConf().getBoolean(ALWAYS_COPY_FILES, false);
  }

  protected final Map<LoadQueueItem, ByteBuffer> run(Path hfofDir, TableName tableName)
      throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(getConf());
        Admin admin = connection.getAdmin()) {
      if (!admin.tableExists(tableName)) {
        if (isCreateTable()) {
          createTable(tableName, hfofDir, admin);
        } else {
          String errorMsg = format("Table '%s' does not exist.", tableName);
          LOG.error(errorMsg);
          throw new TableNotFoundException(errorMsg);
        }
      }
      try (Table table = connection.getTable(tableName);
          RegionLocator locator = connection.getRegionLocator(tableName)) {
        return doBulkLoad(hfofDir, admin, table, locator, isSilence(),
            isAlwaysCopyFiles());
      }
    }
  }

  /**
   * Perform bulk load on the given table.
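   * <p>
   * A usage sketch (illustrative; the path and table name are hypothetical). Table creation can
   * be disabled through {@link #CREATE_TABLE_CONF_KEY}, in which case the table must already
   * exist:
   *
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * conf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no");
   * Map<LoadQueueItem, ByteBuffer> loaded = new LoadIncrementalHFiles(conf)
   *     .run("/tmp/hfileoutputformat-output", TableName.valueOf("t1"));
   * }</pre>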
   * @param hfofDir the directory that was provided as the output path of a job using
   *          HFileOutputFormat
   * @param tableName the table to load into
   */
  public Map<LoadQueueItem, ByteBuffer> run(String hfofDir, TableName tableName)
      throws IOException {
    return run(new Path(hfofDir), tableName);
  }

  /**
   * Perform bulk load on the given table.
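   * <p>
   * A usage sketch (illustrative; the family name and paths are hypothetical):
   *
   * <pre>{@code
   * Map<byte[], List<Path>> family2Files = new HashMap<>();
   * family2Files.put(Bytes.toBytes("cf"),
   *     Arrays.asList(new Path("/tmp/output/cf/hfile1"), new Path("/tmp/output/cf/hfile2")));
   * loader.run(family2Files, TableName.valueOf("t1"));
   * }</pre>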
   * @param family2Files map of family to List of hfiles
   * @param tableName the table to load into
   */
  public Map<LoadQueueItem, ByteBuffer> run(Map<byte[], List<Path>> family2Files,
      TableName tableName) throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(getConf());
        Admin admin = connection.getAdmin()) {
      if (!admin.tableExists(tableName)) {
        String errorMsg = format("Table '%s' does not exist.", tableName);
        LOG.error(errorMsg);
        throw new TableNotFoundException(errorMsg);
      }
      try (Table table = connection.getTable(tableName);
          RegionLocator locator = connection.getRegionLocator(tableName)) {
        return doBulkLoad(family2Files, admin, table, locator, isSilence(), isAlwaysCopyFiles());
      }
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2 && args.length != 3) {
      usage();
      return -1;
    }
    String dirPath = args[0];
    TableName tableName = TableName.valueOf(args[1]);

    if (args.length == 2) {
      return !run(dirPath, tableName).isEmpty() ? 0 : -1;
    } else {
      Map<byte[], List<Path>> family2Files = Maps.newHashMap();
      FileSystem fs = FileSystem.get(getConf());
      for (FileStatus regionDir : fs.listStatus(new Path(dirPath))) {
        FSVisitor.visitRegionStoreFiles(fs, regionDir.getPath(), (region, family, hfileName) -> {
          Path path = new Path(regionDir.getPath(), new Path(family, hfileName));
          byte[] familyName = Bytes.toBytes(family);
          if (family2Files.containsKey(familyName)) {
            family2Files.get(familyName).add(path);
          } else {
            family2Files.put(familyName, Lists.newArrayList(path));
          }
        });
      }
      return !run(family2Files, tableName).isEmpty() ? 0 : -1;
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(conf), args);
    System.exit(ret);
  }

  /**
   * Called from replication sink, where it manages bulkToken (staging directory) by itself. This
   * is used only when SecureBulkLoadEndpoint is configured in hbase.coprocessor.region.classes
   * property. This directory is used as a temporary directory where all files are initially
   * copied/moved from user given directory, set all the required file permissions and then from
   * there it is finally loaded into a table. This should be set only when one would like to
   * manage the staging directory by itself. Otherwise this tool will handle this by itself.
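   * <p>
   * An illustrative sketch (the staging directory variable is hypothetical and caller-managed):
   *
   * <pre>{@code
   * loader.setBulkToken(stagingDirPath);
   * loader.loadHFileQueue(table, conn, queue, locator.getStartEndKeys());
   * }</pre>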
   * @param stagingDir staging directory path
   */
  public void setBulkToken(String stagingDir) {
    this.bulkToken = stagingDir;
  }

  /**
   * Infers region boundaries for a new table.
   * <p>
   * Parameter: <br>
   * bdryMap is a map between keys to an integer belonging to {+1, -1}
   * <ul>
   * <li>If a key is a start key of a file, then it maps to +1</li>
   * <li>If a key is an end key of a file, then it maps to -1</li>
   * </ul>
   * <p>
   * Algo:<br>
   * <ol>
   * <li>Poll on the keys in order:
   * <ol type="a">
   * <li>Keep adding the mapped values to these keys (runningSum)</li>
   * <li>Each time runningSum reaches 0, add the start Key from when the runningSum had started to
   * a boundary list.</li>
   * </ol>
   * </li>
   * <li>Return the boundary list.</li>
   * </ol>
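   * <p>
   * A worked example (keys abbreviated, purely illustrative): two hfiles spanning [a, b] and
   * [c, d] produce the map {a=+1, b=-1, c=+1, d=-1}. The running sum returns to zero at b and at
   * d; the first such point is skipped, so the inferred boundary list is [c], i.e. a table with
   * two regions split at key c.
   */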
  public static byte[][] inferBoundaries(SortedMap<byte[], Integer> bdryMap) {
    List<byte[]> keysArray = new ArrayList<>();
    int runningValue = 0;
    byte[] currStartKey = null;
    boolean firstBoundary = true;

    for (Map.Entry<byte[], Integer> item : bdryMap.entrySet()) {
      if (runningValue == 0) {
        currStartKey = item.getKey();
      }
      runningValue += item.getValue();
      if (runningValue == 0) {
        if (!firstBoundary) {
          keysArray.add(currStartKey);
        }
        firstBoundary = false;
      }
    }

    return keysArray.toArray(new byte[0][]);
  }
}