/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET;

import edu.umd.cs.findbugs.annotations.CheckForNull;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Progressable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
/**
 * Utility methods for interacting with the underlying file system.
 */
@InterfaceAudience.Private
public final class FSUtils {
  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);

  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
  private static final int DEFAULT_THREAD_POOLSIZE = 2;

  /** Set to true on Windows platforms */
  // currently only used in testing. TODO refactor into a test class
  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
  /**
   * @return True if <code>fs</code> is an instance of DistributedFileSystem
   * @throws IOException
   */
  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
    FileSystem fileSystem = fs;
    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
    // Check its backing fs for dfs-ness.
    if (fs instanceof HFileSystem) {
      fileSystem = ((HFileSystem) fs).getBackingFs();
    }
    return fileSystem instanceof DistributedFileSystem;
  }
  /**
   * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
   * '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true. Does not
   * consider scheme; i.e. if schemes differ but path or subpath matches, the two will equate.
   * @param pathToSearch Path we will be trying to match.
   * @param pathTail the path tail to match against.
   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
    Path tailPath = pathTail;
    String tailName;
    Path toSearch = pathToSearch;
    String toSearchName;
    boolean result = false;

    if (pathToSearch.depth() != pathTail.depth()) {
      return result;
    }

    do {
      tailName = tailPath.getName();
      if (tailName == null || tailName.isEmpty()) {
        result = true;
        break;
      }
      toSearchName = toSearch.getName();
      if (toSearchName == null || toSearchName.isEmpty()) {
        break;
      }
      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
      tailPath = tailPath.getParent();
      toSearch = toSearch.getParent();
    } while (tailName.equals(toSearchName));
    return result;
  }
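
  // Illustrative sketch (not part of the original source): how the tail matching above behaves.
  // The paths are hypothetical examples.
  //
  //   Path search = new Path("hdfs://nn:8020/hbase/data/default/t1");
  //   FSUtils.isMatchingTail(search, new Path("/hbase/data/default/t1")); // true: same
  //     // components compared right-to-left, scheme and authority ignored
  //   FSUtils.isMatchingTail(search, new Path("default/t1")); // false: depths differ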
  /**
   * Delete the region directory if it exists.
   * @return True if deleted the region directory.
   * @throws IOException
   */
  public static boolean deleteRegionDir(final Configuration conf, final RegionInfo hri)
    throws IOException {
    Path rootDir = CommonFSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);
    return CommonFSUtils.deleteDirectory(fs,
      new Path(CommonFSUtils.getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
  }
  /**
   * Create the specified file on the filesystem. By default, this will:
   * <ol>
   * <li>overwrite the file if it exists</li>
   * <li>apply the umask in the configuration (if it is enabled)</li>
   * <li>use the fs configured buffer size (or 4096 if not set)</li>
   * <li>use the configured column family replication or default replication if
   * {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li>
   * <li>use the default block size</li>
   * <li>not track progress</li>
   * </ol>
   * @param conf configurations
   * @param fs {@link FileSystem} on which to write the file
   * @param path {@link Path} to the file to write
   * @param perm permissions
   * @param favoredNodes favored data nodes
   * @return output stream to the created file
   * @throws IOException if the file cannot be created
   */
  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
    FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
    if (fs instanceof HFileSystem) {
      FileSystem backingFs = ((HFileSystem) fs).getBackingFs();
      if (backingFs instanceof DistributedFileSystem) {
        // Try to use the favoredNodes version via reflection to allow backwards-
        // compatibility.
        short replication = Short.parseShort(conf.get(ColumnFamilyDescriptorBuilder.DFS_REPLICATION,
          String.valueOf(ColumnFamilyDescriptorBuilder.DEFAULT_DFS_REPLICATION)));
        try {
          return (FSDataOutputStream) (DistributedFileSystem.class
            .getDeclaredMethod("create", Path.class, FsPermission.class, boolean.class, int.class,
              short.class, long.class, Progressable.class, InetSocketAddress[].class)
            .invoke(backingFs, path, perm, true, CommonFSUtils.getDefaultBufferSize(backingFs),
              replication > 0 ? replication : CommonFSUtils.getDefaultReplication(backingFs, path),
              CommonFSUtils.getDefaultBlockSize(backingFs, path), null, favoredNodes));
        } catch (InvocationTargetException ite) {
          // Function was properly called, but threw its own exception.
          throw new IOException(ite.getCause());
        } catch (NoSuchMethodException e) {
          LOG.debug("DFS Client does not support most favored nodes create; using default create");
          LOG.trace("Ignoring; use default create", e);
        } catch (IllegalArgumentException | SecurityException | IllegalAccessException e) {
          LOG.debug("Ignoring (most likely Reflection related exception) " + e);
        }
      }
    }
    return CommonFSUtils.create(fs, path, perm, true);
  }
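
  // Illustrative sketch (not part of the original source): creating a file while hinting HDFS to
  // place replicas on a region's favored nodes. The path, host, and payload are hypothetical.
  //
  //   InetSocketAddress[] favored = { new InetSocketAddress("datanode-1.example.com", 50010) };
  //   byte[] payload = ...; // bytes to persist, assumed to exist
  //   try (FSDataOutputStream out = FSUtils.create(conf, fs,
  //       new Path("/hbase/data/default/t1/abc123/cf/tmpfile"),
  //       FsPermission.getFileDefault(), favored)) {
  //     out.write(payload);
  //   }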
  /**
   * Checks to see if the specified file system is available
   *
   * @param fs filesystem
   * @throws IOException e
   */
  public static void checkFileSystemAvailable(final FileSystem fs)
    throws IOException {
    if (!(fs instanceof DistributedFileSystem)) {
      return;
    }
    IOException exception = null;
    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    try {
      if (dfs.exists(new Path("/"))) {
        return;
      }
    } catch (IOException e) {
      exception = e instanceof RemoteException ?
        ((RemoteException)e).unwrapRemoteException() : e;
    }
    try {
      fs.close();
    } catch (Exception e) {
      LOG.error("file system close failed: ", e);
    }
    throw new IOException("File system is not available", exception);
  }
  /**
   * Inquire the Active NameNode's safe mode status.
   *
   * @param dfs A DistributedFileSystem object representing the underlying HDFS.
   * @return whether we're in safe mode
   * @throws IOException
   */
  private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
    return dfs.setSafeMode(SAFEMODE_GET, true);
  }

  /**
   * Check whether dfs is in safemode.
   * @param conf
   * @throws IOException
   */
  public static void checkDfsSafeMode(final Configuration conf)
    throws IOException {
    boolean isInSafeMode = false;
    FileSystem fs = FileSystem.get(conf);
    if (fs instanceof DistributedFileSystem) {
      DistributedFileSystem dfs = (DistributedFileSystem)fs;
      isInSafeMode = isInSafeMode(dfs);
    }
    if (isInSafeMode) {
      throw new IOException("File system is in safemode, it can't be written now");
    }
  }
  /**
   * Verifies current version of file system
   *
   * @param fs filesystem object
   * @param rootdir root hbase directory
   * @return null if no version file exists, version string otherwise
   * @throws IOException if the version file fails to open
   * @throws DeserializationException if the version data cannot be translated into a version
   */
  public static String getVersion(FileSystem fs, Path rootdir)
    throws IOException, DeserializationException {
    final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
    FileStatus[] status = null;
    try {
      // hadoop 2.0 throws FNFE if directory does not exist.
      // hadoop 1.0 returns null if directory does not exist.
      status = fs.listStatus(versionFile);
    } catch (FileNotFoundException fnfe) {
      return null;
    }
    if (ArrayUtils.getLength(status) == 0) {
      return null;
    }
    String version = null;
    byte [] content = new byte [(int)status[0].getLen()];
    FSDataInputStream s = fs.open(versionFile);
    try {
      IOUtils.readFully(s, content, 0, content.length);
      if (ProtobufUtil.isPBMagicPrefix(content)) {
        version = parseVersionFrom(content);
      } else {
        // Presume it pre-pb format.
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) {
          version = dis.readUTF();
        }
      }
    } catch (EOFException eof) {
      LOG.warn("Version file was empty, odd, will try to set it.");
    } finally {
      s.close();
    }
    return version;
  }
  /**
   * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
   * @param bytes The byte content of the hbase.version file
   * @return The version found in the file as a String
   * @throws DeserializationException if the version data cannot be translated into a version
   */
  static String parseVersionFrom(final byte [] bytes)
    throws DeserializationException {
    ProtobufUtil.expectPBMagicPrefix(bytes);
    int pblen = ProtobufUtil.lengthOfPBMagic();
    FSProtos.HBaseVersionFileContent.Builder builder =
      FSProtos.HBaseVersionFileContent.newBuilder();
    try {
      ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
      return builder.getVersion();
    } catch (IOException e) {
      throw new DeserializationException(e);
    }
  }
  /**
   * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
   * @param version Version to persist
   * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a
   *   prefix.
   */
  static byte [] toVersionByteArray(final String version) {
    FSProtos.HBaseVersionFileContent.Builder builder =
      FSProtos.HBaseVersionFileContent.newBuilder();
    return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
  }
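
  // Illustrative sketch (not part of the original source): the hbase.version round trip used by
  // setVersion/getVersion, shown here with an assumed layout version of "8".
  //
  //   byte[] serialized = FSUtils.toVersionByteArray("8"); // pb magic + HBaseVersionFileContent
  //   String parsed = FSUtils.parseVersionFrom(serialized); // "8"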
  /**
   * Verifies current version of file system
   *
   * @param fs file system
   * @param rootdir root directory of HBase installation
   * @param message if true, issues a message on System.out
   * @throws IOException if the version file cannot be opened
   * @throws DeserializationException if the contents of the version file cannot be parsed
   */
  public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
    throws IOException, DeserializationException {
    checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
  }
  /**
   * Verifies current version of file system
   *
   * @param fs file system
   * @param rootdir root directory of HBase installation
   * @param message if true, issues a message on System.out
   * @param wait wait interval
   * @param retries number of times to retry
   *
   * @throws IOException if the version file cannot be opened
   * @throws DeserializationException if the contents of the version file cannot be parsed
   */
  public static void checkVersion(FileSystem fs, Path rootdir,
      boolean message, int wait, int retries)
    throws IOException, DeserializationException {
    String version = getVersion(fs, rootdir);
    String msg;
    if (version == null) {
      if (!metaRegionExists(fs, rootdir)) {
        // rootDir is empty (no version file and no root region)
        // just create new version file (HBASE-1195)
        setVersion(fs, rootdir, wait, retries);
        return;
      } else {
        msg = "hbase.version file is missing. Is your hbase.rootdir valid? " +
          "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. " +
          "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2";
      }
    } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) {
      return;
    } else {
      msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version +
        " but software requires version " + HConstants.FILE_SYSTEM_VERSION +
        ". Consult http://hbase.apache.org/book.html for further information about " +
        "upgrading HBase.";
    }

    // version is deprecated, require migration
    // Output on stdout so user sees it in terminal.
    if (message) {
      System.out.println("WARNING! " + msg);
    }
    throw new FileSystemVersionException(msg);
  }
  /**
   * Sets version of file system
   *
   * @param fs filesystem object
   * @param rootdir hbase root
   * @throws IOException e
   */
  public static void setVersion(FileSystem fs, Path rootdir)
    throws IOException {
    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
      HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
  }

  /**
   * Sets version of file system
   *
   * @param fs filesystem object
   * @param rootdir hbase root
   * @param wait time to wait for retry
   * @param retries number of times to retry before failing
   * @throws IOException e
   */
  public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
    throws IOException {
    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
  }
  /**
   * Sets version of file system
   *
   * @param fs filesystem object
   * @param rootdir hbase root directory
   * @param version version to set
   * @param wait time to wait for retry
   * @param retries number of times to retry before throwing an IOException
   * @throws IOException e
   */
  public static void setVersion(FileSystem fs, Path rootdir, String version,
      int wait, int retries) throws IOException {
    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
    Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
      HConstants.VERSION_FILE_NAME);
    while (true) {
      try {
        // Write the version to a temporary file
        FSDataOutputStream s = fs.create(tempVersionFile);
        try {
          s.write(toVersionByteArray(version));
          s.close();
          s = null;
          // Move the temp version file to its normal location. Returns false
          // if the rename failed. Throw an IOE in that case.
          if (!fs.rename(tempVersionFile, versionFile)) {
            throw new IOException("Unable to move temp version file to " + versionFile);
          }
        } finally {
          // Cleaning up the temporary if the rename failed would be trying
          // too hard. We'll unconditionally create it again the next time
          // through anyway, files are overwritten by default by create().

          // Attempt to close the stream on the way out if it is still open.
          try {
            if (s != null) s.close();
          } catch (IOException ignore) { }
        }
        LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
        return;
      } catch (IOException e) {
        if (retries > 0) {
          LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
          fs.delete(versionFile, false);
          try {
            if (wait > 0) {
              Thread.sleep(wait);
            }
          } catch (InterruptedException ie) {
            throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
          }
          retries--;
        } else {
          throw e;
        }
      }
    }
  }
  /**
   * Checks that a cluster ID file exists in the HBase root directory
   * @param fs the root directory FileSystem
   * @param rootdir the HBase root directory in HDFS
   * @param wait how long to wait between retries
   * @return <code>true</code> if the file exists, otherwise <code>false</code>
   * @throws IOException if checking the FileSystem fails
   */
  public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
      long wait) throws IOException {
    while (true) {
      try {
        Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
        return fs.exists(filePath);
      } catch (IOException ioe) {
        if (wait > 0L) {
          LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe);
          try {
            Thread.sleep(wait);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
          }
        } else {
          throw ioe;
        }
      }
    }
  }
  /**
   * Returns the value of the unique cluster ID stored for this HBase instance.
   * @param fs the root directory FileSystem
   * @param rootdir the path to the HBase root directory
   * @return the unique cluster identifier
   * @throws IOException if reading the cluster ID file fails
   */
  public static ClusterId getClusterId(FileSystem fs, Path rootdir)
    throws IOException {
    Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
    ClusterId clusterId = null;
    FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath): null;
    if (status != null) {
      int len = Ints.checkedCast(status.getLen());
      byte [] content = new byte[len];
      FSDataInputStream in = fs.open(idPath);
      try {
        in.readFully(content);
      } catch (EOFException eof) {
        LOG.warn("Cluster ID file {} is empty", idPath);
      } finally {
        in.close();
      }
      try {
        clusterId = ClusterId.parseFrom(content);
      } catch (DeserializationException e) {
        throw new IOException("content=" + Bytes.toString(content), e);
      }
      // If not pb'd, make it so.
      if (!ProtobufUtil.isPBMagicPrefix(content)) {
        String cid = null;
        in = fs.open(idPath);
        try {
          cid = in.readUTF();
          clusterId = new ClusterId(cid);
        } catch (EOFException eof) {
          LOG.warn("Cluster ID file {} is empty", idPath);
        } finally {
          in.close();
        }
        rewriteAsPb(fs, rootdir, idPath, clusterId);
      }
      return clusterId;
    } else {
      LOG.warn("Cluster ID file does not exist at {}", idPath);
    }
    return clusterId;
  }
  /**
   * @param cid
   * @throws IOException
   */
  private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
      final ClusterId cid) throws IOException {
    // Rewrite the file as pb. Move aside the old one first, write new
    // then delete the moved-aside file.
    Path movedAsideName = new Path(p + "." + EnvironmentEdgeManager.currentTime());
    if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
    setClusterId(fs, rootdir, cid, 100);
    if (!fs.delete(movedAsideName, false)) {
      throw new IOException("Failed delete of " + movedAsideName);
    }
    LOG.debug("Rewrote the hbase.id file as pb");
  }
  /**
   * Writes a new unique identifier for this cluster to the "hbase.id" file in the HBase root
   * directory. If any operations on the ID file fails, and {@code wait} is a positive value, the
   * method will retry to produce the ID file until the thread is forcibly interrupted.
   *
   * @param fs the root directory FileSystem
   * @param rootdir the path to the HBase root directory
   * @param clusterId the unique identifier to store
   * @param wait how long (in milliseconds) to wait between retries
   * @throws IOException if writing to the FileSystem fails and no wait value
   */
  public static void setClusterId(final FileSystem fs, final Path rootdir,
      final ClusterId clusterId, final long wait) throws IOException {

    final Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
    final Path tempDir = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY);
    final Path tempIdFile = new Path(tempDir, HConstants.CLUSTER_ID_FILE_NAME);

    LOG.debug("Create cluster ID file [{}] with ID: {}", idFile, clusterId);

    while (true) {
      Optional<IOException> failure = Optional.empty();

      LOG.debug("Write the cluster ID file to a temporary location: {}", tempIdFile);
      try (FSDataOutputStream s = fs.create(tempIdFile)) {
        s.write(clusterId.toByteArray());
      } catch (IOException ioe) {
        failure = Optional.of(ioe);
      }

      if (!failure.isPresent()) {
        try {
          LOG.debug("Move the temporary cluster ID file to its target location [{}]:[{}]",
            tempIdFile, idFile);

          if (!fs.rename(tempIdFile, idFile)) {
            failure =
              Optional.of(new IOException("Unable to move temp cluster ID file to " + idFile));
          }
        } catch (IOException ioe) {
          failure = Optional.of(ioe);
        }
      }

      if (failure.isPresent()) {
        final IOException cause = failure.get();
        if (wait > 0L) {
          LOG.warn("Unable to create cluster ID file in {}, retrying in {}ms", rootdir, wait,
            cause);
          try {
            Thread.sleep(wait);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
          }
          continue;
        } else {
          throw cause;
        }
      } else {
        return;
      }
    }
  }
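
  // Illustrative sketch (not part of the original source): writing and reading back the cluster
  // ID file. The retry interval is an assumption for the example.
  //
  //   ClusterId id = new ClusterId();                      // freshly generated identifier
  //   FSUtils.setClusterId(fs, rootDir, id, 10 * 1000);    // retry every 10s on failure
  //   ClusterId stored = FSUtils.getClusterId(fs, rootDir);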
  /**
   * If DFS, check safe mode and if so, wait until we clear it.
   * @param conf configuration
   * @param wait Sleep between retries
   * @throws IOException e
   */
  public static void waitOnSafeMode(final Configuration conf,
      final long wait) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    if (!(fs instanceof DistributedFileSystem)) return;
    DistributedFileSystem dfs = (DistributedFileSystem)fs;
    // Make sure dfs is not in safe mode
    while (isInSafeMode(dfs)) {
      LOG.info("Waiting for dfs to exit safe mode...");
      try {
        Thread.sleep(wait);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
      }
    }
  }
  /**
   * Checks if meta region exists
   * @param fs file system
   * @param rootDir root directory of HBase installation
   * @return true if exists
   */
  public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException {
    Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO);
    return fs.exists(metaRegionDir);
  }
  /**
   * Compute HDFS block distribution of a given HdfsDataInputStream. All HdfsDataInputStreams
   * are backed by a series of LocatedBlocks, which are fetched periodically from the namenode.
   * This method retrieves those blocks from the input stream and uses them to calculate
   * HDFSBlockDistribution.
   *
   * The underlying method in DFSInputStream does attempt to use locally cached blocks, but
   * may hit the namenode if the cache is determined to be incomplete. The method also involves
   * making copies of all LocatedBlocks rather than returning the underlying blocks themselves.
   */
  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
      HdfsDataInputStream inputStream) throws IOException {
    List<LocatedBlock> blocks = inputStream.getAllBlocks();
    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
    for (LocatedBlock block : blocks) {
      String[] hosts = getHostsForLocations(block);
      long len = block.getBlockSize();
      StorageType[] storageTypes = block.getStorageTypes();
      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
    }
    return blocksDistribution;
  }
  private static String[] getHostsForLocations(LocatedBlock block) {
    DatanodeInfo[] locations = block.getLocations();
    String[] hosts = new String[locations.length];
    for (int i = 0; i < hosts.length; i++) {
      hosts[i] = locations[i].getHostName();
    }
    return hosts;
  }
  /**
   * Compute HDFS blocks distribution of a given file, or a portion of the file
   * @param fs file system
   * @param status file status of the file
   * @param start start position of the portion
   * @param length length of the portion
   * @return The HDFS blocks distribution
   */
  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
      final FileSystem fs, FileStatus status, long start, long length) throws IOException {
    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
    BlockLocation [] blockLocations =
      fs.getFileBlockLocations(status, start, length);
    addToHDFSBlocksDistribution(blocksDistribution, blockLocations);
    return blocksDistribution;
  }
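
  // Illustrative sketch (not part of the original source): measuring how local a single store
  // file is to a given host. The path and hostname are hypothetical.
  //
  //   FileStatus sf = fs.getFileStatus(new Path("/hbase/data/default/t1/abc123/cf/hfile1"));
  //   HDFSBlocksDistribution dist =
  //     FSUtils.computeHDFSBlocksDistribution(fs, sf, 0, sf.getLen());
  //   float locality = dist.getBlockLocalityIndex("datanode-1.example.com");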
  /**
   * Update blocksDistribution with blockLocations
   * @param blocksDistribution the hdfs blocks distribution
   * @param blockLocations an array containing block location
   */
  static public void addToHDFSBlocksDistribution(
      HDFSBlocksDistribution blocksDistribution, BlockLocation[] blockLocations)
      throws IOException {
    for (BlockLocation bl : blockLocations) {
      String[] hosts = bl.getHosts();
      long len = bl.getLength();
      StorageType[] storageTypes = bl.getStorageTypes();
      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
    }
  }
  // TODO move this method OUT of FSUtils. No dependencies to HMaster
  /**
   * Returns the total overall fragmentation percentage. Includes hbase:meta and
   * -ROOT- as well.
   *
   * @param master The master defining the HBase root and file system
   * @return The total fragmentation percentage across all tables, or -1 if no tables were found
   * @throws IOException When scanning the directory fails
   */
  public static int getTotalTableFragmentation(final HMaster master)
    throws IOException {
    Map<String, Integer> map = getTableFragmentation(master);
    return map.isEmpty() ? -1 : map.get("-TOTAL-");
  }
  /**
   * Runs through the HBase rootdir and checks how many stores for each table
   * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
   * percentage across all tables is stored under the special key "-TOTAL-".
   *
   * @param master The master defining the HBase root and file system.
   * @return A map for each table and its percentage (never null).
   *
   * @throws IOException When scanning the directory fails.
   */
  public static Map<String, Integer> getTableFragmentation(final HMaster master)
    throws IOException {
    Path path = CommonFSUtils.getRootDir(master.getConfiguration());
    // since HMaster.getFileSystem() is package private
    FileSystem fs = path.getFileSystem(master.getConfiguration());
    return getTableFragmentation(fs, path);
  }
  /**
   * Runs through the HBase rootdir and checks how many stores for each table
   * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
   * percentage across all tables is stored under the special key "-TOTAL-".
   *
   * @param fs The file system to use
   * @param hbaseRootDir The root directory to scan
   * @return A map for each table and its percentage (never null)
   * @throws IOException When scanning the directory fails
   */
  public static Map<String, Integer> getTableFragmentation(
    final FileSystem fs, final Path hbaseRootDir)
    throws IOException {
    Map<String, Integer> frags = new HashMap<>();
    int cfCountTotal = 0;
    int cfFragTotal = 0;
    PathFilter regionFilter = new RegionDirFilter(fs);
    PathFilter familyFilter = new FamilyDirFilter(fs);
    List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
    for (Path d : tableDirs) {
      int cfCount = 0;
      int cfFrag = 0;
      FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
      for (FileStatus regionDir : regionDirs) {
        Path dd = regionDir.getPath();
        // else it's a region name, now look in region for families
        FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
        for (FileStatus familyDir : familyDirs) {
          cfCount++;
          cfCountTotal++;
          Path family = familyDir.getPath();
          // now in family make sure only one file
          FileStatus[] familyStatus = fs.listStatus(family);
          if (familyStatus.length > 1) {
            cfFrag++;
            cfFragTotal++;
          }
        }
      }
      // compute percentage per table and store in result list
      frags.put(CommonFSUtils.getTableName(d).getNameAsString(),
        cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100));
    }
    // set overall percentage for all tables
    frags.put("-TOTAL-",
      cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100));
    return frags;
  }
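
  // Illustrative sketch (not part of the original source): reading the per-table fragmentation
  // map produced above. The table name "t1" is a hypothetical example.
  //
  //   Map<String, Integer> frags =
  //     FSUtils.getTableFragmentation(fs, CommonFSUtils.getRootDir(conf));
  //   int t1Pct = frags.getOrDefault("t1", 0);            // % of families with >1 store file
  //   int overallPct = frags.getOrDefault("-TOTAL-", 0);  // overall percentage across all tables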
  public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException {
    if (fs.exists(dst) && !fs.delete(dst, false)) {
      throw new IOException("Can not delete " + dst);
    }
    if (!fs.rename(src, dst)) {
      throw new IOException("Can not rename from " + src + " to " + dst);
    }
  }
  /**
   * A {@link PathFilter} that returns only regular files.
   */
  static class FileFilter extends AbstractFileStatusFilter {
    private final FileSystem fs;

    public FileFilter(final FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      try {
        return isFile(fs, isDir, p);
      } catch (IOException e) {
        LOG.warn("Unable to verify if path={} is a regular file", p, e);
        return false;
      }
    }
  }
  /**
   * Directory filter that doesn't include any of the directories in the specified blacklist
   */
  public static class BlackListDirFilter extends AbstractFileStatusFilter {
    private final FileSystem fs;
    private List<String> blacklist;

    /**
     * Create a filter on the given filesystem with the specified blacklist
     * @param fs filesystem to filter
     * @param directoryNameBlackList list of the names of the directories to filter. If
     *          <tt>null</tt>, all directories are returned
     */
    @SuppressWarnings("unchecked")
    public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
      this.fs = fs;
      blacklist =
        (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
          : directoryNameBlackList);
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      if (!isValidName(p.getName())) {
        return false;
      }

      try {
        return isDirectory(fs, isDir, p);
      } catch (IOException e) {
        LOG.warn("An error occurred while verifying if [{}] is a valid directory."
            + " Returning 'not valid' and continuing.", p, e);
        return false;
      }
    }

    protected boolean isValidName(final String name) {
      return !blacklist.contains(name);
    }
  }
  /**
   * A {@link PathFilter} that only allows directories.
   */
  public static class DirFilter extends BlackListDirFilter {

    public DirFilter(FileSystem fs) {
      super(fs, null);
    }
  }

  /**
   * A {@link PathFilter} that returns usertable directories. To get all directories use the
   * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
   */
  public static class UserTableDirFilter extends BlackListDirFilter {
    public UserTableDirFilter(FileSystem fs) {
      super(fs, HConstants.HBASE_NON_TABLE_DIRS);
    }

    @Override
    protected boolean isValidName(final String name) {
      if (!super.isValidName(name)) {
        return false;
      }

      try {
        TableName.isLegalTableQualifierName(Bytes.toBytes(name));
      } catch (IllegalArgumentException e) {
        LOG.info("Invalid table name: {}", name);
        return false;
      }
      return true;
    }
  }
  public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
    throws IOException {
    List<Path> tableDirs = new ArrayList<>();
    Path baseNamespaceDir = new Path(rootdir, HConstants.BASE_NAMESPACE_DIR);
    if (fs.exists(baseNamespaceDir)) {
      for (FileStatus status : fs.globStatus(new Path(baseNamespaceDir, "*"))) {
        tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
      }
    }
    return tableDirs;
  }
  /**
   * @return All the table directories under <code>rootdir</code>. Ignore non table hbase folders
   *   such as .logs, .oldlogs, .corrupt folders.
   * @throws IOException
   */
  public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
    throws IOException {
    // presumes any directory under hbase.rootdir is a table
    FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
    List<Path> tabledirs = new ArrayList<>(dirs.length);
    for (FileStatus dir : dirs) {
      tabledirs.add(dir.getPath());
    }
    return tabledirs;
  }
  /**
   * Filter for all dirs that don't start with '.'
   */
  public static class RegionDirFilter extends AbstractFileStatusFilter {
    // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
    final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
    final FileSystem fs;

    public RegionDirFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      if (!regionDirPattern.matcher(p.getName()).matches()) {
        return false;
      }

      try {
        return isDirectory(fs, isDir, p);
      } catch (IOException ioe) {
        // Maybe the file was moved or the fs was disconnected.
        LOG.warn("Skipping file {} due to IOException", p, ioe);
        return false;
      }
    }
  }
  /**
   * Given a particular table dir, return all the regiondirs inside it, excluding files such as
   * .tableinfo
   * @param fs A file system for the Path
   * @param tableDir Path to a specific table directory <hbase.rootdir>/<tabledir>
   * @return List of paths to valid region directories in table dir.
   * @throws IOException
   */
  public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
    // assumes we are in a table dir.
    List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
    if (rds == null) {
      return Collections.emptyList();
    }
    List<Path> regionDirs = new ArrayList<>(rds.size());
    for (FileStatus rdfs : rds) {
      Path rdPath = rdfs.getPath();
      regionDirs.add(rdPath);
    }
    return regionDirs;
  }
  public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) {
    return getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, region.getTable()), region);
  }

  public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
    return getRegionDirFromTableDir(tableDir,
      ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
  }

  public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) {
    return new Path(tableDir, encodedRegionName);
  }
  /**
   * Filter for all dirs that are legal column family names. This is generally used for colfam
   * dirs <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>.
   */
  public static class FamilyDirFilter extends AbstractFileStatusFilter {
    final FileSystem fs;

    public FamilyDirFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      try {
        // throws IAE if invalid
        ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(Bytes.toBytes(p.getName()));
      } catch (IllegalArgumentException iae) {
        // path name is an invalid family name and thus is excluded.
        return false;
      }

      try {
        return isDirectory(fs, isDir, p);
      } catch (IOException ioe) {
        // Maybe the file was moved or the fs was disconnected.
        LOG.warn("Skipping file {} due to IOException", p, ioe);
        return false;
      }
    }
  }
  /**
   * Given a particular region dir, return all the familydirs inside it
   *
   * @param fs A file system for the Path
   * @param regionDir Path to a specific region directory
   * @return List of paths to valid family directories in region dir.
   * @throws IOException
   */
  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir)
    throws IOException {
    // assumes we are in a region dir.
    return getFilePaths(fs, regionDir, new FamilyDirFilter(fs));
  }

  public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir)
    throws IOException {
    return getFilePaths(fs, familyDir, new ReferenceFileFilter(fs));
  }

  public static List<Path> getReferenceAndLinkFilePaths(final FileSystem fs, final Path familyDir)
    throws IOException {
    return getFilePaths(fs, familyDir, new ReferenceAndLinkFileFilter(fs));
  }

  private static List<Path> getFilePaths(final FileSystem fs, final Path dir,
    final PathFilter pathFilter) throws IOException {
    FileStatus[] fds = fs.listStatus(dir, pathFilter);
    List<Path> files = new ArrayList<>(fds.length);
    for (FileStatus fdfs : fds) {
      Path fdPath = fdfs.getPath();
      files.add(fdPath);
    }
    return files;
  }
  public static int getRegionReferenceAndLinkFileCount(final FileSystem fs, final Path p) {
    int result = 0;
    try {
      for (Path familyDir : getFamilyDirs(fs, p)) {
        result += getReferenceAndLinkFilePaths(fs, familyDir).size();
      }
    } catch (IOException e) {
      LOG.warn("Error counting reference files.", e);
    }
    return result;
  }
  public static class ReferenceAndLinkFileFilter implements PathFilter {

    private final FileSystem fs;

    public ReferenceAndLinkFileFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    public boolean accept(Path rd) {
      try {
        // only files can be references.
        return !fs.getFileStatus(rd).isDirectory() && (StoreFileInfo.isReference(rd) ||
          HFileLink.isHFileLink(rd));
      } catch (IOException ioe) {
        // Maybe the file was moved or the fs was disconnected.
        LOG.warn("Skipping file " + rd + " due to IOException", ioe);
        return false;
      }
    }
  }
  /**
   * Filter for HFiles that excludes reference files.
   */
  public static class HFileFilter extends AbstractFileStatusFilter {
    final FileSystem fs;

    public HFileFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      if (!StoreFileInfo.isHFile(p) && !StoreFileInfo.isMobFile(p)) {
        return false;
      }

      try {
        return isFile(fs, isDir, p);
      } catch (IOException ioe) {
        // Maybe the file was moved or the fs was disconnected.
        LOG.warn("Skipping file {} due to IOException", p, ioe);
        return false;
      }
    }
  }
  /**
   * Filter for HFileLinks (StoreFiles and HFiles not included).
   * The filter itself does not consider whether a link is a file or not.
   */
  public static class HFileLinkFilter implements PathFilter {

    @Override
    public boolean accept(Path p) {
      return HFileLink.isHFileLink(p);
    }
  }

  public static class ReferenceFileFilter extends AbstractFileStatusFilter {

    private final FileSystem fs;

    public ReferenceFileFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      if (!StoreFileInfo.isReference(p)) {
        return false;
      }

      try {
        // only files can be references.
        return isFile(fs, isDir, p);
      } catch (IOException ioe) {
        // Maybe the file was moved or the fs was disconnected.
        LOG.warn("Skipping file {} due to IOException", p, ioe);
        return false;
      }
    }
  }
  /**
   * Called every so-often by storefile map builder getTableStoreFilePathMap to
   * report progress.
   */
  interface ProgressReporter {
    /**
     * @param status File or directory we are about to process.
     */
    void progress(FileStatus status);
  }
  /**
   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
   * table StoreFile names to the full Path.
   * <br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   *
   * @param map map to add values. If null, this method will create and populate one to return
   * @param fs The file system to use.
   * @param hbaseRootDir The root directory to scan.
   * @param tableName name of the table to scan.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
   * @throws InterruptedException
   */
  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
    final FileSystem fs, final Path hbaseRootDir, TableName tableName)
    throws IOException, InterruptedException {
    return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null,
      (ProgressReporter)null);
  }
  /**
   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
   * table StoreFile names to the full Path. Note that because this method can be called
   * on a 'live' HBase system that we will skip files that no longer exist by the time
   * we traverse them and similarly the user of the result needs to consider that some
   * entries in this map may not exist by the time this call completes.
   * <br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   *
   * @param resultMap map to add values. If null, this method will create and populate one to return
   * @param fs The file system to use.
   * @param hbaseRootDir The root directory to scan.
   * @param tableName name of the table to scan.
   * @param sfFilter optional path filter to apply to store files
   * @param executor optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to new region of
   *   family dir and for each store file.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
   * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead.
   */
  @Deprecated
  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
      final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
      ExecutorService executor, final HbckErrorReporter progressReporter)
    throws IOException, InterruptedException {
    return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor,
      new ProgressReporter() {
        @Override
        public void progress(FileStatus status) {
          // status is not used in this implementation.
          progressReporter.progress();
        }
      });
  }
  /**
   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
   * table StoreFile names to the full Path. Note that because this method can be called
   * on a 'live' HBase system that we will skip files that no longer exist by the time
   * we traverse them and similarly the user of the result needs to consider that some
   * entries in this map may not exist by the time this call completes.
   * <br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   *
   * @param resultMap map to add values. If null, this method will create and populate one
   *   to return.
   * @param fs The file system to use.
   * @param hbaseRootDir The root directory to scan.
   * @param tableName name of the table to scan.
   * @param sfFilter optional path filter to apply to store files
   * @param executor optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to new region of
   *   family dir and for each store file.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
   * @throws InterruptedException the thread is interrupted, either before or during the activity.
   */
  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
      final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
      ExecutorService executor, final ProgressReporter progressReporter)
    throws IOException, InterruptedException {

    final Map<String, Path> finalResultMap =
      resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap;

    // only include the directory paths to tables
    Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
    // Inside a table, there are compaction.dir directories to skip. Otherwise, all else
    // should be regions.
    final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
    final Vector<Exception> exceptions = new Vector<>();

    try {
      List<FileStatus> regionDirs = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
      if (regionDirs == null) {
        return finalResultMap;
      }

      final List<Future<?>> futures = new ArrayList<>(regionDirs.size());

      for (FileStatus regionDir : regionDirs) {
        if (null != progressReporter) {
          progressReporter.progress(regionDir);
        }
        final Path dd = regionDir.getPath();

        if (!exceptions.isEmpty()) {
          break;
        }

        Runnable getRegionStoreFileMapCall = new Runnable() {
          @Override
          public void run() {
            try {
              HashMap<String,Path> regionStoreFileMap = new HashMap<>();
              List<FileStatus> familyDirs = FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
              if (familyDirs == null) {
                if (!fs.exists(dd)) {
                  LOG.warn("Skipping region because it no longer exists: " + dd);
                } else {
                  LOG.warn("Skipping region because it has no family dirs: " + dd);
                }
                return;
              }
              for (FileStatus familyDir : familyDirs) {
                if (null != progressReporter) {
                  progressReporter.progress(familyDir);
                }
                Path family = familyDir.getPath();
                if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
                  continue;
                }
                // now in family, iterate over the StoreFiles and
                // put in map
                FileStatus[] familyStatus = fs.listStatus(family);
                for (FileStatus sfStatus : familyStatus) {
                  if (null != progressReporter) {
                    progressReporter.progress(sfStatus);
                  }
                  Path sf = sfStatus.getPath();
                  if (sfFilter == null || sfFilter.accept(sf)) {
                    regionStoreFileMap.put(sf.getName(), sf);
                  }
                }
              }
              finalResultMap.putAll(regionStoreFileMap);
            } catch (Exception e) {
              LOG.error("Could not get region store file map for region: " + dd, e);
              exceptions.add(e);
            }
          }
        };

        // If executor is available, submit async tasks to exec concurrently, otherwise
        // just do serial sync execution
        if (executor != null) {
          Future<?> future = executor.submit(getRegionStoreFileMapCall);
          futures.add(future);
        } else {
          FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null);
          future.run();
          futures.add(future);
        }
      }

      // Ensure all pending tasks are complete (or that we run into an exception)
      for (Future<?> f : futures) {
        if (!exceptions.isEmpty()) {
          break;
        }
        try {
          f.get();
        } catch (ExecutionException e) {
          LOG.error("Unexpected exec exception! Should've been caught already. (Bug?)", e);
          // Shouldn't happen, we already logged/caught any exceptions in the Runnable
        }
      }
    } catch (IOException e) {
      LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
      exceptions.add(e);
    } finally {
      if (!exceptions.isEmpty()) {
        // Just throw the first exception as an indication something bad happened
        // Don't need to propagate all the exceptions, we already logged them all anyway
        Throwables.propagateIfPossible(exceptions.firstElement(), IOException.class);
        throw new IOException(exceptions.firstElement());
      }
    }

    return finalResultMap;
  }
  public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
    int result = 0;
    try {
      for (Path familyDir : getFamilyDirs(fs, p)) {
        result += getReferenceFilePaths(fs, familyDir).size();
      }
    } catch (IOException e) {
      LOG.warn("Error counting reference files", e);
    }
    return result;
  }
  /**
   * Runs through the HBase rootdir and creates a reverse lookup map for
   * table StoreFile names to the full Path.
   * <br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   *
   * @param fs The file system to use.
   * @param hbaseRootDir The root directory to scan.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
   */
  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
      final Path hbaseRootDir)
    throws IOException, InterruptedException {
    return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter)null);
  }
  /**
   * Runs through the HBase rootdir and creates a reverse lookup map for
   * table StoreFile names to the full Path.
   * <br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   *
   * @param fs The file system to use.
   * @param hbaseRootDir The root directory to scan.
   * @param sfFilter optional path filter to apply to store files
   * @param executor optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to new region of
   *   family dir and for each store file.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
   * @deprecated Since 2.3.0. Will be removed in hbase4. Use {@link
   *   #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)}
   */
  @Deprecated
  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
      final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
      HbckErrorReporter progressReporter)
    throws IOException, InterruptedException {
    return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor,
      new ProgressReporter() {
        @Override
        public void progress(FileStatus status) {
          // status is not used in this implementation.
          progressReporter.progress();
        }
      });
  }
  /**
   * Runs through the HBase rootdir and creates a reverse lookup map for
   * table StoreFile names to the full Path.
   * <br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   *
   * @param fs The file system to use.
   * @param hbaseRootDir The root directory to scan.
   * @param sfFilter optional path filter to apply to store files
   * @param executor optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to new region of
   *   family dir and for each store file.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
   * @throws InterruptedException
   */
  public static Map<String, Path> getTableStoreFilePathMap(
    final FileSystem fs, final Path hbaseRootDir, PathFilter sfFilter,
    ExecutorService executor, ProgressReporter progressReporter)
    throws IOException, InterruptedException {
    ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);

    // if this method looks similar to 'getTableFragmentation' that is because
    // it was borrowed from it.

    // only include the directory paths to tables
    for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
      getTableStoreFilePathMap(map, fs, hbaseRootDir, CommonFSUtils.getTableName(tableDir),
        sfFilter, executor, progressReporter);
    }
    return map;
  }
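
  // Illustrative sketch (not part of the original source): building the store file reverse lookup
  // concurrently. The pool size and store file name are assumptions for the example.
  //
  //   ExecutorService pool = Executors.newFixedThreadPool(8);
  //   try {
  //     Map<String, Path> storeFiles = FSUtils.getTableStoreFilePathMap(
  //       fs, CommonFSUtils.getRootDir(conf), null, pool, (ProgressReporter) null);
  //     Path where = storeFiles.get("3944417774205889744"); // full Path for that store file name
  //   } finally {
  //     pool.shutdown();
  //   }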
  /**
   * Filters FileStatuses in an array and returns a list
   *
   * @param input An array of FileStatuses
   * @param filter A required filter to filter the array
   * @return A list of FileStatuses
   */
  public static List<FileStatus> filterFileStatuses(FileStatus[] input,
      FileStatusFilter filter) {
    if (input == null) return null;
    return filterFileStatuses(Iterators.forArray(input), filter);
  }

  /**
   * Filters FileStatuses in an iterator and returns a list
   *
   * @param input An iterator of FileStatuses
   * @param filter A required filter to filter the array
   * @return A list of FileStatuses
   */
  public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
      FileStatusFilter filter) {
    if (input == null) return null;
    ArrayList<FileStatus> results = new ArrayList<>();
    while (input.hasNext()) {
      FileStatus f = input.next();
      if (filter.accept(f)) {
        results.add(f);
      }
    }
    return results;
  }
  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal
   * This accommodates differences between hadoop versions, where hadoop 1
   * does not throw a FileNotFoundException, and returns an empty FileStatus[]
   * while Hadoop 2 will throw FileNotFoundException.
   *
   * @param fs file system
   * @param dir directory
   * @param filter file status filter
   * @return null if dir is empty or doesn't exist, otherwise FileStatus list
   */
  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs,
      final Path dir, final FileStatusFilter filter) throws IOException {
    FileStatus[] status = null;
    try {
      status = fs.listStatus(dir);
    } catch (FileNotFoundException fnfe) {
      LOG.trace("{} does not exist", dir);
      return null;
    }

    if (ArrayUtils.getLength(status) == 0) {
      return null;
    }

    if (filter == null) {
      return Arrays.asList(status);
    } else {
      List<FileStatus> status2 = filterFileStatuses(status, filter);
      if (status2 == null || status2.isEmpty()) {
        return null;
      } else {
        return status2;
      }
    }
  }
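
  // Illustrative sketch (not part of the original source): listing only HFiles in a family dir
  // while tolerating a directory that has disappeared. The path is a hypothetical example.
  //
  //   Path familyDir = new Path("/hbase/data/default/t1/abc123/cf");
  //   List<FileStatus> hfiles =
  //     FSUtils.listStatusWithStatusFilter(fs, familyDir, new HFileFilter(fs));
  //   if (hfiles == null) {
  //     // directory missing, empty, or nothing matched the filter
  //   }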
  /**
   * This function is to scan the root path of the file system to get the
   * degree of locality for each region on each of the servers having at least
   * one block of that region.
   * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}
   *
   * @param conf
   *          the configuration to use
   * @return the mapping from region encoded name to a map of server names to
   *           locality fraction
   * @throws IOException
   *           in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
      final Configuration conf) throws IOException {
    return getRegionDegreeLocalityMappingFromFS(
      conf, null,
      conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
  }

  /**
   * This function is to scan the root path of the file system to get the
   * degree of locality for each region on each of the servers having at least
   * one block of that region.
   *
   * @param conf
   *          the configuration to use
   * @param desiredTable
   *          the table you wish to scan locality for
   * @param threadPoolSize
   *          the thread pool size to use
   * @return the mapping from region encoded name to a map of server names to
   *           locality fraction
   * @throws IOException
   *           in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
      final Configuration conf, final String desiredTable, int threadPoolSize)
      throws IOException {
    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
    return regionDegreeLocalityMapping;
  }
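
  // Illustrative sketch (not part of the original source): reading the locality fraction of a
  // region's blocks on a particular server. The encoded region name, table, pool size, and
  // hostname are all hypothetical.
  //
  //   Map<String, Map<String, Float>> locality =
  //     FSUtils.getRegionDegreeLocalityMappingFromFS(conf, "t1", 4);
  //   Map<String, Float> perHost = locality.get("abc123def456");        // encoded region name
  //   Float fraction = perHost == null ? null : perHost.get("datanode-1.example.com");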

  /**
   * Scans the root path of the file system to get either the mapping between the region name
   * and its best locality region server, or the degree of locality of each region on each of
   * the servers having at least one block of that region. The output map parameter is optional.
   *
   * @param conf
   *          the configuration to use
   * @param desiredTable
   *          the table you wish to scan locality for
   * @param threadPoolSize
   *          the thread pool size to use
   * @param regionDegreeLocalityMapping
   *          the map into which to put the locality degree mapping or null,
   *          must be a thread-safe implementation
   * @throws IOException
   *          in case of file system errors or interrupts
   */
  private static void getRegionLocalityMappingFromFS(final Configuration conf,
      final String desiredTable, int threadPoolSize,
      final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
    final FileSystem fs = FileSystem.get(conf);
    final Path rootPath = CommonFSUtils.getRootDir(conf);
    final long startTime = EnvironmentEdgeManager.currentTime();
    final Path queryPath;
    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
    if (null == desiredTable) {
      queryPath =
        new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
    } else {
      queryPath = new Path(
        CommonFSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
    }

    // reject all paths that are not appropriate
    PathFilter pathFilter = new PathFilter() {
      @Override
      public boolean accept(Path path) {
        // this is the region name; it may get some noise data
        if (null == path) {
          return false;
        }

        // no parent?
        Path parent = path.getParent();
        if (null == parent) {
          return false;
        }

        String regionName = path.getName();
        if (null == regionName) {
          return false;
        }

        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
          return false;
        }
        return true;
      }
    };

    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
    }

    if (null == statusList) {
      return;
    }

    // lower the number of threads in case we have very few expected regions
    threadPoolSize = Math.min(threadPoolSize, statusList.length);

    // run in multiple threads
    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
      new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d").setDaemon(true)
        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
    try {
      // ignore all file status items that are not of interest
      for (FileStatus regionStatus : statusList) {
        if (null == regionStatus || !regionStatus.isDirectory()) {
          continue;
        }

        final Path regionPath = regionStatus.getPath();
        if (null != regionPath) {
          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
        }
      }
    } finally {
      tpe.shutdown();
      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
      try {
        // here we wait until TPE terminates, which is either naturally or by
        // exceptions in the execution of the threads
        while (!tpe.awaitTermination(threadWakeFrequency, TimeUnit.MILLISECONDS)) {
          // print a rough progress estimate rather than tracking exact per-task counts
          LOG.info("Locality checking is underway: { Scanned Regions : "
              + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
              + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
      }
    }

    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
    LOG.info("Scanning DFS for locality info took {}ms", overhead);
  }

  /**
   * Do our short circuit read setup.
   * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
   * @param conf must not be null
   */
  public static void setupShortCircuitRead(final Configuration conf) {
    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
    boolean shortCircuitSkipChecksum =
      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    if (shortCircuitSkipChecksum) {
      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
        "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
        "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
      assert !shortCircuitSkipChecksum; // this will fail if assertions are on
    }
    checkShortCircuitReadBufferSize(conf);
  }
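
  // Configuration sketch (illustrative only, not part of the original class): callers typically
  // enable HDFS short-circuit reads and leave checksum skipping alone, letting HBase-level
  // checksums do the verification. HBaseConfiguration is assumed available in caller code.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setBoolean("dfs.client.read.shortcircuit", true);
  //   setupShortCircuitRead(conf); // warns (and asserts) if skip.checksum was set to true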

  /**
   * Check if short circuit read buffer size is set and if not, set it to hbase value.
   * @param conf must not be null
   */
  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
    final int notSet = -1;
    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
    int size = conf.getInt(dfsKey, notSet);
    // If a size is set, return -- we will use it.
    if (size != notSet) {
      return;
    }
    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
  }

  /**
   * @param c the Configuration to look at
   * @return The DFSClient DFSHedgedReadMetrics instance or null if can't be found or not on hdfs.
   * @throws IOException if obtaining the FileSystem fails
   */
  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
      throws IOException {
    if (!CommonFSUtils.isHDFS(c)) {
      return null;
    }
    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
    final String name = "getHedgedReadMetrics";
    DFSClient dfsclient = ((DistributedFileSystem) FileSystem.get(c)).getClient();
    Method m;
    try {
      m = dfsclient.getClass().getDeclaredMethod(name);
    } catch (NoSuchMethodException e) {
      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
          e.getMessage());
      return null;
    } catch (SecurityException e) {
      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
          e.getMessage());
      return null;
    }
    m.setAccessible(true);
    try {
      return (DFSHedgedReadMetrics) m.invoke(dfsclient);
    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
          e.getMessage());
      return null;
    }
  }
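
  // Usage sketch (illustrative only, not part of the original class): the return value may be
  // null (non-HDFS filesystem or reflection failure), so callers should null-check before
  // reading counters; getHedgedReadOps() is assumed to be the HDFS-side accessor of interest.
  //
  //   DFSHedgedReadMetrics metrics = getDFSHedgedReadMetrics(conf);
  //   if (metrics != null) {
  //     LOG.info("hedged read ops so far: {}", metrics.getHedgedReadOps());
  //   }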

  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS,
      Path dst, Configuration conf, int threads) throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    List<Future<Void>> futures = new ArrayList<>();
    List<Path> traversedPaths;
    try {
      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (ExecutionException | InterruptedException | IOException e) {
      throw new IOException("Copy snapshot reference files failed", e);
    } finally {
      pool.shutdownNow();
    }
    return traversedPaths;
  }
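
  // Usage sketch (illustrative only, not part of the original class): directories are created
  // inline while individual file copies run on the pool; the returned list holds every
  // destination path that was created. The filesystems, paths, and thread count are hypothetical.
  //
  //   List<Path> copied =
  //       copyFilesParallel(srcFs, snapshotDir, dstFs, backupDir, conf, 16);
  //   LOG.info("copied {} destination paths", copied.size());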

  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
      Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
    List<Path> traversedPaths = new ArrayList<>();
    traversedPaths.add(dst);
    FileStatus currentFileStatus = srcFS.getFileStatus(src);
    if (currentFileStatus.isDirectory()) {
      if (!dstFS.mkdirs(dst)) {
        throw new IOException("Create directory failed: " + dst);
      }
      FileStatus[] subPaths = srcFS.listStatus(src);
      for (FileStatus subPath : subPaths) {
        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
      }
    } else {
      Future<Void> future = pool.submit(() -> {
        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
        return null;
      });
      futures.add(future);
    }
    return traversedPaths;
  }

  /**
   * @param fs the DistributedFileSystem to inspect
   * @param conf the Configuration of HBase
   * @return A set containing all namenode addresses of fs
   */
  private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
      Configuration conf) {
    Set<InetSocketAddress> addresses = new HashSet<>();
    String serviceName = fs.getCanonicalServiceName();

    if (serviceName.startsWith("ha-hdfs")) {
      try {
        Map<String, Map<String, InetSocketAddress>> addressMap =
          DFSUtil.getNNServiceRpcAddressesForCluster(conf);
        String nameService = serviceName.substring(serviceName.indexOf(":") + 1);
        if (addressMap.containsKey(nameService)) {
          Map<String, InetSocketAddress> nnMap = addressMap.get(nameService);
          for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
            InetSocketAddress addr = e2.getValue();
            addresses.add(addr);
          }
        }
      } catch (Exception e) {
        LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
      }
    } else {
      URI uri = fs.getUri();
      int port = uri.getPort();
      if (port < 0) {
        int idx = serviceName.indexOf(':');
        port = Integer.parseInt(serviceName.substring(idx + 1));
      }
      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
      addresses.add(addr);
    }

    return addresses;
  }

  /**
   * @param conf the Configuration of HBase
   * @param srcFs the source FileSystem
   * @param desFs the destination FileSystem
   * @return Whether srcFs and desFs are on the same hdfs or not
   */
  public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
    // By getCanonicalServiceName, we could make sure both srcFs and desFs
    // show a unified format which contains scheme, host and port.
    String srcServiceName = srcFs.getCanonicalServiceName();
    String desServiceName = desFs.getCanonicalServiceName();

    if (srcServiceName == null || desServiceName == null) {
      return false;
    }
    if (srcServiceName.equals(desServiceName)) {
      return true;
    }
    if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) {
      Collection<String> internalNameServices =
        conf.getTrimmedStringCollection("dfs.internal.nameservices");
      if (!internalNameServices.isEmpty()) {
        if (internalNameServices.contains(srcServiceName.split(":")[1])) {
          return true;
        } else {
          return false;
        }
      }
    }
    if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
      // If one serviceName is an HA format while the other is a non-HA format,
      // maybe they refer to the same FileSystem.
      // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port"
      Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
      Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
      if (Sets.intersection(srcAddrs, desAddrs).size() > 0) {