/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import edu.umd.cs.findbugs.annotations.CheckForNull;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
/**
 * Utility methods for interacting with the underlying file system.
 */
@InterfaceAudience.Private
public abstract class FSUtils extends CommonFSUtils {
  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);

  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
  private static final int DEFAULT_THREAD_POOLSIZE = 2;

  /** Set to true on Windows platforms */
  @VisibleForTesting // currently only used in testing. TODO refactor into a test class
  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");

  protected FSUtils() {
    super();
  }

  /**
   * @return True if <code>fs</code> is instance of DistributedFileSystem
   * @throws IOException
   */
  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
    FileSystem fileSystem = fs;
    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
    // Check its backing fs for dfs-ness.
    if (fs instanceof HFileSystem) {
      fileSystem = ((HFileSystem) fs).getBackingFs();
    }
    return fileSystem instanceof DistributedFileSystem;
  }

  /**
   * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
   * '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true. Does not
   * consider schema; i.e. if schemas differ but path or subpath matches, the two will equate.
   * @param pathToSearch Path we will be trying to match.
   * @param pathTail the path tail to look for
   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
    Path tailPath = pathTail;
    String tailName;
    Path toSearch = pathToSearch;
    String toSearchName;
    boolean result = false;

    if (pathToSearch.depth() != pathTail.depth()) {
      return false;
    }

    do {
      tailName = tailPath.getName();
      if (tailName == null || tailName.isEmpty()) {
        result = true;
        break;
      }
      toSearchName = toSearch.getName();
      if (toSearchName == null || toSearchName.isEmpty()) {
        break;
      }
      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
      tailPath = tailPath.getParent();
      toSearch = toSearch.getParent();
    } while (tailName.equals(toSearchName));
    return result;
  }

  public static FSUtils getInstance(FileSystem fs, Configuration conf) {
    String scheme = fs.getUri().getScheme();
    if (scheme == null) {
      LOG.warn("Could not find scheme for uri " +
          fs.getUri() + ", default to hdfs");
      scheme = "hdfs";
    }
    Class<?> fsUtilsClass = conf.getClass("hbase.fsutil." +
        scheme + ".impl", FSHDFSUtils.class); // Default to HDFS impl
    FSUtils fsUtils = (FSUtils) ReflectionUtils.newInstance(fsUtilsClass, conf);
    return fsUtils;
  }
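
  // Usage sketch (illustrative only, not part of the upstream class): getInstance() resolves the
  // FSUtils implementation from "hbase.fsutil.<scheme>.impl", defaulting to FSHDFSUtils. A custom
  // implementation could be plugged in like this (the class name below is hypothetical):
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.set("hbase.fsutil.hdfs.impl", "org.example.MyFSUtils");
  //   FSUtils fsUtils = FSUtils.getInstance(FileSystem.get(conf), conf);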
  /**
   * Delete the region directory if exists.
   * @param conf configuration
   * @param hri the region whose directory should be deleted
   * @return True if deleted the region directory.
   * @throws IOException
   */
  public static boolean deleteRegionDir(final Configuration conf, final HRegionInfo hri)
      throws IOException {
    Path rootDir = getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);
    return deleteDirectory(fs,
      new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
  }

  /**
   * Create the specified file on the filesystem. By default, this will:
   * <ol>
   * <li>overwrite the file if it exists</li>
   * <li>apply the umask in the configuration (if it is enabled)</li>
   * <li>use the fs configured buffer size (or 4096 if not set)</li>
   * <li>use the configured column family replication or default replication if
   * {@link HColumnDescriptor#DEFAULT_DFS_REPLICATION}</li>
   * <li>use the default block size</li>
   * <li>not track progress</li>
   * </ol>
   * @param conf configurations
   * @param fs {@link FileSystem} on which to write the file
   * @param path {@link Path} to the file to write
   * @param perm permissions
   * @param favoredNodes favored data nodes for the created file, if any
   * @return output stream to the created file
   * @throws IOException if the file cannot be created
   */
  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
      FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
    if (fs instanceof HFileSystem) {
      FileSystem backingFs = ((HFileSystem) fs).getBackingFs();
      if (backingFs instanceof DistributedFileSystem) {
        // Try to use the favoredNodes version via reflection to allow backwards-
        // compatibility.
        short replication = Short.parseShort(conf.get(HColumnDescriptor.DFS_REPLICATION,
          String.valueOf(HColumnDescriptor.DEFAULT_DFS_REPLICATION)));
        try {
          return (FSDataOutputStream) (DistributedFileSystem.class.getDeclaredMethod("create",
            Path.class, FsPermission.class, boolean.class, int.class, short.class, long.class,
            Progressable.class, InetSocketAddress[].class).invoke(backingFs, path, perm, true,
            getDefaultBufferSize(backingFs),
            replication > 0 ? replication : getDefaultReplication(backingFs, path),
            getDefaultBlockSize(backingFs, path), null, favoredNodes));
        } catch (InvocationTargetException ite) {
          // Function was properly called, but threw its own exception.
          throw new IOException(ite.getCause());
        } catch (NoSuchMethodException e) {
          LOG.debug("DFS Client does not support most favored nodes create; using default create");
          LOG.trace("Ignoring; use default create", e);
        } catch (IllegalArgumentException | SecurityException | IllegalAccessException e) {
          LOG.debug("Ignoring (most likely Reflection related exception) " + e);
        }
      }
    }
    return create(fs, path, perm, true);
  }

  /**
   * Checks to see if the specified file system is available
   *
   * @param fs filesystem
   * @throws IOException e
   */
  public static void checkFileSystemAvailable(final FileSystem fs)
      throws IOException {
    if (!(fs instanceof DistributedFileSystem)) {
      return;
    }
    IOException exception = null;
    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    try {
      if (dfs.exists(new Path("/"))) {
        return;
      }
    } catch (IOException e) {
      exception = e instanceof RemoteException ?
          ((RemoteException) e).unwrapRemoteException() : e;
    }
    try {
      fs.close();
    } catch (Exception e) {
      LOG.error("file system close failed: ", e);
    }
    throw new IOException("File system is not available", exception);
  }
  /**
   * We use reflection because {@link DistributedFileSystem#setSafeMode(
   * HdfsConstants.SafeModeAction action, boolean isChecked)} is not in hadoop 1.1
   *
   * @param dfs the filesystem to check
   * @return whether we're in safe mode
   * @throws IOException
   */
  private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
    boolean inSafeMode = false;
    try {
      Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?>[] {
        org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.class, boolean.class });
      inSafeMode = (Boolean) m.invoke(dfs,
        org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET, true);
    } catch (Exception e) {
      if (e instanceof IOException) throw (IOException) e;

      // Check whether dfs is on safemode.
      inSafeMode = dfs.setSafeMode(
        org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET);
    }
    return inSafeMode;
  }

  /**
   * Check whether dfs is in safemode.
   * @param conf configuration
   * @throws IOException
   */
  public static void checkDfsSafeMode(final Configuration conf)
      throws IOException {
    boolean isInSafeMode = false;
    FileSystem fs = FileSystem.get(conf);
    if (fs instanceof DistributedFileSystem) {
      DistributedFileSystem dfs = (DistributedFileSystem) fs;
      isInSafeMode = isInSafeMode(dfs);
    }
    if (isInSafeMode) {
      throw new IOException("File system is in safemode, it can't be written now");
    }
  }

  /**
   * Verifies current version of file system
   *
   * @param fs filesystem object
   * @param rootdir root hbase directory
   * @return null if no version file exists, version string otherwise
   * @throws IOException if the version file fails to open
   * @throws DeserializationException if the version data cannot be translated into a version
   */
  public static String getVersion(FileSystem fs, Path rootdir)
      throws IOException, DeserializationException {
    final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
    FileStatus[] status = null;
    try {
      // hadoop 2.0 throws FNFE if directory does not exist.
      // hadoop 1.0 returns null if directory does not exist.
      status = fs.listStatus(versionFile);
    } catch (FileNotFoundException fnfe) {
      return null;
    }
    if (ArrayUtils.getLength(status) == 0) {
      return null;
    }
    String version = null;
    byte[] content = new byte[(int) status[0].getLen()];
    FSDataInputStream s = fs.open(versionFile);
    try {
      IOUtils.readFully(s, content, 0, content.length);
      if (ProtobufUtil.isPBMagicPrefix(content)) {
        version = parseVersionFrom(content);
      } else {
        // Presume it pre-pb format.
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) {
          version = dis.readUTF();
        }
      }
    } catch (EOFException eof) {
      LOG.warn("Version file was empty, odd, will try to set it.");
    } finally {
      s.close();
    }
    return version;
  }
  /**
   * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
   * @param bytes The byte content of the hbase.version file
   * @return The version found in the file as a String
   * @throws DeserializationException if the version data cannot be translated into a version
   */
  static String parseVersionFrom(final byte[] bytes)
      throws DeserializationException {
    ProtobufUtil.expectPBMagicPrefix(bytes);
    int pblen = ProtobufUtil.lengthOfPBMagic();
    FSProtos.HBaseVersionFileContent.Builder builder =
        FSProtos.HBaseVersionFileContent.newBuilder();
    try {
      ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
      return builder.getVersion();
    } catch (IOException e) {
      throw new DeserializationException(e);
    }
  }

  /**
   * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
   * @param version Version to persist
   * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a
   *   prefix.
   */
  static byte[] toVersionByteArray(final String version) {
    FSProtos.HBaseVersionFileContent.Builder builder =
        FSProtos.HBaseVersionFileContent.newBuilder();
    return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
  }

  /**
   * Verifies current version of file system
   *
   * @param fs file system
   * @param rootdir root directory of HBase installation
   * @param message if true, issues a message on System.out
   * @throws IOException if the version file cannot be opened
   * @throws DeserializationException if the contents of the version file cannot be parsed
   */
  public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
      throws IOException, DeserializationException {
    checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
  }

  /**
   * Verifies current version of file system
   *
   * @param fs file system
   * @param rootdir root directory of HBase installation
   * @param message if true, issues a message on System.out
   * @param wait wait interval
   * @param retries number of times to retry
   * @throws IOException if the version file cannot be opened
   * @throws DeserializationException if the contents of the version file cannot be parsed
   */
  public static void checkVersion(FileSystem fs, Path rootdir,
      boolean message, int wait, int retries)
      throws IOException, DeserializationException {
    String version = getVersion(fs, rootdir);
    String msg;
    if (version == null) {
      if (!metaRegionExists(fs, rootdir)) {
        // rootDir is empty (no version file and no root region)
        // just create new version file (HBASE-1195)
        setVersion(fs, rootdir, wait, retries);
        return;
      } else {
        msg = "hbase.version file is missing. Is your hbase.rootdir valid? " +
            "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. " +
            "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2";
      }
    } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) {
      return;
    } else {
      msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version +
          " but software requires version " + HConstants.FILE_SYSTEM_VERSION +
          ". Consult http://hbase.apache.org/book.html for further information about " +
          "upgrading HBase.";
    }

    // version is deprecated require migration
    // Output on stdout so user sees it in terminal.
    if (message) {
      System.out.println("WARNING! " + msg);
    }
    throw new FileSystemVersionException(msg);
  }
  /**
   * Sets version of file system
   *
   * @param fs filesystem object
   * @param rootdir hbase root
   * @throws IOException e
   */
  public static void setVersion(FileSystem fs, Path rootdir)
      throws IOException {
    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
      HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
  }

  /**
   * Sets version of file system
   *
   * @param fs filesystem object
   * @param rootdir hbase root
   * @param wait time to wait for retry
   * @param retries number of times to retry before failing
   * @throws IOException e
   */
  public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
      throws IOException {
    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
  }

  /**
   * Sets version of file system
   *
   * @param fs filesystem object
   * @param rootdir hbase root directory
   * @param version version to set
   * @param wait time to wait for retry
   * @param retries number of times to retry before throwing an IOException
   * @throws IOException e
   */
  public static void setVersion(FileSystem fs, Path rootdir, String version,
      int wait, int retries) throws IOException {
    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
    Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
      HConstants.VERSION_FILE_NAME);
    while (true) {
      try {
        // Write the version to a temporary file
        FSDataOutputStream s = fs.create(tempVersionFile);
        try {
          s.write(toVersionByteArray(version));
          s.close();
          s = null;
          // Move the temp version file to its normal location. Returns false
          // if the rename failed. Throw an IOE in that case.
          if (!fs.rename(tempVersionFile, versionFile)) {
            throw new IOException("Unable to move temp version file to " + versionFile);
          }
        } finally {
          // Cleaning up the temporary if the rename failed would be trying
          // too hard. We'll unconditionally create it again the next time
          // through anyway, files are overwritten by default by create().

          // Attempt to close the stream on the way out if it is still open.
          try {
            if (s != null) s.close();
          } catch (IOException ignore) { }
        }
        LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
        return;
      } catch (IOException e) {
        if (retries > 0) {
          LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
          fs.delete(versionFile, false);
          try {
            if (wait > 0) {
              Thread.sleep(wait);
            }
          } catch (InterruptedException ie) {
            throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
          }
          retries--;
        } else {
          throw e;
        }
      }
    }
  }
  /**
   * Checks that a cluster ID file exists in the HBase root directory
   * @param fs the root directory FileSystem
   * @param rootdir the HBase root directory in HDFS
   * @param wait how long to wait between retries
   * @return <code>true</code> if the file exists, otherwise <code>false</code>
   * @throws IOException if checking the FileSystem fails
   */
  public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
      long wait) throws IOException {
    while (true) {
      try {
        Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
        return fs.exists(filePath);
      } catch (IOException ioe) {
        if (wait > 0L) {
          LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe);
          try {
            Thread.sleep(wait);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
          }
        } else {
          throw ioe;
        }
      }
    }
  }

  /**
   * Returns the value of the unique cluster ID stored for this HBase instance.
   * @param fs the root directory FileSystem
   * @param rootdir the path to the HBase root directory
   * @return the unique cluster identifier
   * @throws IOException if reading the cluster ID file fails
   */
  public static ClusterId getClusterId(FileSystem fs, Path rootdir)
      throws IOException {
    Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
    ClusterId clusterId = null;
    FileStatus status = fs.exists(idPath) ? fs.getFileStatus(idPath) : null;
    if (status != null) {
      int len = Ints.checkedCast(status.getLen());
      byte[] content = new byte[len];
      FSDataInputStream in = fs.open(idPath);
      try {
        in.readFully(content);
      } catch (EOFException eof) {
        LOG.warn("Cluster ID file {} is empty", idPath);
      } finally {
        in.close();
      }
      try {
        clusterId = ClusterId.parseFrom(content);
      } catch (DeserializationException e) {
        throw new IOException("content=" + Bytes.toString(content), e);
      }
      // If not pb'd, make it so.
      if (!ProtobufUtil.isPBMagicPrefix(content)) {
        String cid = null;
        in = fs.open(idPath);
        try {
          cid = in.readUTF();
          clusterId = new ClusterId(cid);
        } catch (EOFException eof) {
          LOG.warn("Cluster ID file {} is empty", idPath);
        } finally {
          in.close();
        }
        rewriteAsPb(fs, rootdir, idPath, clusterId);
      }
      return clusterId;
    } else {
      LOG.warn("Cluster ID file does not exist at {}", idPath);
    }
    return clusterId;
  }
  /**
   * @param cid the cluster id to persist
   * @throws IOException
   */
  private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
      final ClusterId cid)
      throws IOException {
    // Rewrite the file as pb. Move aside the old one first, write new
    // then delete the moved-aside file.
    Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
    if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
    setClusterId(fs, rootdir, cid, 100);
    if (!fs.delete(movedAsideName, false)) {
      throw new IOException("Failed delete of " + movedAsideName);
    }
    LOG.debug("Rewrote the hbase.id file as pb");
  }

  /**
   * Writes a new unique identifier for this cluster to the "hbase.id" file
   * in the HBase root directory
   * @param fs the root directory FileSystem
   * @param rootdir the path to the HBase root directory
   * @param clusterId the unique identifier to store
   * @param wait how long (in milliseconds) to wait between retries
   * @throws IOException if writing to the FileSystem fails and no wait value
   */
  public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId,
      int wait) throws IOException {
    while (true) {
      try {
        Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
        Path tempIdFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY +
          Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME);
        // Write the id file to a temporary location
        FSDataOutputStream s = fs.create(tempIdFile);
        try {
          s.write(clusterId.toByteArray());
          s.close();
          s = null;
          // Move the temporary file to its normal location. Throw an IOE if
          // the rename failed
          if (!fs.rename(tempIdFile, idFile)) {
            throw new IOException("Unable to move temp version file to " + idFile);
          }
        } finally {
          // Attempt to close the stream if still open on the way out
          try {
            if (s != null) s.close();
          } catch (IOException ignore) { }
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId);
        }
        return;
      } catch (IOException ioe) {
        if (wait > 0) {
          LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
              ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
          try {
            Thread.sleep(wait);
          } catch (InterruptedException e) {
            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
          }
        } else {
          throw ioe;
        }
      }
    }
  }

  /**
   * If DFS, check safe mode and if so, wait until we clear it.
   * @param conf configuration
   * @param wait Sleep between retries
   * @throws IOException e
   */
  public static void waitOnSafeMode(final Configuration conf,
      final long wait)
      throws IOException {
    FileSystem fs = FileSystem.get(conf);
    if (!(fs instanceof DistributedFileSystem)) return;
    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    // Make sure dfs is not in safe mode
    while (isInSafeMode(dfs)) {
      LOG.info("Waiting for dfs to exit safe mode...");
      try {
        Thread.sleep(wait);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
      }
    }
  }
  /**
   * Checks if meta region exists
   * @param fs file system
   * @param rootDir root directory of HBase installation
   * @return true if exists
   */
  public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException {
    Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO);
    return fs.exists(metaRegionDir);
  }

  /**
   * Compute HDFS blocks distribution of a given file, or a portion of the file
   * @param fs file system
   * @param status file status of the file
   * @param start start position of the portion
   * @param length length of the portion
   * @return The HDFS blocks distribution
   */
  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
      final FileSystem fs, FileStatus status, long start, long length)
      throws IOException {
    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
    BlockLocation[] blockLocations =
      fs.getFileBlockLocations(status, start, length);
    for (BlockLocation bl : blockLocations) {
      String[] hosts = bl.getHosts();
      long len = bl.getLength();
      blocksDistribution.addHostsAndBlockWeight(hosts, len);
    }

    return blocksDistribution;
  }
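
  // Usage sketch (illustrative only): computing the block distribution of a single store file
  // and reading the locality fraction for one host. The path variable and host name below are
  // hypothetical placeholders.
  //
  //   FileSystem fs = FileSystem.get(conf);
  //   FileStatus status = fs.getFileStatus(storeFilePath);
  //   HDFSBlocksDistribution dist =
  //       FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
  //   float locality = dist.getBlockLocalityIndex("regionserver-1.example.com");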
  /**
   * Update blocksDistribution with blockLocations
   * @param blocksDistribution the hdfs blocks distribution
   * @param blockLocations an array containing block location
   */
  static public void addToHDFSBlocksDistribution(
      HDFSBlocksDistribution blocksDistribution, BlockLocation[] blockLocations)
      throws IOException {
    for (BlockLocation bl : blockLocations) {
      String[] hosts = bl.getHosts();
      long len = bl.getLength();
      blocksDistribution.addHostsAndBlockWeight(hosts, len);
    }
  }

  // TODO move this method OUT of FSUtils. No dependencies to HMaster
  /**
   * Returns the total overall fragmentation percentage. Includes hbase:meta and
   * -ROOT- as well.
   *
   * @param master The master defining the HBase root and file system
   * @return The overall fragmentation percentage across all tables, or -1 if it cannot be computed
   * @throws IOException When scanning the directory fails
   */
  public static int getTotalTableFragmentation(final HMaster master)
      throws IOException {
    Map<String, Integer> map = getTableFragmentation(master);
    return map.isEmpty() ? -1 : map.get("-TOTAL-");
  }

  /**
   * Runs through the HBase rootdir and checks how many stores for each table
   * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
   * percentage across all tables is stored under the special key "-TOTAL-".
   *
   * @param master The master defining the HBase root and file system.
   * @return A map for each table and its percentage (never null).
   * @throws IOException When scanning the directory fails.
   */
  public static Map<String, Integer> getTableFragmentation(
      final HMaster master)
      throws IOException {
    Path path = getRootDir(master.getConfiguration());
    // since HMaster.getFileSystem() is package private
    FileSystem fs = path.getFileSystem(master.getConfiguration());
    return getTableFragmentation(fs, path);
  }
  /**
   * Runs through the HBase rootdir and checks how many stores for each table
   * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
   * percentage across all tables is stored under the special key "-TOTAL-".
   *
   * @param fs The file system to use
   * @param hbaseRootDir The root directory to scan
   * @return A map for each table and its percentage (never null)
   * @throws IOException When scanning the directory fails
   */
  public static Map<String, Integer> getTableFragmentation(
      final FileSystem fs, final Path hbaseRootDir)
      throws IOException {
    Map<String, Integer> frags = new HashMap<>();
    int cfCountTotal = 0;
    int cfFragTotal = 0;
    PathFilter regionFilter = new RegionDirFilter(fs);
    PathFilter familyFilter = new FamilyDirFilter(fs);
    List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
    for (Path d : tableDirs) {
      int cfCount = 0;
      int cfFrag = 0;
      FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
      for (FileStatus regionDir : regionDirs) {
        Path dd = regionDir.getPath();
        // else its a region name, now look in region for families
        FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
        for (FileStatus familyDir : familyDirs) {
          cfCount++;
          cfCountTotal++;
          Path family = familyDir.getPath();
          // now in family make sure only one file
          FileStatus[] familyStatus = fs.listStatus(family);
          if (familyStatus.length > 1) {
            cfFrag++;
            cfFragTotal++;
          }
        }
      }
      // compute percentage per table and store in result list
      frags.put(FSUtils.getTableName(d).getNameAsString(),
        cfCount == 0 ? 0 : Math.round((float) cfFrag / cfCount * 100));
    }
    // set overall percentage for all tables
    frags.put("-TOTAL-",
      cfCountTotal == 0 ? 0 : Math.round((float) cfFragTotal / cfCountTotal * 100));
    return frags;
  }
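
  // Usage sketch (illustrative only): the per-table fragmentation map also carries the overall
  // percentage under the special "-TOTAL-" key.
  //
  //   FileSystem fs = FileSystem.get(conf);
  //   Map<String, Integer> frags = FSUtils.getTableFragmentation(fs, FSUtils.getRootDir(conf));
  //   int overallPercent = frags.get("-TOTAL-");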
  public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException {
    if (fs.exists(dst) && !fs.delete(dst, false)) {
      throw new IOException("Can not delete " + dst);
    }
    if (!fs.rename(src, dst)) {
      throw new IOException("Can not rename from " + src + " to " + dst);
    }
  }

  /**
   * A {@link PathFilter} that returns only regular files.
   */
  static class FileFilter extends AbstractFileStatusFilter {
    private final FileSystem fs;

    public FileFilter(final FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      try {
        return isFile(fs, isDir, p);
      } catch (IOException e) {
        LOG.warn("Unable to verify if path={} is a regular file", p, e);
        return false;
      }
    }
  }

  /**
   * Directory filter that doesn't include any of the directories in the specified blacklist
   */
  public static class BlackListDirFilter extends AbstractFileStatusFilter {
    private final FileSystem fs;
    private List<String> blacklist;

    /**
     * Create a filter on the given filesystem with the specified blacklist
     * @param fs filesystem to filter
     * @param directoryNameBlackList list of the names of the directories to filter. If
     *   <tt>null</tt>, all directories are returned
     */
    @SuppressWarnings("unchecked")
    public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
      this.fs = fs;
      blacklist =
        (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
          : directoryNameBlackList);
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      if (!isValidName(p.getName())) {
        return false;
      }

      try {
        return isDirectory(fs, isDir, p);
      } catch (IOException e) {
        LOG.warn("An error occurred while verifying if [{}] is a valid directory."
            + " Returning 'not valid' and continuing.", p, e);
        return false;
      }
    }

    protected boolean isValidName(final String name) {
      return !blacklist.contains(name);
    }
  }

  /**
   * A {@link PathFilter} that only allows directories.
   */
  public static class DirFilter extends BlackListDirFilter {

    public DirFilter(FileSystem fs) {
      super(fs, null);
    }
  }
  /**
   * A {@link PathFilter} that returns usertable directories. To get all directories use the
   * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
   */
  public static class UserTableDirFilter extends BlackListDirFilter {
    public UserTableDirFilter(FileSystem fs) {
      super(fs, HConstants.HBASE_NON_TABLE_DIRS);
    }

    @Override
    protected boolean isValidName(final String name) {
      if (!super.isValidName(name)) {
        return false;
      }

      try {
        TableName.isLegalTableQualifierName(Bytes.toBytes(name));
      } catch (IllegalArgumentException e) {
        LOG.info("Invalid table name: {}", name);
        return false;
      }
      return true;
    }
  }

  public void recoverFileLease(final FileSystem fs, final Path p, Configuration conf)
      throws IOException {
    recoverFileLease(fs, p, conf, null);
  }

  /**
   * Recover file lease. Used when a file might have been left open by another process.
   * @param fs FileSystem handle
   * @param p Path of file to recover lease
   * @param conf Configuration handle
   * @throws IOException
   */
  public abstract void recoverFileLease(final FileSystem fs, final Path p,
      Configuration conf, CancelableProgressable reporter) throws IOException;

  public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
      throws IOException {
    List<Path> tableDirs = new ArrayList<>();

    for (FileStatus status : fs
        .globStatus(new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
      tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
    }
    return tableDirs;
  }

  /**
   * @return All the table directories under <code>rootdir</code>. Ignore non table hbase folders
   *   such as .logs, .oldlogs, .corrupt folders.
   * @throws IOException
   */
  public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
      throws IOException {
    // presumes any directory under hbase.rootdir is a table
    FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
    List<Path> tabledirs = new ArrayList<>(dirs.length);
    for (FileStatus dir : dirs) {
      tabledirs.add(dir.getPath());
    }
    return tabledirs;
  }
  /**
   * Filter for all dirs that don't start with '.'
   */
  public static class RegionDirFilter extends AbstractFileStatusFilter {
    // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
    final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
    final FileSystem fs;

    public RegionDirFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      if (!regionDirPattern.matcher(p.getName()).matches()) {
        return false;
      }

      try {
        return isDirectory(fs, isDir, p);
      } catch (IOException ioe) {
        // Maybe the file was moved or the fs was disconnected.
        LOG.warn("Skipping file {} due to IOException", p, ioe);
        return false;
      }
    }
  }

  /**
   * Given a particular table dir, return all the regiondirs inside it, excluding files such as
   * .tableinfo
   * @param fs A file system for the Path
   * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
   * @return List of paths to valid region directories in table dir.
   * @throws IOException
   */
  public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir)
      throws IOException {
    // assumes we are in a table dir.
    List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
    if (rds == null) {
      return Collections.emptyList();
    }
    List<Path> regionDirs = new ArrayList<>(rds.size());
    for (FileStatus rdfs : rds) {
      Path rdPath = rdfs.getPath();
      regionDirs.add(rdPath);
    }
    return regionDirs;
  }

  public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) {
    return getRegionDirFromTableDir(getTableDir(rootDir, region.getTable()), region);
  }

  public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
    return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
  }
  /**
   * Filter for all dirs that are legal column family names. This is generally used for colfam
   * dirs &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
   */
  public static class FamilyDirFilter extends AbstractFileStatusFilter {
    final FileSystem fs;

    public FamilyDirFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      try {
        // throws IAE if invalid
        HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(p.getName()));
      } catch (IllegalArgumentException iae) {
        // path name is an invalid family name and thus is excluded.
        return false;
      }

      try {
        return isDirectory(fs, isDir, p);
      } catch (IOException ioe) {
        // Maybe the file was moved or the fs was disconnected.
        LOG.warn("Skipping file {} due to IOException", p, ioe);
        return false;
      }
    }
  }

  /**
   * Given a particular region dir, return all the familydirs inside it
   *
   * @param fs A file system for the Path
   * @param regionDir Path to a specific region directory
   * @return List of paths to valid family directories in region dir.
   * @throws IOException
   */
  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir)
      throws IOException {
    // assumes we are in a region dir.
    FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
    List<Path> familyDirs = new ArrayList<>(fds.length);
    for (FileStatus fdfs : fds) {
      Path fdPath = fdfs.getPath();
      familyDirs.add(fdPath);
    }
    return familyDirs;
  }

  public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir)
      throws IOException {
    List<FileStatus> fds = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs));
    if (fds == null) {
      return Collections.emptyList();
    }
    List<Path> referenceFiles = new ArrayList<>(fds.size());
    for (FileStatus fdfs : fds) {
      Path fdPath = fdfs.getPath();
      referenceFiles.add(fdPath);
    }
    return referenceFiles;
  }
  /**
   * Filter for HFiles that excludes reference files.
   */
  public static class HFileFilter extends AbstractFileStatusFilter {
    final FileSystem fs;

    public HFileFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      if (!StoreFileInfo.isHFile(p)) {
        return false;
      }

      try {
        return isFile(fs, isDir, p);
      } catch (IOException ioe) {
        // Maybe the file was moved or the fs was disconnected.
        LOG.warn("Skipping file {} due to IOException", p, ioe);
        return false;
      }
    }
  }

  /**
   * Filter for HFileLinks (StoreFiles and HFiles not included).
   * The filter itself does not consider if a link is file or not.
   */
  public static class HFileLinkFilter implements PathFilter {

    @Override
    public boolean accept(Path p) {
      return HFileLink.isHFileLink(p);
    }
  }

  public static class ReferenceFileFilter extends AbstractFileStatusFilter {

    private final FileSystem fs;

    public ReferenceFileFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      if (!StoreFileInfo.isReference(p)) {
        return false;
      }

      try {
        // only files can be references.
        return isFile(fs, isDir, p);
      } catch (IOException ioe) {
        // Maybe the file was moved or the fs was disconnected.
        LOG.warn("Skipping file {} due to IOException", p, ioe);
        return false;
      }
    }
  }

  /**
   * Called every so-often by storefile map builder getTableStoreFilePathMap to
   * report progress.
   */
  interface ProgressReporter {
    /**
     * @param status File or directory we are about to process.
     */
    void progress(FileStatus status);
  }
  /**
   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
   * table StoreFile names to the full Path.
   * <br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   *
   * @param map map to add values. If null, this method will create and populate one to return
   * @param fs The file system to use.
   * @param hbaseRootDir The root directory to scan.
   * @param tableName name of the table to scan.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
   * @throws InterruptedException
   */
  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
      final FileSystem fs, final Path hbaseRootDir, TableName tableName)
      throws IOException, InterruptedException {
    return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null,
        (ProgressReporter) null);
  }

  /**
   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
   * table StoreFile names to the full Path. Note that because this method can be called
   * on a 'live' HBase system that we will skip files that no longer exist by the time
   * we traverse them and similarly the user of the result needs to consider that some
   * entries in this map may not exist by the time this call completes.
   * <br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   *
   * @param resultMap map to add values. If null, this method will create and populate one to
   *   return
   * @param fs The file system to use.
   * @param hbaseRootDir The root directory to scan.
   * @param tableName name of the table to scan.
   * @param sfFilter optional path filter to apply to store files
   * @param executor optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to new region of
   *   family dir and for each store file.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
   * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead.
   */
  @Deprecated
  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
      final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
      ExecutorService executor, final HbckErrorReporter progressReporter)
      throws IOException, InterruptedException {
    return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor,
      new ProgressReporter() {
        @Override
        public void progress(FileStatus status) {
          // status is not used in this implementation.
          progressReporter.progress();
        }
      });
  }
  /**
   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
   * table StoreFile names to the full Path. Note that because this method can be called
   * on a 'live' HBase system that we will skip files that no longer exist by the time
   * we traverse them and similarly the user of the result needs to consider that some
   * entries in this map may not exist by the time this call completes.
   * <br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   *
   * @param resultMap map to add values. If null, this method will create and populate one
   *   to return
   * @param fs The file system to use.
   * @param hbaseRootDir The root directory to scan.
   * @param tableName name of the table to scan.
   * @param sfFilter optional path filter to apply to store files
   * @param executor optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to new region of
   *   family dir and for each store file.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
   * @throws InterruptedException the thread is interrupted, either before or during the activity.
   */
  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
      final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
      ExecutorService executor, final ProgressReporter progressReporter)
      throws IOException, InterruptedException {

    final Map<String, Path> finalResultMap =
        resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap;

    // only include the directory paths to tables
    Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);
    // Inside a table, there are compaction.dir directories to skip. Otherwise, all else
    // should be regions.
    final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
    final Vector<Exception> exceptions = new Vector<>();

    try {
      List<FileStatus> regionDirs =
          FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
      if (regionDirs == null) {
        return finalResultMap;
      }

      final List<Future<?>> futures = new ArrayList<>(regionDirs.size());

      for (FileStatus regionDir : regionDirs) {
        if (null != progressReporter) {
          progressReporter.progress(regionDir);
        }
        final Path dd = regionDir.getPath();

        if (!exceptions.isEmpty()) {
          break;
        }

        Runnable getRegionStoreFileMapCall = new Runnable() {
          @Override
          public void run() {
            try {
              HashMap<String, Path> regionStoreFileMap = new HashMap<>();
              List<FileStatus> familyDirs =
                  FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
              if (familyDirs == null) {
                if (!fs.exists(dd)) {
                  LOG.warn("Skipping region because it no longer exists: " + dd);
                } else {
                  LOG.warn("Skipping region because it has no family dirs: " + dd);
                }
                return;
              }
              for (FileStatus familyDir : familyDirs) {
                if (null != progressReporter) {
                  progressReporter.progress(familyDir);
                }
                Path family = familyDir.getPath();
                if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
                  continue;
                }
                // now in family, iterate over the StoreFiles and
                // put in map
                FileStatus[] familyStatus = fs.listStatus(family);
                for (FileStatus sfStatus : familyStatus) {
                  if (null != progressReporter) {
                    progressReporter.progress(sfStatus);
                  }
                  Path sf = sfStatus.getPath();
                  if (sfFilter == null || sfFilter.accept(sf)) {
                    regionStoreFileMap.put(sf.getName(), sf);
                  }
                }
              }
              finalResultMap.putAll(regionStoreFileMap);
            } catch (Exception e) {
              LOG.error("Could not get region store file map for region: " + dd, e);
              exceptions.add(e);
            }
          }
        };

        // If executor is available, submit async tasks to exec concurrently, otherwise
        // just do serial sync execution
        if (executor != null) {
          Future<?> future = executor.submit(getRegionStoreFileMapCall);
          futures.add(future);
        } else {
          FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null);
          future.run();
          futures.add(future);
        }
      }

      // Ensure all pending tasks are complete (or that we run into an exception)
      for (Future<?> f : futures) {
        if (!exceptions.isEmpty()) {
          break;
        }
        try {
          f.get();
        } catch (ExecutionException e) {
          LOG.error("Unexpected exec exception! Should've been caught already. (Bug?)", e);
          // Shouldn't happen, we already logged/caught any exceptions in the Runnable
        }
      }
    } catch (IOException e) {
      LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
      exceptions.add(e);
    } finally {
      if (!exceptions.isEmpty()) {
        // Just throw the first exception as an indication something bad happened
        // Don't need to propagate all the exceptions, we already logged them all anyway
        Throwables.propagateIfInstanceOf(exceptions.firstElement(), IOException.class);
        throw Throwables.propagate(exceptions.firstElement());
      }
    }

    return finalResultMap;
  }
  public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
    int result = 0;
    try {
      for (Path familyDir : getFamilyDirs(fs, p)) {
        result += getReferenceFilePaths(fs, familyDir).size();
      }
    } catch (IOException e) {
      LOG.warn("Error counting reference files", e);
    }
    return result;
  }

  /**
   * Runs through the HBase rootdir and creates a reverse lookup map for
   * table StoreFile names to the full Path.
   * <br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   *
   * @param fs The file system to use.
   * @param hbaseRootDir The root directory to scan.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
   */
  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
      final Path hbaseRootDir)
      throws IOException, InterruptedException {
    return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter) null);
  }
  /**
   * Runs through the HBase rootdir and creates a reverse lookup map for
   * table StoreFile names to the full Path.
   * <br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   *
   * @param fs The file system to use.
   * @param hbaseRootDir The root directory to scan.
   * @param sfFilter optional path filter to apply to store files
   * @param executor optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to new region of
   *   family dir and for each store file.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
   * @deprecated Since 2.3.0. Will be removed in hbase4. Use {@link
   *   #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)}
   */
  @Deprecated
  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
      final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
      HbckErrorReporter progressReporter)
      throws IOException, InterruptedException {
    return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor,
      new ProgressReporter() {
        @Override
        public void progress(FileStatus status) {
          // status is not used in this implementation.
          progressReporter.progress();
        }
      });
  }

  /**
   * Runs through the HBase rootdir and creates a reverse lookup map for
   * table StoreFile names to the full Path.
   * <br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   *
   * @param fs The file system to use.
   * @param hbaseRootDir The root directory to scan.
   * @param sfFilter optional path filter to apply to store files
   * @param executor optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to new region of
   *   family dir and for each store file.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
   * @throws InterruptedException
   */
  public static Map<String, Path> getTableStoreFilePathMap(
      final FileSystem fs, final Path hbaseRootDir, PathFilter sfFilter,
      ExecutorService executor, ProgressReporter progressReporter)
      throws IOException, InterruptedException {
    ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);

    // if this method looks similar to 'getTableFragmentation' that is because
    // it was borrowed from it.

    // only include the directory paths to tables
    for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
      getTableStoreFilePathMap(map, fs, hbaseRootDir,
          FSUtils.getTableName(tableDir), sfFilter, executor, progressReporter);
    }
    return map;
  }
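
  // Usage sketch (illustrative only): building the store file name -> full Path lookup for the
  // whole root directory with the simple overload above (no filter, no executor). The key is the
  // store file name, as in the javadoc example.
  //
  //   FileSystem fs = FileSystem.get(conf);
  //   Map<String, Path> storeFiles =
  //       FSUtils.getTableStoreFilePathMap(fs, FSUtils.getRootDir(conf));
  //   Path full = storeFiles.get("3944417774205889744");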
  /**
   * Filters FileStatuses in an array and returns a list
   *
   * @param input An array of FileStatuses
   * @param filter A required filter to filter the array
   * @return A list of FileStatuses
   */
  public static List<FileStatus> filterFileStatuses(FileStatus[] input,
      FileStatusFilter filter) {
    if (input == null) return null;
    return filterFileStatuses(Iterators.forArray(input), filter);
  }

  /**
   * Filters FileStatuses in an iterator and returns a list
   *
   * @param input An iterator of FileStatuses
   * @param filter A required filter to filter the array
   * @return A list of FileStatuses
   */
  public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
      FileStatusFilter filter) {
    if (input == null) return null;
    ArrayList<FileStatus> results = new ArrayList<>();
    while (input.hasNext()) {
      FileStatus f = input.next();
      if (filter.accept(f)) {
        results.add(f);
      }
    }
    return results;
  }

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal
   * This accommodates differences between hadoop versions, where hadoop 1
   * does not throw a FileNotFoundException, and returns an empty FileStatus[],
   * while Hadoop 2 will throw FileNotFoundException.
   *
   * @param fs file system
   * @param dir directory
   * @param filter file status filter
   * @return null if dir is empty or doesn't exist, otherwise FileStatus list
   */
  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs,
      final Path dir, final FileStatusFilter filter) throws IOException {
    FileStatus[] status = null;
    try {
      status = fs.listStatus(dir);
    } catch (FileNotFoundException fnfe) {
      LOG.trace("{} does not exist", dir);
      return null;
    }

    if (ArrayUtils.getLength(status) == 0) {
      return null;
    }

    if (filter == null) {
      return Arrays.asList(status);
    } else {
      List<FileStatus> status2 = filterFileStatuses(status, filter);
      if (status2 == null || status2.isEmpty()) {
        return null;
      } else {
        return status2;
      }
    }
  }
  /**
   * Throw an exception if an action is not permitted by a user on a file.
   *
   * @param ugi the user
   * @param file the file
   * @param action the action being checked
   */
  public static void checkAccess(UserGroupInformation ugi, FileStatus file,
      FsAction action) throws AccessDeniedException {
    if (ugi.getShortUserName().equals(file.getOwner())) {
      if (file.getPermission().getUserAction().implies(action)) {
        return;
      }
    } else if (ArrayUtils.contains(ugi.getGroupNames(), file.getGroup())) {
      if (file.getPermission().getGroupAction().implies(action)) {
        return;
      }
    } else if (file.getPermission().getOtherAction().implies(action)) {
      return;
    }
    throw new AccessDeniedException("Permission denied:" + " action=" + action
        + " path=" + file.getPath() + " user=" + ugi.getShortUserName());
  }

  /**
   * This function is to scan the root path of the file system to get the
   * degree of locality for each region on each of the servers having at least
   * one block of that region.
   * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}
   *
   * @param conf
   *          the configuration to use
   * @return the mapping from region encoded name to a map of server names to
   *          locality fraction
   * @throws IOException
   *          in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
      final Configuration conf) throws IOException {
    return getRegionDegreeLocalityMappingFromFS(
        conf, null,
        conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
  }

  /**
   * This function is to scan the root path of the file system to get the
   * degree of locality for each region on each of the servers having at least
   * one block of that region.
   *
   * @param conf
   *          the configuration to use
   * @param desiredTable
   *          the table you wish to scan locality for
   * @param threadPoolSize
   *          the thread pool size to use
   * @return the mapping from region encoded name to a map of server names to
   *          locality fraction
   * @throws IOException
   *          in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
      final Configuration conf, final String desiredTable, int threadPoolSize)
      throws IOException {
    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
    return regionDegreeLocalityMapping;
  }
  /**
   * This function is to scan the root path of the file system to get either the
   * mapping between the region name and its best locality region server or the
   * degree of locality of each region on each of the servers having at least
   * one block of that region. The output map parameters are both optional.
   *
   * @param conf
   *          the configuration to use
   * @param desiredTable
   *          the table you wish to scan locality for
   * @param threadPoolSize
   *          the thread pool size to use
   * @param regionDegreeLocalityMapping
   *          the map into which to put the locality degree mapping or null,
   *          must be a thread-safe implementation
   * @throws IOException
   *          in case of file system errors or interrupts
   */
  private static void getRegionLocalityMappingFromFS(final Configuration conf,
      final String desiredTable, int threadPoolSize,
      final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
    final FileSystem fs = FileSystem.get(conf);
    final Path rootPath = FSUtils.getRootDir(conf);
    final long startTime = EnvironmentEdgeManager.currentTime();
    final Path queryPath;
    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
    if (null == desiredTable) {
      queryPath =
        new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
    } else {
      queryPath = new Path(
        FSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
    }

    // reject all paths that are not appropriate
    PathFilter pathFilter = new PathFilter() {
      @Override
      public boolean accept(Path path) {
        // this is the region name; it may get some noise data
        if (null == path) {
          return false;
        }

        Path parent = path.getParent();
        if (null == parent) {
          return false;
        }

        String regionName = path.getName();
        if (null == regionName) {
          return false;
        }

        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
          return false;
        }
        return true;
      }
    };

    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
    }

    if (null == statusList) {
      return;
    }

    // lower the number of threads in case we have very few expected regions
    threadPoolSize = Math.min(threadPoolSize, statusList.length);

    // run in multiple threads
    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
      Threads.newDaemonThreadFactory("FSRegionQuery"));
    try {
      // ignore all file status items that are not of interest
      for (FileStatus regionStatus : statusList) {
        if (null == regionStatus || !regionStatus.isDirectory()) {
          continue;
        }

        final Path regionPath = regionStatus.getPath();
        if (null != regionPath) {
          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
        }
      }
    } finally {
      tpe.shutdown();
      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
      try {
        // here we wait until TPE terminates, which is either naturally or by
        // exceptions in the execution of the threads
        while (!tpe.awaitTermination(threadWakeFrequency,
            TimeUnit.MILLISECONDS)) {
          // printing out rough estimate, so as to not introduce
          // a synchronization point just for progress reporting
          LOG.info("Locality checking is underway: { Scanned Regions : "
              + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
              + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
      }
    }

    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
    LOG.info("Scan DFS for locality info takes {}ms", overhead);
  }
  /**
   * Do our short circuit read setup.
   * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
   */
  public static void setupShortCircuitRead(final Configuration conf) {
    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
    boolean shortCircuitSkipChecksum =
      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    if (shortCircuitSkipChecksum) {
      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
        "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
        "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
      assert !shortCircuitSkipChecksum; //this will fail if assertions are on
    }
    checkShortCircuitReadBufferSize(conf);
  }

  /**
   * Check if short circuit read buffer size is set and if not, set it to hbase value.
   */
  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
    final int notSet = -1;
    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
    int size = conf.getInt(dfsKey, notSet);
    // If a size is set, return -- we will use it.
    if (size != notSet) return;
    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
  }
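
  // Usage sketch (illustrative only): checkShortCircuitReadBufferSize() copies
  // "hbase.dfs.client.read.shortcircuit.buffer.size" (or twice the default block size when that
  // is unset) into "dfs.client.read.shortcircuit.buffer.size", unless the dfs key is already set.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt("hbase.dfs.client.read.shortcircuit.buffer.size", 128 * 1024);
  //   FSUtils.checkShortCircuitReadBufferSize(conf);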
  /**
   * @return The DFSClient DFSHedgedReadMetrics instance or null if can't be found or not on hdfs.
   * @throws IOException
   */
  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
      throws IOException {
    if (!isHDFS(c)) return null;
    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
    final String name = "getHedgedReadMetrics";
    DFSClient dfsclient = ((DistributedFileSystem) FileSystem.get(c)).getClient();
    Method m;
    try {
      m = dfsclient.getClass().getDeclaredMethod(name);
    } catch (NoSuchMethodException e) {
      LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " +
          e.getMessage());
      return null;
    } catch (SecurityException e) {
      LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " +
          e.getMessage());
      return null;
    }
    m.setAccessible(true);
    try {
      return (DFSHedgedReadMetrics) m.invoke(dfsclient);
    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
          e.getMessage());
      return null;
    }
  }

  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
      Configuration conf, int threads) throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    List<Future<Void>> futures = new ArrayList<>();
    List<Path> traversedPaths;
    try {
      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (ExecutionException | InterruptedException | IOException e) {
      throw new IOException("Copy snapshot reference files failed", e);
    } finally {
      pool.shutdownNow();
    }
    return traversedPaths;
  }
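
  // Usage sketch (illustrative only): copying a directory tree with a small copy thread pool.
  // The source and destination paths below are hypothetical placeholders.
  //
  //   List<Path> copied = FSUtils.copyFilesParallel(srcFs, new Path("/hbase/.hbase-snapshot/s1"),
  //       dstFs, new Path("/backup/s1"), conf, 8);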
  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
      Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
    List<Path> traversedPaths = new ArrayList<>();
    traversedPaths.add(dst);
    FileStatus currentFileStatus = srcFS.getFileStatus(src);
    if (currentFileStatus.isDirectory()) {
      if (!dstFS.mkdirs(dst)) {
        throw new IOException("Create directory failed: " + dst);
      }
      FileStatus[] subPaths = srcFS.listStatus(src);
      for (FileStatus subPath : subPaths) {
        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
      }
    } else {
      Future<Void> future = pool.submit(() -> {
        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
        return null;
      });
      futures.add(future);
    }
    return traversedPaths;
  }