/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.tool;

import static java.lang.String.format;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSHDFSUtils;
import org.apache.hadoop.hbase.util.FSVisitor;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
 * Tool to load the output of HFileOutputFormat into an existing table.
 * @deprecated since 2.2.0, will be removed in 3.0.0. Use {@link BulkLoadHFiles} instead. Please
 *             rewrite your code if you rely on methods other than the {@link #run(Map, TableName)}
 *             and {@link #run(String, TableName)}, as all the methods other than them will be
 *             removed with no replacement.
 */
@Deprecated
@InterfaceAudience.Public
public class LoadIncrementalHFiles extends Configured implements Tool {
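
  // A minimal programmatic-usage sketch of this tool; the output path and table
  // name below are placeholders, not values taken from this file:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
  //   loader.run("hdfs:///user/hbase/hfof-output", TableName.valueOf("mytable"));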

  private static final Logger LOG = LoggerFactory.getLogger(LoadIncrementalHFiles.class);

  /**
   * @deprecated since 2.2.0, will be removed in 3.0.0, with no replacement. End user should not
   *             depend on this value.
   */
  @Deprecated
  public static final String NAME = BulkLoadHFilesTool.NAME;
  static final String RETRY_ON_IO_EXCEPTION = BulkLoadHFiles.RETRY_ON_IO_EXCEPTION;
  public static final String MAX_FILES_PER_REGION_PER_FAMILY =
      BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY;
  private static final String ASSIGN_SEQ_IDS = BulkLoadHFiles.ASSIGN_SEQ_IDS;
  public final static String CREATE_TABLE_CONF_KEY = BulkLoadHFiles.CREATE_TABLE_CONF_KEY;
  public final static String IGNORE_UNMATCHED_CF_CONF_KEY =
      BulkLoadHFiles.IGNORE_UNMATCHED_CF_CONF_KEY;
  public final static String ALWAYS_COPY_FILES = BulkLoadHFiles.ALWAYS_COPY_FILES;

  // We use a '.' prefix which is ignored when walking directory trees
  // above. It is an invalid family name.
  static final String TMP_DIR = ".tmp";

  private final int maxFilesPerRegionPerFamily;
  private final boolean assignSeqIds;

  // Source delegation token
  private final FsDelegationToken fsDelegationToken;
  private final UserProvider userProvider;
  private final int nrThreads;
  private AtomicInteger numRetries;
  private final RpcControllerFactory rpcControllerFactory;

  private String bulkToken;

  /**
   * Represents an HFile waiting to be loaded. A queue is used in this class in order to support
   * the case where a region has split during the process of the load. When this happens, the HFile
   * is split into two physical parts across the new region boundary, and each part is added back
   * into the queue. The import process finishes when the queue is empty.
   * @deprecated since 2.2.0 and will be removed in 3.0.0. Use {@link BulkLoadHFiles} instead.
   * @see BulkLoadHFiles
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21782">HBASE-21782</a>
   */
  @InterfaceAudience.Public
  @Deprecated
  public static class LoadQueueItem extends BulkLoadHFiles.LoadQueueItem {

    public LoadQueueItem(byte[] family, Path hfilePath) {
      super(family, hfilePath);
    }
  }

  public LoadIncrementalHFiles(Configuration conf) {
    // make a copy, just to be sure we're not overriding someone else's config
    super(HBaseConfiguration.create(conf));
    // work against the copy held by Configured, not the caller's instance
    conf = getConf();
    // disable blockcache for tool invocation, see HBASE-10500
    conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
    userProvider = UserProvider.instantiate(conf);
    fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
    assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
    maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
    nrThreads = conf.getInt("hbase.loadincremental.threads.max",
        Runtime.getRuntime().availableProcessors());
    numRetries = new AtomicInteger(0);
    rpcControllerFactory = new RpcControllerFactory(conf);
  }

  private void usage() {
    System.err.println("usage: " + "bin/hbase completebulkload <-Dargs> "
        + "</path/to/hfileoutputformat-output> <tablename>\n"
        + "\t-D" + CREATE_TABLE_CONF_KEY + "=no can be used to avoid creation "
        + "of a table by this tool.\n"
        + "\t Note: if you set this to 'no', then target table must already exist.\n"
        + "\t-D" + IGNORE_UNMATCHED_CF_CONF_KEY + "=yes can be used to ignore "
        + "unmatched column families.\n"
        + "\t-loadTable switch implies your baseDirectory to store file has a "
        + "depth of 3, table must exist\n"
        + "\t and -loadTable switch is the last option on the command line.\n\n");
  }

  /**
   * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
   * passed directory and validates whether the prepared queue has all the valid table column
   * families in it.
   * @param hfilesDir directory containing list of hfiles to be loaded into the table
   * @param table table to which hfiles should be loaded
   * @param queue queue which needs to be loaded into the table
   * @param validateHFile if true hfiles will be validated for their format
   * @throws IOException If any I/O or network error occurred
   */
  public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
      boolean validateHFile) throws IOException {
    prepareHFileQueue(hfilesDir, table, queue, validateHFile, false);
  }

  /**
   * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
   * passed directory and validates whether the prepared queue has all the valid table column
   * families in it.
   * @param hfilesDir directory containing list of hfiles to be loaded into the table
   * @param table table to which hfiles should be loaded
   * @param queue queue which needs to be loaded into the table
   * @param validateHFile if true hfiles will be validated for their format
   * @param silence true to ignore unmatched column families
   * @throws IOException If any I/O or network error occurred
   */
  public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
      boolean validateHFile, boolean silence) throws IOException {
    discoverLoadQueue(queue, hfilesDir, validateHFile);
    validateFamiliesInHFiles(table, queue, silence);
  }

  /**
   * Prepare a collection of {@link LoadQueueItem} from the given map of family to hfile paths and
   * validates whether the prepared queue has all the valid table column families in it.
   * @param map map of family to List of hfiles
   * @param table table to which hfiles should be loaded
   * @param queue queue which needs to be loaded into the table
   * @param silence true to ignore unmatched column families
   * @throws IOException If any I/O or network error occurred
   */
  public void prepareHFileQueue(Map<byte[], List<Path>> map, Table table,
      Deque<LoadQueueItem> queue, boolean silence) throws IOException {
    populateLoadQueue(queue, map);
    validateFamiliesInHFiles(table, queue, silence);
  }

  /**
   * Perform a bulk load of the given directory into the given pre-existing table. This method is
   * not threadsafe.
   * @param hfofDir the directory that was provided as the output path of a job using
   *          HFileOutputFormat
   * @param admin the Admin
   * @param table the table to load into
   * @param regionLocator region locator
   * @throws TableNotFoundException if table does not yet exist
   */
  public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, final Admin admin, Table table,
      RegionLocator regionLocator) throws TableNotFoundException, IOException {
    return doBulkLoad(hfofDir, admin, table, regionLocator, false, false);
  }

  /**
   * Perform a bulk load of the given map of families to hfiles into the given pre-existing table.
   * This method is not threadsafe.
   * @param map map of family to List of hfiles
   * @param admin the Admin
   * @param table the table to load into
   * @param regionLocator region locator
   * @param silence true to ignore unmatched column families
   * @param copyFile always copy hfiles if true
   * @throws TableNotFoundException if table does not yet exist
   */
  public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Map<byte[], List<Path>> map, final Admin admin,
      Table table, RegionLocator regionLocator, boolean silence, boolean copyFile)
      throws TableNotFoundException, IOException {
    if (!admin.isTableAvailable(regionLocator.getName())) {
      throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
    }
    // LQI queue does not need to be threadsafe -- all operations on this queue
    // happen in this thread
    Deque<LoadQueueItem> queue = new ArrayDeque<>();
    ExecutorService pool = null;
    SecureBulkLoadClient secureClient = null;
    try {
      prepareHFileQueue(map, table, queue, silence);
      if (queue.isEmpty()) {
        LOG.warn("Bulk load operation did not get any files to load");
        return Collections.emptyMap();
      }
      pool = createExecutorService();
      secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
      return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
    } finally {
      cleanup(admin, queue, pool, secureClient);
    }
  }
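
  // A sketch of driving the map-based overload above directly; the family name
  // and hfile path are placeholders, and admin/table/regionLocator are assumed
  // to come from an already-open Connection:
  //
  //   Map<byte[], List<Path>> family2Files = new HashMap<>();
  //   family2Files.put(Bytes.toBytes("cf"),
  //       Lists.newArrayList(new Path("/staging/cf/hfile-0001")));
  //   loader.doBulkLoad(family2Files, admin, table, regionLocator, false, false);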

  /**
   * Perform a bulk load of the given directory into the given pre-existing table. This method is
   * not threadsafe.
   * @param hfofDir the directory that was provided as the output path of a job using
   *          HFileOutputFormat
   * @param admin the Admin
   * @param table the table to load into
   * @param regionLocator region locator
   * @param silence true to ignore unmatched column families
   * @param copyFile always copy hfiles if true
   * @throws TableNotFoundException if table does not yet exist
   */
  public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, final Admin admin, Table table,
      RegionLocator regionLocator, boolean silence, boolean copyFile)
      throws TableNotFoundException, IOException {
    if (!admin.isTableAvailable(regionLocator.getName())) {
      throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
    }

    /*
     * Checking hfile format is a time-consuming operation, we should have an option to skip this
     * step when bulkloading millions of HFiles. See HBASE-13985.
     */
    boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
    if (!validateHFile) {
      LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " +
          "are not correct. If you fail to read data from your table after using this " +
          "option, consider removing the files and bulkload again without this option. " +
          "See HBASE-13985");
    }
    // LQI queue does not need to be threadsafe -- all operations on this queue
    // happen in this thread
    Deque<LoadQueueItem> queue = new ArrayDeque<>();
    ExecutorService pool = null;
    SecureBulkLoadClient secureClient = null;
    try {
      prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);

      if (queue.isEmpty()) {
        LOG.warn(
            "Bulk load operation did not find any files to load in directory {}. " +
            "Does it contain files in subdirectories that correspond to column family names?",
            (hfofDir != null ? hfofDir.toUri().toString() : ""));
        return Collections.emptyMap();
      }
      pool = createExecutorService();
      secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
      return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
    } finally {
      cleanup(admin, queue, pool, secureClient);
    }
  }

  /**
   * Used by the replication sink to load the hfiles from the source cluster. It does the following,
   * <ol>
   * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)</li>
   * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
   * </li>
   * </ol>
   * @param table Table to which these hfiles should be loaded to
   * @param conn Connection to use
   * @param queue {@link LoadQueueItem} has hfiles yet to be loaded
   * @param startEndKeys starting and ending row keys of the region
   */
  public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> queue,
      Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    loadHFileQueue(table, conn, queue, startEndKeys, false);
  }
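
  // A sketch of the prepare-then-load flow this entry point expects; table, conn
  // and hfilesDir are assumed to already exist in the caller:
  //
  //   Deque<LoadQueueItem> queue = new ArrayDeque<>();
  //   loader.prepareHFileQueue(hfilesDir, table, queue, false);
  //   try (RegionLocator locator = conn.getRegionLocator(table.getName())) {
  //     loader.loadHFileQueue(table, conn, queue, locator.getStartEndKeys());
  //   }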

  /**
   * Used by the replication sink to load the hfiles from the source cluster. It does the following,
   * <ol>
   * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)</li>
   * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
   * </li>
   * </ol>
   * @param table Table to which these hfiles should be loaded to
   * @param conn Connection to use
   * @param queue {@link LoadQueueItem} has hfiles yet to be loaded
   * @param startEndKeys starting and ending row keys of the region
   * @param copyFile always copy hfiles if true
   */
  public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> queue,
      Pair<byte[][], byte[][]> startEndKeys, boolean copyFile) throws IOException {
    ExecutorService pool = null;
    try {
      pool = createExecutorService();
      Multimap<ByteBuffer, LoadQueueItem> regionGroups =
          groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst();
      bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile, null);
    } finally {
      if (pool != null) {
        pool.shutdown();
      }
    }
  }

  private Map<LoadQueueItem, ByteBuffer> performBulkLoad(Admin admin, Table table,
      RegionLocator regionLocator, Deque<LoadQueueItem> queue, ExecutorService pool,
      SecureBulkLoadClient secureClient, boolean copyFile) throws IOException {
    int count = 0;

    fsDelegationToken.acquireDelegationToken(queue.peek().getFilePath().getFileSystem(getConf()));
    bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null;

    Map<LoadQueueItem, ByteBuffer> item2RegionMap = new HashMap<>();
    // Assumes that region splits can happen while this occurs.
    while (!queue.isEmpty()) {
      // need to reload split keys each iteration.
      final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
      if (count != 0) {
        LOG.info("Split occurred while grouping HFiles, retry attempt " + count + " with " +
            queue.size() + " files remaining to group or split");
      }

      int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
      maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
      if (maxRetries != 0 && count >= maxRetries) {
        throw new IOException(
            "Retry attempted " + count + " times without completing, bailing out");
      }
      count++;

      // Using ByteBuffer for byte[] equality semantics
      pair = groupOrSplitPhase(table, pool, queue, startEndKeys);
      Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst();

      if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
        // Error is logged inside checkHFilesCountPerRegionPerFamily.
        throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily +
            " hfiles to one family of one region");
      }

      bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile,
          item2RegionMap);

      // NOTE: The next iteration's split / group could happen in parallel to
      // atomic bulkloads assuming that there are splits and no merges, and
      // that we can atomically pull out the groups we want to retry.
    }

    if (!queue.isEmpty()) {
      throw new RuntimeException("Bulk load aborted with some files not yet loaded. " +
          "Please check log for more details.");
    }
    return item2RegionMap;
  }

  /**
   * This takes the LQI's grouped by likely regions and attempts to bulk load them. Any failures are
   * re-queued for another pass with the groupOrSplitPhase.
   * <p>
   * protected for testing.
   */
  @VisibleForTesting
  protected void bulkLoadPhase(Table table, Connection conn, ExecutorService pool,
      Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
      boolean copyFile, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
    // atomically bulk load the groups.
    Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<>();
    for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e : regionGroups.asMap()
        .entrySet()) {
      byte[] first = e.getKey().array();
      Collection<LoadQueueItem> lqis = e.getValue();

      ClientServiceCallable<byte[]> serviceCallable =
          buildClientServiceCallable(conn, table.getName(), first, lqis, copyFile);

      Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
        @Override
        public List<LoadQueueItem> call() throws Exception {
          List<LoadQueueItem> toRetry =
              tryAtomicRegionLoad(serviceCallable, table.getName(), first, lqis);
          return toRetry;
        }
      };
      if (item2RegionMap != null) {
        for (LoadQueueItem lqi : lqis) {
          item2RegionMap.put(lqi, e.getKey());
        }
      }
      loadingFutures.add(pool.submit(call));
    }

    // get all the results.
    for (Future<List<LoadQueueItem>> future : loadingFutures) {
      try {
        List<LoadQueueItem> toRetry = future.get();

        if (item2RegionMap != null) {
          for (LoadQueueItem lqi : toRetry) {
            item2RegionMap.remove(lqi);
          }
        }
        // LQIs that are requeued to be regrouped.
        queue.addAll(toRetry);
      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          // At this point something unrecoverable has happened.
          // TODO Implement bulk load recovery
          throw new IOException("BulkLoad encountered an unrecoverable problem", t);
        }
        LOG.error("Unexpected execution exception during bulk load", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during bulk load", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
  }

  protected ClientServiceCallable<byte[]> buildClientServiceCallable(Connection conn,
      TableName tableName, byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) {
    List<Pair<byte[], String>> famPaths =
        lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString()))
            .collect(Collectors.toList());
    return new ClientServiceCallable<byte[]>(conn, tableName, first,
        rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
      @Override
      protected byte[] rpcCall() throws Exception {
        SecureBulkLoadClient secureClient = null;
        boolean success = false;
        try {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Going to connect to server " + getLocation() + " for row " +
                Bytes.toStringBinary(getRow()) + " with hfile group " +
                LoadIncrementalHFiles.this.toString(famPaths));
          }
          byte[] regionName = getLocation().getRegion().getRegionName();
          try (Table table = conn.getTable(getTableName())) {
            secureClient = new SecureBulkLoadClient(getConf(), table);
            success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
                assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile);
          }
          return success ? regionName : null;
        } finally {
          // Best effort copying of files that might not have been imported
          // from the staging directory back to original location
          // in user specified directory
          if (secureClient != null && !success) {
            FileSystem targetFs = FileSystem.get(getConf());
            FileSystem sourceFs = lqis.iterator().next().getFilePath().getFileSystem(getConf());
            // Check to see if the source and target filesystems are the same
            // If they are the same filesystem, we will try move the files back
            // because previously we moved them to the staging directory.
            if (FSHDFSUtils.isSameHdfs(getConf(), sourceFs, targetFs)) {
              for (Pair<byte[], String> el : famPaths) {
                Path hfileStagingPath = null;
                Path hfileOrigPath = new Path(el.getSecond());
                try {
                  hfileStagingPath = new Path(new Path(bulkToken, Bytes.toString(el.getFirst())),
                      hfileOrigPath.getName());
                  if (targetFs.rename(hfileStagingPath, hfileOrigPath)) {
                    LOG.debug("Moved back file " + hfileOrigPath + " from " + hfileStagingPath);
                  } else if (targetFs.exists(hfileStagingPath)) {
                    LOG.debug(
                        "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath);
                  }
                } catch (Exception ex) {
                  LOG.debug(
                      "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath,
                      ex);
                }
              }
            }
          }
        }
      }
    };
  }

  private boolean checkHFilesCountPerRegionPerFamily(
      final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
    for (Map.Entry<ByteBuffer, Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) {
      Map<byte[], MutableInt> filesMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (LoadQueueItem lqi : e.getValue()) {
        MutableInt count = filesMap.computeIfAbsent(lqi.getFamily(), k -> new MutableInt());
        count.increment();
        if (count.intValue() > maxFilesPerRegionPerFamily) {
          LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily +
              " hfiles to family " + Bytes.toStringBinary(lqi.getFamily()) +
              " of region with start key " + Bytes.toStringBinary(e.getKey()));
          return false;
        }
      }
    }
    return true;
  }

  /**
   * @param table the table to load into
   * @param pool the ExecutorService
   * @param queue the queue for LoadQueueItem
   * @param startEndKeys start and end keys
   * @return A map that groups LQI by likely bulk load region targets and Set of missing hfiles.
   */
  private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase(
      final Table table, ExecutorService pool, Deque<LoadQueueItem> queue,
      final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    // <region start key, LQI> need synchronized only within this scope of this
    // phase because of the puts that happen in futures.
    Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
    final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
    Set<String> missingHFiles = new HashSet<>();
    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair =
        new Pair<>(regionGroups, missingHFiles);

    // drain LQIs and figure out bulk load groups
    Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
    while (!queue.isEmpty()) {
      final LoadQueueItem item = queue.remove();

      final Callable<Pair<List<LoadQueueItem>, String>> call =
          new Callable<Pair<List<LoadQueueItem>, String>>() {
            @Override
            public Pair<List<LoadQueueItem>, String> call() throws Exception {
              Pair<List<LoadQueueItem>, String> splits =
                  groupOrSplit(regionGroups, item, table, startEndKeys);
              return splits;
            }
          };
      splittingFutures.add(pool.submit(call));
    }
    // get all the results. All grouping and splitting must finish before
    // we can attempt the atomic loads.
    for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) {
      try {
        Pair<List<LoadQueueItem>, String> splits = lqis.get();
        if (splits != null) {
          if (splits.getFirst() != null) {
            queue.addAll(splits.getFirst());
          } else {
            missingHFiles.add(splits.getSecond());
          }
        }
      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          LOG.error("IOException during splitting", e1);
          throw (IOException) t; // would have been thrown if not parallelized
        }
        LOG.error("Unexpected execution exception during splitting", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during splitting", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
    return pair;
  }

  private List<LoadQueueItem> splitStoreFile(final LoadQueueItem item, final Table table,
      byte[] startKey, byte[] splitKey) throws IOException {
    Path hfilePath = item.getFilePath();
    byte[] family = item.getFamily();
    Path tmpDir = hfilePath.getParent();
    if (!tmpDir.getName().equals(TMP_DIR)) {
      tmpDir = new Path(tmpDir, TMP_DIR);
    }

    LOG.info("HFile at " + hfilePath + " no longer fits inside a single region. Splitting...");

    String uniqueName = getUniqueName();
    ColumnFamilyDescriptor familyDesc = table.getDescriptor().getColumnFamily(family);

    Path botOut = new Path(tmpDir, uniqueName + ".bottom");
    Path topOut = new Path(tmpDir, uniqueName + ".top");
    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);

    FileSystem fs = tmpDir.getFileSystem(getConf());
    fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx"));

    // Add these back at the *front* of the queue, so there's a lower
    // chance that the region will just split again before we get there.
    List<LoadQueueItem> lqis = new ArrayList<>(2);
    lqis.add(new LoadQueueItem(family, botOut));
    lqis.add(new LoadQueueItem(family, topOut));

    // If the current item is already the result of previous splits,
    // we don't need it anymore. Clean up to save space.
    // It is not part of the original input files.
    try {
      if (tmpDir.getName().equals(TMP_DIR)) {
        fs.delete(hfilePath, false);
      }
    } catch (IOException e) {
      LOG.warn("Unable to delete temporary split file " + hfilePath);
    }
    LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
    return lqis;
  }

  /**
   * Attempt to assign the given load queue item into its target region group. If the hfile boundary
   * no longer fits into a region, physically splits the hfile such that the new bottom half will
   * fit and returns the list of LQI's corresponding to the resultant hfiles.
   * <p>
   * protected for testing
   * @throws IOException if an IO failure is encountered
   */
  @VisibleForTesting
  protected Pair<List<LoadQueueItem>, String> groupOrSplit(
      Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, final Table table,
      final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    Path hfilePath = item.getFilePath();
    Optional<byte[]> first, last;
    try (HFile.Reader hfr = HFile.createReader(hfilePath.getFileSystem(getConf()), hfilePath,
        CacheConfig.DISABLED, true, getConf())) {
      hfr.loadFileInfo();
      first = hfr.getFirstRowKey();
      last = hfr.getLastRowKey();
    } catch (FileNotFoundException fnfe) {
      LOG.debug("encountered", fnfe);
      return new Pair<>(null, hfilePath.getName());
    }

    LOG.info("Trying to load hfile=" + hfilePath + " first=" + first.map(Bytes::toStringBinary) +
        " last=" + last.map(Bytes::toStringBinary));
    if (!first.isPresent() || !last.isPresent()) {
      assert !first.isPresent() && !last.isPresent();
      // TODO what if this is due to a bad HFile?
      LOG.info("hfile " + hfilePath + " has no entries, skipping");
      return null;
    }
    if (Bytes.compareTo(first.get(), last.get()) > 0) {
      throw new IllegalArgumentException("Invalid range: " + Bytes.toStringBinary(first.get()) +
          " > " + Bytes.toStringBinary(last.get()));
    }
    int idx = Arrays.binarySearch(startEndKeys.getFirst(), first.get(), Bytes.BYTES_COMPARATOR);
    if (idx < 0) {
      // not on boundary, returns -(insertion index). Calculate region it
      // should be in.
      idx = -(idx + 1) - 1;
    }
    int indexForCallable = idx;

    /*
     * we can consider there is a region hole in the following conditions. 1) if idx < 0, then the
     * first region info is lost. 2) if the endkey of a region is not equal to the startkey of the
     * next region. 3) if the endkey of the last region is not empty.
     */
    if (indexForCallable < 0) {
      throw new IOException("The first region info for table " + table.getName() +
          " can't be found in hbase:meta. Please use hbck tool to fix it first.");
    } else if ((indexForCallable == startEndKeys.getFirst().length - 1) &&
        !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IOException("The last region info for table " + table.getName() +
          " can't be found in hbase:meta. Please use hbck tool to fix it first.");
    } else if (indexForCallable + 1 < startEndKeys.getFirst().length &&
        !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable],
          startEndKeys.getFirst()[indexForCallable + 1]) == 0)) {
      throw new IOException("The endkey of one region for table " + table.getName() +
          " is not equal to the startkey of the next region in hbase:meta. " +
          "Please use hbck tool to fix it first.");
    }

    boolean lastKeyInRange = Bytes.compareTo(last.get(), startEndKeys.getSecond()[idx]) < 0 ||
        Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
    if (!lastKeyInRange) {
      List<LoadQueueItem> lqis = splitStoreFile(item, table,
          startEndKeys.getFirst()[indexForCallable], startEndKeys.getSecond()[indexForCallable]);
      return new Pair<>(lqis, null);
    }

    // group regions.
    regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
    return null;
  }

  /**
   * Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list of
   * hfiles that need to be retried. If it is successful it will return an empty list.
   * <p>
   * NOTE: To maintain row atomicity guarantees, region server callable should succeed atomically
   * and fail atomically.
   * <p>
   * Protected for testing.
   * @return empty list if success, list of items to retry on recoverable failure
   */
  @VisibleForTesting
  protected List<LoadQueueItem> tryAtomicRegionLoad(ClientServiceCallable<byte[]> serviceCallable,
      final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
      throws IOException {
    List<LoadQueueItem> toRetry = new ArrayList<>();
    try {
      Configuration conf = getConf();
      byte[] region = RpcRetryingCallerFactory.instantiate(conf, null).<byte[]> newCaller()
          .callWithRetries(serviceCallable, Integer.MAX_VALUE);
      if (region == null) {
        LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first) +
            " into table " + tableName + " with files " + lqis +
            " failed. This is recoverable and they will be retried.");
        toRetry.addAll(lqis); // return lqi's to retry
      }
      // success
      return toRetry;
    } catch (IOException e) {
      LOG.error("Encountered unrecoverable error from region server, additional details: " +
          serviceCallable.getExceptionMessageAdditionalDetail(), e);
      LOG.warn("Received a " + e.getClass().getSimpleName() + " from region server: " +
          serviceCallable.getExceptionMessageAdditionalDetail(), e);
      if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false) &&
          numRetries.get() < getConf().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
              HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
        LOG.warn("Will attempt to retry loading failed HFiles. Retry #" +
            numRetries.incrementAndGet());
        toRetry.addAll(lqis);
        return toRetry;
      } else {
        LOG.error(RETRY_ON_IO_EXCEPTION + " is disabled. Unable to recover");
        throw e;
      }
    }
  }

  /**
   * If the table is created for the first time, then "completebulkload" reads the files twice. More
   * modifications necessary if we want to avoid doing it.
   */
  private void createTable(TableName tableName, Path hfofDir, Admin admin) throws IOException {
    final FileSystem fs = hfofDir.getFileSystem(getConf());

    // Add column families
    // Build a set of keys
    List<ColumnFamilyDescriptorBuilder> familyBuilders = new ArrayList<>();
    SortedMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<ColumnFamilyDescriptorBuilder>() {
      @Override
      public ColumnFamilyDescriptorBuilder bulkFamily(byte[] familyName) {
        ColumnFamilyDescriptorBuilder builder =
            ColumnFamilyDescriptorBuilder.newBuilder(familyName);
        familyBuilders.add(builder);
        return builder;
      }

      @Override
      public void bulkHFile(ColumnFamilyDescriptorBuilder builder, FileStatus hfileStatus)
          throws IOException {
        Path hfile = hfileStatus.getPath();
        try (HFile.Reader reader =
            HFile.createReader(fs, hfile, CacheConfig.DISABLED, true, getConf())) {
          if (builder.getCompressionType() != reader.getFileContext().getCompression()) {
            builder.setCompressionType(reader.getFileContext().getCompression());
            LOG.info("Setting compression " + reader.getFileContext().getCompression().name() +
                " for family " + builder.getNameAsString());
          }
          reader.loadFileInfo();
          byte[] first = reader.getFirstRowKey().get();
          byte[] last = reader.getLastRowKey().get();

          LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first=" +
              Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));

          // To eventually infer start key-end key boundaries
          Integer value = map.containsKey(first) ? map.get(first) : 0;
          map.put(first, value + 1);

          value = map.containsKey(last) ? map.get(last) : 0;
          map.put(last, value - 1);
        }
      }
    });

    byte[][] keys = inferBoundaries(map);
    TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
    familyBuilders.stream().map(ColumnFamilyDescriptorBuilder::build)
        .forEachOrdered(tdBuilder::setColumnFamily);
    admin.createTable(tdBuilder.build(), keys);

    LOG.info("Table " + tableName + " is available!!");
  }

  private void cleanup(Admin admin, Deque<LoadQueueItem> queue, ExecutorService pool,
      SecureBulkLoadClient secureClient) throws IOException {
    fsDelegationToken.releaseDelegationToken();
    if (bulkToken != null && secureClient != null) {
      secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken);
    }
    if (pool != null) {
      pool.shutdown();
    }
    if (!queue.isEmpty()) {
      StringBuilder err = new StringBuilder();
      err.append("-------------------------------------------------\n");
      err.append("Bulk load aborted with some files not yet loaded:\n");
      err.append("-------------------------------------------------\n");
      for (LoadQueueItem q : queue) {
        err.append("  ").append(q.getFilePath()).append('\n');
      }
      LOG.error(err.toString());
    }
  }

  // unique file name for the table
  private String getUniqueName() {
    return UUID.randomUUID().toString().replaceAll("-", "");
  }

  /**
   * Checks whether there is any invalid family name in HFiles to be bulk loaded.
   */
  private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue, boolean silence)
      throws IOException {
    Set<String> familyNames = Arrays.asList(table.getDescriptor().getColumnFamilies()).stream()
        .map(f -> f.getNameAsString()).collect(Collectors.toSet());
    List<String> unmatchedFamilies = queue.stream().map(item -> Bytes.toString(item.getFamily()))
        .filter(fn -> !familyNames.contains(fn)).distinct().collect(Collectors.toList());
    if (unmatchedFamilies.size() > 0) {
      String msg =
          "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: " +
              unmatchedFamilies + "; valid family names of table " + table.getName() + " are: " +
              familyNames;
      LOG.error(msg);
      if (!silence) {
        throw new IOException(msg);
      }
    }
  }

  /**
   * Populate the Queue with given HFiles
   */
  private void populateLoadQueue(Deque<LoadQueueItem> ret, Map<byte[], List<Path>> map) {
    map.forEach((k, v) -> v.stream().map(p -> new LoadQueueItem(k, p)).forEachOrdered(ret::add));
  }

  /**
   * Walk the given directory for all HFiles, and return a Queue containing all such files.
   */
  private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
      final boolean validateHFile) throws IOException {
    visitBulkHFiles(hfofDir.getFileSystem(getConf()), hfofDir, new BulkHFileVisitor<byte[]>() {
      @Override
      public byte[] bulkFamily(final byte[] familyName) {
        return familyName;
      }

      @Override
      public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
        long length = hfile.getLen();
        if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
            HConstants.DEFAULT_MAX_FILE_SIZE)) {
          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " + length +
              " bytes can be problematic as it may lead to oversplitting.");
        }
        ret.add(new LoadQueueItem(family, hfile.getPath()));
      }
    }, validateHFile);
  }

  private interface BulkHFileVisitor<TFamily> {

    TFamily bulkFamily(byte[] familyName) throws IOException;

    void bulkHFile(TFamily family, FileStatus hfileStatus) throws IOException;
  }

  /**
   * Iterate over the bulkDir hfiles. Skip reference, HFileLink, files starting with "_" and
   * non-valid hfiles.
   */
  private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
      final BulkHFileVisitor<TFamily> visitor) throws IOException {
    visitBulkHFiles(fs, bulkDir, visitor, true);
  }

  /**
   * Iterate over the bulkDir hfiles. Skip reference, HFileLink, files starting with "_". Check and
   * skip non-valid hfiles by default, or skip this validation by setting
   * 'hbase.loadincremental.validate.hfile' to false.
   */
  private static <TFamily> void visitBulkHFiles(FileSystem fs, Path bulkDir,
      BulkHFileVisitor<TFamily> visitor, boolean validateHFile) throws IOException {
    FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
    for (FileStatus familyStat : familyDirStatuses) {
      if (!familyStat.isDirectory()) {
        LOG.warn("Skipping non-directory " + familyStat.getPath());
        continue;
      }
      Path familyDir = familyStat.getPath();
      byte[] familyName = Bytes.toBytes(familyDir.getName());
      // Skip invalid family
      try {
        ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(familyName);
      } catch (IllegalArgumentException e) {
        LOG.warn("Skipping invalid " + familyStat.getPath());
        continue;
      }
      TFamily family = visitor.bulkFamily(familyName);

      FileStatus[] hfileStatuses = fs.listStatus(familyDir);
      for (FileStatus hfileStatus : hfileStatuses) {
        if (!fs.isFile(hfileStatus.getPath())) {
          LOG.warn("Skipping non-file " + hfileStatus);
          continue;
        }
        Path hfile = hfileStatus.getPath();
        // Skip "_", reference, HFileLink
        String fileName = hfile.getName();
        if (fileName.startsWith("_")) {
          continue;
        }
        if (StoreFileInfo.isReference(fileName)) {
          LOG.warn("Skipping reference " + fileName);
          continue;
        }
        if (HFileLink.isHFileLink(fileName)) {
          LOG.warn("Skipping HFileLink " + fileName);
          continue;
        }

        // Validate HFile Format if needed
        if (validateHFile) {
          try {
            if (!HFile.isHFileFormat(fs, hfile)) {
              LOG.warn("the file " + hfile + " doesn't seem to be an hfile. skipping");
              continue;
            }
          } catch (FileNotFoundException e) {
            LOG.warn("the file " + hfile + " was removed");
            continue;
          }
        }

        visitor.bulkHFile(family, hfileStatus);
      }
    }
  }

  // Initialize a thread pool
  private ExecutorService createExecutorService() {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(),
        new ThreadFactoryBuilder().setNameFormat("LoadIncrementalHFiles-%1$d").build());
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }

  private final String toString(List<Pair<byte[], String>> list) {
    StringBuilder sb = new StringBuilder();
    sb.append('[');
    list.forEach(p -> {
      sb.append('{').append(Bytes.toStringBinary(p.getFirst())).append(',').append(p.getSecond())
          .append('}');
    });
    sb.append(']');
    return sb.toString();
  }

  /**
   * Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom
   * filters, etc.
   */
  @VisibleForTesting
  static void splitStoreFile(Configuration conf, Path inFile, ColumnFamilyDescriptor familyDesc,
      byte[] splitKey, Path bottomOut, Path topOut) throws IOException {
    // Open reader with no block cache, and not in-memory
    Reference topReference = Reference.createTopReference(splitKey);
    Reference bottomReference = Reference.createBottomReference(splitKey);

    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
  }

  /**
   * Copy half of an HFile into a new HFile.
   */
  private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
      Reference reference, ColumnFamilyDescriptor familyDescriptor) throws IOException {
    FileSystem fs = inFile.getFileSystem(conf);
    CacheConfig cacheConf = CacheConfig.DISABLED;
    HalfStoreFileReader halfReader = null;
    StoreFileWriter halfWriter = null;
    try {
      halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, true,
          new AtomicInteger(0), true, conf);
      Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();

      int blocksize = familyDescriptor.getBlocksize();
      Algorithm compression = familyDescriptor.getCompressionType();
      BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
      HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
          .withChecksumType(HStore.getChecksumType(conf))
          .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blocksize)
          .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
          .build();
      halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
          .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
      HFileScanner scanner = halfReader.getScanner(false, false, false);
      scanner.seekTo();
      do {
        halfWriter.append(scanner.getCell());
      } while (scanner.next());

      for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
        if (shouldCopyHFileMetaKey(entry.getKey())) {
          halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
        }
      }
    } finally {
      if (halfReader != null) {
        try {
          halfReader.close(cacheConf.shouldEvictOnClose());
        } catch (IOException e) {
          LOG.warn("failed to close hfile reader for " + inFile, e);
        }
      }
      if (halfWriter != null) {
        halfWriter.close();
      }
    }
  }
) {
1122 // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085
1123 if (Bytes
.equals(key
, HFileDataBlockEncoder
.DATA_BLOCK_ENCODING
)) {
1127 return !HFile
.isReservedFileInfoKey(key
);

  private boolean isCreateTable() {
    return "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"));
  }

  private boolean isSilence() {
    return "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
  }

  private boolean isAlwaysCopyFiles() {
    return getConf().getBoolean(ALWAYS_COPY_FILES, false);
  }

  protected final Map<LoadQueueItem, ByteBuffer> run(Path hfofDir, TableName tableName)
      throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(getConf());
        Admin admin = connection.getAdmin()) {
      if (!admin.tableExists(tableName)) {
        if (isCreateTable()) {
          createTable(tableName, hfofDir, admin);
        } else {
          String errorMsg = format("Table '%s' does not exist.", tableName);
          LOG.error(errorMsg);
          throw new TableNotFoundException(errorMsg);
        }
      }
      try (Table table = connection.getTable(tableName);
          RegionLocator locator = connection.getRegionLocator(tableName)) {
        return doBulkLoad(hfofDir, admin, table, locator, isSilence(), isAlwaysCopyFiles());
      }
    }
  }

  /**
   * Perform bulk load on the given table.
   * @param hfofDir the directory that was provided as the output path of a job using
   *          HFileOutputFormat
   * @param tableName the table to load into
   */
  public Map<LoadQueueItem, ByteBuffer> run(String hfofDir, TableName tableName)
      throws IOException {
    return run(new Path(hfofDir), tableName);
  }

  /**
   * Perform bulk load on the given table.
   * @param family2Files map of family to List of hfiles
   * @param tableName the table to load into
   */
  public Map<LoadQueueItem, ByteBuffer> run(Map<byte[], List<Path>> family2Files,
      TableName tableName) throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(getConf());
        Admin admin = connection.getAdmin()) {
      if (!admin.tableExists(tableName)) {
        String errorMsg = format("Table '%s' does not exist.", tableName);
        LOG.error(errorMsg);
        throw new TableNotFoundException(errorMsg);
      }
      try (Table table = connection.getTable(tableName);
          RegionLocator locator = connection.getRegionLocator(tableName)) {
        return doBulkLoad(family2Files, admin, table, locator, isSilence(), isAlwaysCopyFiles());
      }
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2 && args.length != 3) {
      usage();
      return -1;
    }
    String dirPath = args[0];
    TableName tableName = TableName.valueOf(args[1]);

    if (args.length == 2) {
      return !run(dirPath, tableName).isEmpty() ? 0 : -1;
    } else {
      Map<byte[], List<Path>> family2Files = Maps.newHashMap();
      FileSystem fs = FileSystem.get(getConf());
      for (FileStatus regionDir : fs.listStatus(new Path(dirPath))) {
        FSVisitor.visitRegionStoreFiles(fs, regionDir.getPath(), (region, family, hfileName) -> {
          Path path = new Path(regionDir.getPath(), new Path(family, hfileName));
          byte[] familyName = Bytes.toBytes(family);
          if (family2Files.containsKey(familyName)) {
            family2Files.get(familyName).add(path);
          } else {
            family2Files.put(familyName, Lists.newArrayList(path));
          }
        });
      }
      return !run(family2Files, tableName).isEmpty() ? 0 : -1;
    }
  }
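
  // The two directory layouts accepted by run(String[]) above (paths are
  // illustrative): with two arguments the input directory holds one subdirectory
  // per column family, e.g. /output/cf/hfile; with a third argument (the
  // -loadTable switch) it is one level deeper, holding region subdirectories,
  // e.g. /output/region/cf/hfile, and files are regrouped by family before loading.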

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(conf), args);
    System.exit(ret);
  }

  /**
   * Called from replication sink, where it manages bulkToken(staging directory) by itself. This is
   * used only when SecureBulkLoadEndpoint is configured in hbase.coprocessor.region.classes
   * property. This directory is used as a temporary directory where all files are initially
   * copied/moved from user given directory, set all the required file permissions and then from
   * there it is finally loaded into a table. This should be set only when one would like to manage
   * the staging directory by itself. Otherwise this tool will handle this by itself.
   * @param stagingDir staging directory path
   */
  public void setBulkToken(String stagingDir) {
    this.bulkToken = stagingDir;
  }
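
  // Worked example for inferBoundaries() below: two non-overlapping hfiles with
  // key ranges [a,b] and [c,d] yield the sorted map {a:+1, b:-1, c:+1, d:-1}.
  // The running sum returns to zero at b (the first boundary, which is skipped)
  // and again at d after restarting at c, so the inferred split key list is {c}:
  // the table is pre-split into a region ending at c and a region starting at c.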

  /**
   * Infers region boundaries for a new table.
   * <p>
   * Parameter: <br>
   * bdryMap is a map between keys to an integer belonging to {+1, -1}
   * <ul>
   * <li>If a key is a start key of a file, then it maps to +1</li>
   * <li>If a key is an end key of a file, then it maps to -1</li>
   * </ul>
   * <p>
   * Algo:<br>
   * <ol>
   * <li>Poll on the keys in order:
   * <ol type="a">
   * <li>Keep adding the mapped values to these keys (runningSum)</li>
   * <li>Each time runningSum reaches 0, add the start Key from when the runningSum had started to a
   * boundary list.</li>
   * </ol>
   * </li>
   * <li>Return the boundary list.</li>
   * </ol>
   */
  public static byte[][] inferBoundaries(SortedMap<byte[], Integer> bdryMap) {
    List<byte[]> keysArray = new ArrayList<>();
    int runningValue = 0;
    byte[] currStartKey = null;
    boolean firstBoundary = true;

    for (Map.Entry<byte[], Integer> item : bdryMap.entrySet()) {
      if (runningValue == 0) {
        currStartKey = item.getKey();
      }
      runningValue += item.getValue();
      if (runningValue == 0) {
        if (!firstBoundary) {
          keysArray.add(currStartKey);
        }
        firstBoundary = false;
      }
    }

    return keysArray.toArray(new byte[0][]);
  }
}