/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import edu.umd.cs.findbugs.annotations.CheckForNull;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
/**
 * Utility methods for interacting with the underlying file system.
 */
@InterfaceAudience.Private
public abstract class FSUtils extends CommonFSUtils {
  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);

  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
  private static final int DEFAULT_THREAD_POOLSIZE = 2;

  /** Set to true on Windows platforms */
  @VisibleForTesting // currently only used in testing. TODO refactor into a test class
  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");

  protected FSUtils() {
    super();
  }

  /**
   * @return True if <code>fs</code> is instance of DistributedFileSystem
   * @throws IOException
   */
  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
    FileSystem fileSystem = fs;
    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
    // Check its backing fs for dfs-ness.
    if (fs instanceof HFileSystem) {
      fileSystem = ((HFileSystem) fs).getBackingFs();
    }
    return fileSystem instanceof DistributedFileSystem;
  }

  /**
   * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
   * '/a/b/c' part. Does not consider schema; i.e. if schemas differ but the path or subpath
   * matches, the two will equate. Note that both paths must have the same depth.
   * @param pathToSearch Path we will be trying to match.
   * @param pathTail Path to match against.
   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
    if (pathToSearch.depth() != pathTail.depth()) return false;
    Path tailPath = pathTail;
    String tailName;
    Path toSearch = pathToSearch;
    String toSearchName;
    boolean result = false;
    do {
      tailName = tailPath.getName();
      if (tailName == null || tailName.length() <= 0) {
        result = true;
        break;
      }
      toSearchName = toSearch.getName();
      if (toSearchName == null || toSearchName.length() <= 0) break;
      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
      tailPath = tailPath.getParent();
      toSearch = toSearch.getParent();
    } while (tailName.equals(toSearchName));
    return result;
  }

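  // Illustrative usage (not part of the upstream class): only the path components are compared,
  // scheme and authority are ignored, and both paths must have the same depth.
  //
  //   Path withScheme = new Path("hdfs://namenode:8020/hbase/data/default/t1");
  //   Path noScheme = new Path("/hbase/data/default/t1");
  //   boolean sameTail = FSUtils.isMatchingTail(withScheme, noScheme); // true
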
  public static FSUtils getInstance(FileSystem fs, Configuration conf) {
    String scheme = fs.getUri().getScheme();
    if (scheme == null) {
      LOG.warn("Could not find scheme for uri " +
          fs.getUri() + ", default to hdfs");
      scheme = "hdfs";
    }
    Class<?> fsUtilsClass = conf.getClass("hbase.fsutil." +
        scheme + ".impl", FSHDFSUtils.class); // Default to HDFS impl
    FSUtils fsUtils = (FSUtils) ReflectionUtils.newInstance(fsUtilsClass, conf);
    return fsUtils;
  }

  /**
   * Delete the region directory if it exists.
   * @param conf configuration
   * @param hri the region to delete
   * @return True if deleted the region directory.
   * @throws IOException
   */
  public static boolean deleteRegionDir(final Configuration conf, final HRegionInfo hri)
      throws IOException {
    Path rootDir = getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);
    return deleteDirectory(fs,
      new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
  }

  /**
   * Create the specified file on the filesystem. By default, this will:
   * <ol>
   * <li>overwrite the file if it exists</li>
   * <li>apply the umask in the configuration (if it is enabled)</li>
   * <li>use the fs configured buffer size (or 4096 if not set)</li>
   * <li>use the configured column family replication or default replication if
   * {@link HColumnDescriptor#DEFAULT_DFS_REPLICATION}</li>
   * <li>use the default block size</li>
   * <li>not track progress</li>
   * </ol>
   * @param conf configurations
   * @param fs {@link FileSystem} on which to write the file
   * @param path {@link Path} to the file to write
   * @param perm permissions
   * @param favoredNodes favored data nodes for the new block, or null
   * @return output stream to the created file
   * @throws IOException if the file cannot be created
   */
  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
      FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
    if (fs instanceof HFileSystem) {
      FileSystem backingFs = ((HFileSystem) fs).getBackingFs();
      if (backingFs instanceof DistributedFileSystem) {
        // Try to use the favoredNodes version via reflection to allow backwards-
        // compatibility.
        short replication = Short.parseShort(conf.get(HColumnDescriptor.DFS_REPLICATION,
          String.valueOf(HColumnDescriptor.DEFAULT_DFS_REPLICATION)));
        try {
          return (FSDataOutputStream) (DistributedFileSystem.class.getDeclaredMethod("create",
            Path.class, FsPermission.class, boolean.class, int.class, short.class, long.class,
            Progressable.class, InetSocketAddress[].class).invoke(backingFs, path, perm, true,
            getDefaultBufferSize(backingFs),
            replication > 0 ? replication : getDefaultReplication(backingFs, path),
            getDefaultBlockSize(backingFs, path), null, favoredNodes));
        } catch (InvocationTargetException ite) {
          // Function was properly called, but threw its own exception.
          throw new IOException(ite.getCause());
        } catch (NoSuchMethodException e) {
          LOG.debug("DFS Client does not support most favored nodes create; using default create");
          if (LOG.isTraceEnabled()) LOG.trace("Ignoring; use default create", e);
        } catch (IllegalArgumentException e) {
          LOG.debug("Ignoring (most likely Reflection related exception) " + e);
        } catch (SecurityException e) {
          LOG.debug("Ignoring (most likely Reflection related exception) " + e);
        } catch (IllegalAccessException e) {
          LOG.debug("Ignoring (most likely Reflection related exception) " + e);
        }
      }
    }
    return create(fs, path, perm, true);
  }

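  // Illustrative usage (not part of the upstream class): asking HDFS to place block replicas on
  // specific region server hosts when writing a file. Host names, port and payload are placeholders.
  //
  //   InetSocketAddress[] favoredNodes = new InetSocketAddress[] {
  //       new InetSocketAddress("rs1.example.com", 50010),
  //       new InetSocketAddress("rs2.example.com", 50010) };
  //   FSDataOutputStream out = FSUtils.create(conf, fs, new Path("/hbase/tmp/demo"),
  //       FsPermission.getFileDefault(), favoredNodes);
  //   try {
  //     out.write(payload);
  //   } finally {
  //     out.close();
  //   }
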
  /**
   * Checks to see if the specified file system is available
   *
   * @param fs filesystem
   * @throws IOException e
   */
  public static void checkFileSystemAvailable(final FileSystem fs)
      throws IOException {
    if (!(fs instanceof DistributedFileSystem)) {
      return;
    }
    IOException exception = null;
    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    try {
      if (dfs.exists(new Path("/"))) {
        return;
      }
    } catch (IOException e) {
      exception = e instanceof RemoteException ?
          ((RemoteException) e).unwrapRemoteException() : e;
    }
    try {
      fs.close();
    } catch (Exception e) {
      LOG.error("file system close failed: ", e);
    }
    IOException io = new IOException("File system is not available");
    io.initCause(exception);
    throw io;
  }

  /**
   * We use reflection because {@link DistributedFileSystem#setSafeMode(
   * HdfsConstants.SafeModeAction action, boolean isChecked)} is not in hadoop 1.1
   *
   * @param dfs the DistributedFileSystem to check
   * @return whether we're in safe mode
   * @throws IOException
   */
  private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
    boolean inSafeMode = false;
    try {
      Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?>[] {
        org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.class, boolean.class});
      inSafeMode = (Boolean) m.invoke(dfs,
        org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET, true);
    } catch (Exception e) {
      if (e instanceof IOException) throw (IOException) e;

      // Check whether dfs is on safemode.
      inSafeMode = dfs.setSafeMode(
        org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET);
    }
    return inSafeMode;
  }

  /**
   * Check whether dfs is in safemode.
   * @param conf configuration
   * @throws IOException
   */
  public static void checkDfsSafeMode(final Configuration conf)
      throws IOException {
    boolean isInSafeMode = false;
    FileSystem fs = FileSystem.get(conf);
    if (fs instanceof DistributedFileSystem) {
      DistributedFileSystem dfs = (DistributedFileSystem) fs;
      isInSafeMode = isInSafeMode(dfs);
    }
    if (isInSafeMode) {
      throw new IOException("File system is in safemode, it can't be written now");
    }
  }

  /**
   * Verifies current version of file system
   *
   * @param fs filesystem object
   * @param rootdir root hbase directory
   * @return null if no version file exists, version string otherwise.
   * @throws IOException e
   * @throws org.apache.hadoop.hbase.exceptions.DeserializationException
   */
  public static String getVersion(FileSystem fs, Path rootdir)
      throws IOException, DeserializationException {
    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
    FileStatus[] status = null;
    try {
      // hadoop 2.0 throws FNFE if directory does not exist.
      // hadoop 1.0 returns null if directory does not exist.
      status = fs.listStatus(versionFile);
    } catch (FileNotFoundException fnfe) {
      return null;
    }
    if (status == null || status.length == 0) return null;
    String version = null;
    byte[] content = new byte[(int) status[0].getLen()];
    FSDataInputStream s = fs.open(versionFile);
    try {
      IOUtils.readFully(s, content, 0, content.length);
      if (ProtobufUtil.isPBMagicPrefix(content)) {
        version = parseVersionFrom(content);
      } else {
        // Presume it pre-pb format.
        InputStream is = new ByteArrayInputStream(content);
        DataInputStream dis = new DataInputStream(is);
        try {
          version = dis.readUTF();
        } finally {
          dis.close();
        }
      }
    } catch (EOFException eof) {
      LOG.warn("Version file was empty, odd, will try to set it.");
    } finally {
      s.close();
    }
    return version;
  }

  /**
   * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
   * @param bytes The byte content of the hbase.version file.
   * @return The version found in the file as a String.
   * @throws DeserializationException
   */
  static String parseVersionFrom(final byte[] bytes)
      throws DeserializationException {
    ProtobufUtil.expectPBMagicPrefix(bytes);
    int pblen = ProtobufUtil.lengthOfPBMagic();
    FSProtos.HBaseVersionFileContent.Builder builder =
        FSProtos.HBaseVersionFileContent.newBuilder();
    try {
      ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
      return builder.getVersion();
    } catch (IOException e) {
      throw new DeserializationException(e);
    }
  }

  /**
   * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
   * @param version Version to persist
   * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix.
   */
  static byte[] toVersionByteArray(final String version) {
    FSProtos.HBaseVersionFileContent.Builder builder =
        FSProtos.HBaseVersionFileContent.newBuilder();
    return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
  }

  /**
   * Verifies current version of file system
   *
   * @param fs file system
   * @param rootdir root directory of HBase installation
   * @param message if true, issues a message on System.out
   * @throws IOException e
   * @throws DeserializationException
   */
  public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
      throws IOException, DeserializationException {
    checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
  }

  /**
   * Verifies current version of file system
   *
   * @param fs file system
   * @param rootdir root directory of HBase installation
   * @param message if true, issues a message on System.out
   * @param wait wait interval
   * @param retries number of times to retry
   * @throws IOException e
   * @throws DeserializationException
   */
  public static void checkVersion(FileSystem fs, Path rootdir,
      boolean message, int wait, int retries)
      throws IOException, DeserializationException {
    String version = getVersion(fs, rootdir);
    if (version == null) {
      if (!metaRegionExists(fs, rootdir)) {
        // rootDir is empty (no version file and no root region)
        // just create new version file (HBASE-1195)
        setVersion(fs, rootdir, wait, retries);
        return;
      }
    } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) return;

    // version is deprecated require migration
    // Output on stdout so user sees it in terminal.
    String msg = "HBase file layout needs to be upgraded."
        + " You have version " + version
        + " and I want version " + HConstants.FILE_SYSTEM_VERSION
        + ". Consult http://hbase.apache.org/book.html for further information about upgrading HBase."
        + " Is your hbase.rootdir valid? If so, you may need to run "
        + "'hbase hbck -fixVersionFile'.";
    if (message) {
      System.out.println("WARNING! " + msg);
    }
    throw new FileSystemVersionException(msg);
  }

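  // Illustrative usage (not part of the upstream class): verifying the hbase.version file under
  // hbase.rootdir before acting on the cluster's data; "conf" is assumed to point at the cluster.
  //
  //   Path rootDir = FSUtils.getRootDir(conf);
  //   FileSystem fs = rootDir.getFileSystem(conf);
  //   String version = FSUtils.getVersion(fs, rootDir); // null if no version file exists yet
  //   FSUtils.checkVersion(fs, rootDir, true);           // throws FileSystemVersionException on mismatch
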
  /**
   * Sets version of file system
   *
   * @param fs filesystem object
   * @param rootdir hbase root
   * @throws IOException e
   */
  public static void setVersion(FileSystem fs, Path rootdir)
      throws IOException {
    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
      HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
  }

  /**
   * Sets version of file system
   *
   * @param fs filesystem object
   * @param rootdir hbase root
   * @param wait time to wait for retry
   * @param retries number of times to retry before failing
   * @throws IOException e
   */
  public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
      throws IOException {
    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
  }

  /**
   * Sets version of file system
   *
   * @param fs filesystem object
   * @param rootdir hbase root directory
   * @param version version to set
   * @param wait time to wait for retry
   * @param retries number of times to retry before throwing an IOException
   * @throws IOException e
   */
  public static void setVersion(FileSystem fs, Path rootdir, String version,
      int wait, int retries) throws IOException {
    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
    Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
      HConstants.VERSION_FILE_NAME);
    while (true) {
      try {
        // Write the version to a temporary file
        FSDataOutputStream s = fs.create(tempVersionFile);
        try {
          s.write(toVersionByteArray(version));
          s.close();
          s = null;
          // Move the temp version file to its normal location. Returns false
          // if the rename failed. Throw an IOE in that case.
          if (!fs.rename(tempVersionFile, versionFile)) {
            throw new IOException("Unable to move temp version file to " + versionFile);
          }
        } finally {
          // Cleaning up the temporary if the rename failed would be trying
          // too hard. We'll unconditionally create it again the next time
          // through anyway, files are overwritten by default by create().

          // Attempt to close the stream on the way out if it is still open.
          try {
            if (s != null) s.close();
          } catch (IOException ignore) { }
        }
        LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
        return;
      } catch (IOException e) {
        if (retries > 0) {
          LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
          fs.delete(versionFile, false);
          try {
            if (wait > 0) {
              Thread.sleep(wait);
            }
          } catch (InterruptedException ie) {
            throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
          }
          retries--;
        } else {
          throw e;
        }
      }
    }
  }

  /**
   * Checks that a cluster ID file exists in the HBase root directory
   * @param fs the root directory FileSystem
   * @param rootdir the HBase root directory in HDFS
   * @param wait how long to wait between retries
   * @return <code>true</code> if the file exists, otherwise <code>false</code>
   * @throws IOException if checking the FileSystem fails
   */
  public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
      int wait) throws IOException {
    while (true) {
      try {
        Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
        return fs.exists(filePath);
      } catch (IOException ioe) {
        if (wait > 0) {
          LOG.warn("Unable to check cluster ID file in " + rootdir.toString() +
              ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
          try {
            Thread.sleep(wait);
          } catch (InterruptedException e) {
            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
          }
        } else {
          throw ioe;
        }
      }
    }
  }

  /**
   * Returns the value of the unique cluster ID stored for this HBase instance.
   * @param fs the root directory FileSystem
   * @param rootdir the path to the HBase root directory
   * @return the unique cluster identifier
   * @throws IOException if reading the cluster ID file fails
   */
  public static ClusterId getClusterId(FileSystem fs, Path rootdir)
      throws IOException {
    Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
    ClusterId clusterId = null;
    FileStatus status = fs.exists(idPath) ? fs.getFileStatus(idPath) : null;
    if (status != null) {
      int len = Ints.checkedCast(status.getLen());
      byte[] content = new byte[len];
      FSDataInputStream in = fs.open(idPath);
      try {
        in.readFully(content);
      } catch (EOFException eof) {
        LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
      } finally {
        in.close();
      }
      try {
        clusterId = ClusterId.parseFrom(content);
      } catch (DeserializationException e) {
        throw new IOException("content=" + Bytes.toString(content), e);
      }
      // If not pb'd, make it so.
      if (!ProtobufUtil.isPBMagicPrefix(content)) {
        String cid = null;
        in = fs.open(idPath);
        try {
          cid = in.readUTF();
          clusterId = new ClusterId(cid);
        } catch (EOFException eof) {
          LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
        } finally {
          in.close();
        }
        rewriteAsPb(fs, rootdir, idPath, clusterId);
      }
      return clusterId;
    } else {
      LOG.warn("Cluster ID file does not exist at " + idPath.toString());
    }
    return clusterId;
  }

  /**
   * Rewrite the cluster ID file as protobuf.
   * @throws IOException
   */
  private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
      final ClusterId cid)
      throws IOException {
    // Rewrite the file as pb. Move aside the old one first, write new
    // then delete the moved-aside file.
    Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
    if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
    setClusterId(fs, rootdir, cid, 100);
    if (!fs.delete(movedAsideName, false)) {
      throw new IOException("Failed delete of " + movedAsideName);
    }
    LOG.debug("Rewrote the hbase.id file as pb");
  }

  /**
   * Writes a new unique identifier for this cluster to the "hbase.id" file
   * in the HBase root directory
   * @param fs the root directory FileSystem
   * @param rootdir the path to the HBase root directory
   * @param clusterId the unique identifier to store
   * @param wait how long (in milliseconds) to wait between retries
   * @throws IOException if writing to the FileSystem fails and no wait value
   */
  public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId,
      int wait) throws IOException {
    while (true) {
      try {
        Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
        Path tempIdFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY +
          Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME);
        // Write the id file to a temporary location
        FSDataOutputStream s = fs.create(tempIdFile);
        try {
          s.write(clusterId.toByteArray());
          s.close();
          s = null;
          // Move the temporary file to its normal location. Throw an IOE if
          // the rename failed
          if (!fs.rename(tempIdFile, idFile)) {
            throw new IOException("Unable to move temp version file to " + idFile);
          }
        } finally {
          // Attempt to close the stream if still open on the way out
          try {
            if (s != null) s.close();
          } catch (IOException ignore) { }
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId);
        }
        return;
      } catch (IOException ioe) {
        if (wait > 0) {
          LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
              ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
          try {
            Thread.sleep(wait);
          } catch (InterruptedException e) {
            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
          }
        } else {
          throw ioe;
        }
      }
    }
  }

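  // Illustrative usage (not part of the upstream class): bootstrapping hbase.id on a fresh
  // rootdir, similar to what the active master does. The 1000 ms retry wait is arbitrary.
  //
  //   if (!FSUtils.checkClusterIdExists(fs, rootDir, 1000)) {
  //     FSUtils.setClusterId(fs, rootDir, new ClusterId(), 1000);
  //   }
  //   ClusterId clusterId = FSUtils.getClusterId(fs, rootDir);
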
  /**
   * If DFS, check safe mode and if so, wait until we clear it.
   * @param conf configuration
   * @param wait Sleep between retries
   * @throws IOException e
   */
  public static void waitOnSafeMode(final Configuration conf,
      final long wait)
      throws IOException {
    FileSystem fs = FileSystem.get(conf);
    if (!(fs instanceof DistributedFileSystem)) return;
    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    // Make sure dfs is not in safe mode
    while (isInSafeMode(dfs)) {
      LOG.info("Waiting for dfs to exit safe mode...");
      try {
        Thread.sleep(wait);
      } catch (InterruptedException e) {
        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
      }
    }
  }

  /**
   * Checks if meta region exists
   *
   * @param fs file system
   * @param rootdir root directory of HBase installation
   * @return true if exists
   * @throws IOException e
   */
  @SuppressWarnings("deprecation")
  public static boolean metaRegionExists(FileSystem fs, Path rootdir)
      throws IOException {
    Path metaRegionDir =
        HRegion.getRegionDir(rootdir, HRegionInfo.FIRST_META_REGIONINFO);
    return fs.exists(metaRegionDir);
  }

  /**
   * Compute HDFS blocks distribution of a given file, or a portion of the file
   * @param fs file system
   * @param status file status of the file
   * @param start start position of the portion
   * @param length length of the portion
   * @return The HDFS blocks distribution
   */
  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
      final FileSystem fs, FileStatus status, long start, long length)
      throws IOException {
    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
    BlockLocation[] blockLocations =
        fs.getFileBlockLocations(status, start, length);
    for (BlockLocation bl : blockLocations) {
      String[] hosts = bl.getHosts();
      long len = bl.getLength();
      blocksDistribution.addHostsAndBlockWeight(hosts, len);
    }

    return blocksDistribution;
  }

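  // Illustrative usage (not part of the upstream class): measuring how local one store file is to
  // a region server host. The file path and host name are placeholders.
  //
  //   FileStatus status = fs.getFileStatus(storeFilePath);
  //   HDFSBlocksDistribution dist =
  //       FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
  //   float locality = dist.getBlockLocalityIndex("rs1.example.com"); // 0.0f .. 1.0f
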
  /**
   * Update blocksDistribution with blockLocations
   * @param blocksDistribution the hdfs blocks distribution
   * @param blockLocations an array containing block location
   */
  static public void addToHDFSBlocksDistribution(
      HDFSBlocksDistribution blocksDistribution, BlockLocation[] blockLocations)
      throws IOException {
    for (BlockLocation bl : blockLocations) {
      String[] hosts = bl.getHosts();
      long len = bl.getLength();
      blocksDistribution.addHostsAndBlockWeight(hosts, len);
    }
  }

  // TODO move this method OUT of FSUtils. No dependencies to HMaster
  /**
   * Returns the total overall fragmentation percentage. Includes hbase:meta and
   * -ROOT- as well.
   *
   * @param master The master defining the HBase root and file system.
   * @return The overall fragmentation percentage, or -1 if it cannot be determined.
   * @throws IOException When scanning the directory fails.
   */
  public static int getTotalTableFragmentation(final HMaster master)
      throws IOException {
    Map<String, Integer> map = getTableFragmentation(master);
    return map != null && map.size() > 0 ? map.get("-TOTAL-") : -1;
  }

  /**
   * Runs through the HBase rootdir and checks how many stores for each table
   * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
   * percentage across all tables is stored under the special key "-TOTAL-".
   *
   * @param master The master defining the HBase root and file system.
   * @return A map for each table and its percentage.
   * @throws IOException When scanning the directory fails.
   */
  public static Map<String, Integer> getTableFragmentation(
      final HMaster master)
      throws IOException {
    Path path = getRootDir(master.getConfiguration());
    // since HMaster.getFileSystem() is package private
    FileSystem fs = path.getFileSystem(master.getConfiguration());
    return getTableFragmentation(fs, path);
  }

  /**
   * Runs through the HBase rootdir and checks how many stores for each table
   * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
   * percentage across all tables is stored under the special key "-TOTAL-".
   *
   * @param fs The file system to use.
   * @param hbaseRootDir The root directory to scan.
   * @return A map for each table and its percentage.
   * @throws IOException When scanning the directory fails.
   */
  public static Map<String, Integer> getTableFragmentation(
      final FileSystem fs, final Path hbaseRootDir)
      throws IOException {
    Map<String, Integer> frags = new HashMap<>();
    int cfCountTotal = 0;
    int cfFragTotal = 0;
    PathFilter regionFilter = new RegionDirFilter(fs);
    PathFilter familyFilter = new FamilyDirFilter(fs);
    List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
    for (Path d : tableDirs) {
      int cfCount = 0;
      int cfFrag = 0;
      FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
      for (FileStatus regionDir : regionDirs) {
        Path dd = regionDir.getPath();
        // else its a region name, now look in region for families
        FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
        for (FileStatus familyDir : familyDirs) {
          cfCount++;
          cfCountTotal++;
          Path family = familyDir.getPath();
          // now in family make sure only one file
          FileStatus[] familyStatus = fs.listStatus(family);
          if (familyStatus.length > 1) {
            cfFrag++;
            cfFragTotal++;
          }
        }
      }
      // compute percentage per table and store in result list
      frags.put(FSUtils.getTableName(d).getNameAsString(),
        cfCount == 0 ? 0 : Math.round((float) cfFrag / cfCount * 100));
    }
    // set overall percentage for all tables
    frags.put("-TOTAL-",
      cfCountTotal == 0 ? 0 : Math.round((float) cfFragTotal / cfCountTotal * 100));
    return frags;
  }

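  // Illustrative usage (not part of the upstream class): printing per-table fragmentation; the
  // special "-TOTAL-" key holds the overall percentage across all tables.
  //
  //   Map<String, Integer> frag = FSUtils.getTableFragmentation(fs, rootDir);
  //   for (Map.Entry<String, Integer> e : frag.entrySet()) {
  //     System.out.println(e.getKey() + ": " + e.getValue() + "% of stores have more than one file");
  //   }
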
  public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException {
    if (fs.exists(dst) && !fs.delete(dst, false)) {
      throw new IOException("Can not delete " + dst);
    }
    if (!fs.rename(src, dst)) {
      throw new IOException("Can not rename from " + src + " to " + dst);
    }
  }

  /**
   * A {@link PathFilter} that returns only regular files.
   */
  static class FileFilter extends AbstractFileStatusFilter {
    private final FileSystem fs;

    public FileFilter(final FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      try {
        return isFile(fs, isDir, p);
      } catch (IOException e) {
        LOG.warn("Unable to verify if path=" + p + " is a regular file", e);
        return false;
      }
    }
  }

  /**
   * Directory filter that doesn't include any of the directories in the specified blacklist
   */
  public static class BlackListDirFilter extends AbstractFileStatusFilter {
    private final FileSystem fs;
    private List<String> blacklist;

    /**
     * Create a filter on the given filesystem with the specified blacklist
     * @param fs filesystem to filter
     * @param directoryNameBlackList list of the names of the directories to filter. If
     *          <tt>null</tt>, all directories are returned
     */
    @SuppressWarnings("unchecked")
    public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
      this.fs = fs;
      blacklist =
        (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
          : directoryNameBlackList);
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      if (!isValidName(p.getName())) {
        return false;
      }

      try {
        return isDirectory(fs, isDir, p);
      } catch (IOException e) {
        LOG.warn("An error occurred while verifying if [" + p.toString()
            + "] is a valid directory. Returning 'not valid' and continuing.", e);
        return false;
      }
    }

    protected boolean isValidName(final String name) {
      return !blacklist.contains(name);
    }
  }

  /**
   * A {@link PathFilter} that only allows directories.
   */
  public static class DirFilter extends BlackListDirFilter {
    public DirFilter(FileSystem fs) {
      super(fs, null);
    }
  }

  /**
   * A {@link PathFilter} that returns usertable directories. To get all directories use the
   * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
   */
  public static class UserTableDirFilter extends BlackListDirFilter {
    public UserTableDirFilter(FileSystem fs) {
      super(fs, HConstants.HBASE_NON_TABLE_DIRS);
    }

    @Override
    protected boolean isValidName(final String name) {
      if (!super.isValidName(name))
        return false;

      try {
        TableName.isLegalTableQualifierName(Bytes.toBytes(name));
      } catch (IllegalArgumentException e) {
        LOG.info("Invalid table name: " + name);
        return false;
      }
      return true;
    }
  }

  public void recoverFileLease(final FileSystem fs, final Path p, Configuration conf)
      throws IOException {
    recoverFileLease(fs, p, conf, null);
  }

  /**
   * Recover file lease. Used when a file might have been left open by another process.
   * @param fs FileSystem handle
   * @param p Path of file to recover lease
   * @param conf Configuration handle
   * @throws IOException
   */
  public abstract void recoverFileLease(final FileSystem fs, final Path p,
      Configuration conf, CancelableProgressable reporter) throws IOException;

  public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
      throws IOException {
    List<Path> tableDirs = new LinkedList<>();

    for (FileStatus status :
        fs.globStatus(new Path(rootdir,
            new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
      tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
    }
    return tableDirs;
  }

  /**
   * @return All the table directories under <code>rootdir</code>. Ignores non-table hbase
   *         folders such as .logs, .oldlogs and .corrupt.
   * @throws IOException
   */
  public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
      throws IOException {
    // presumes any directory under hbase.rootdir is a table
    FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
    List<Path> tabledirs = new ArrayList<>(dirs.length);
    for (FileStatus dir : dirs) {
      tabledirs.add(dir.getPath());
    }
    return tabledirs;
  }

  /**
   * Filter for all dirs that don't start with '.'
   */
  public static class RegionDirFilter extends AbstractFileStatusFilter {
    // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
    final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
    final FileSystem fs;

    public RegionDirFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      if (!regionDirPattern.matcher(p.getName()).matches()) {
        return false;
      }

      try {
        return isDirectory(fs, isDir, p);
      } catch (IOException ioe) {
        // Maybe the file was moved or the fs was disconnected.
        LOG.warn("Skipping file " + p + " due to IOException", ioe);
        return false;
      }
    }
  }

  /**
   * Given a particular table dir, return all the regiondirs inside it, excluding files such as
   * .tableinfo
   * @param fs A file system for the Path
   * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
   * @return List of paths to valid region directories in table dir.
   * @throws IOException
   */
  public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
    // assumes we are in a table dir.
    List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
    if (rds == null) {
      return Collections.emptyList();
    }
    List<Path> regionDirs = new ArrayList<>(rds.size());
    for (FileStatus rdfs : rds) {
      Path rdPath = rdfs.getPath();
      regionDirs.add(rdPath);
    }
    return regionDirs;
  }

  public static Path getRegionDir(Path tableDir, RegionInfo region) {
    return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
  }

  /**
   * Filter for all dirs that are legal column family names. This is generally used for colfam
   * dirs &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
   */
  public static class FamilyDirFilter extends AbstractFileStatusFilter {
    final FileSystem fs;

    public FamilyDirFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      try {
        // throws IAE if invalid
        HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(p.getName()));
      } catch (IllegalArgumentException iae) {
        // path name is an invalid family name and thus is excluded.
        return false;
      }

      try {
        return isDirectory(fs, isDir, p);
      } catch (IOException ioe) {
        // Maybe the file was moved or the fs was disconnected.
        LOG.warn("Skipping file " + p + " due to IOException", ioe);
        return false;
      }
    }
  }

  /**
   * Given a particular region dir, return all the familydirs inside it
   *
   * @param fs A file system for the Path
   * @param regionDir Path to a specific region directory
   * @return List of paths to valid family directories in region dir.
   * @throws IOException
   */
  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
    // assumes we are in a region dir.
    FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
    List<Path> familyDirs = new ArrayList<>(fds.length);
    for (FileStatus fdfs : fds) {
      Path fdPath = fdfs.getPath();
      familyDirs.add(fdPath);
    }
    return familyDirs;
  }

  public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException {
    List<FileStatus> fds = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs));
    if (fds == null) {
      return Collections.emptyList();
    }
    List<Path> referenceFiles = new ArrayList<>(fds.size());
    for (FileStatus fdfs : fds) {
      Path fdPath = fdfs.getPath();
      referenceFiles.add(fdPath);
    }
    return referenceFiles;
  }

  /**
   * Filter for HFiles that excludes reference files.
   */
  public static class HFileFilter extends AbstractFileStatusFilter {
    final FileSystem fs;

    public HFileFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      if (!StoreFileInfo.isHFile(p)) {
        return false;
      }

      try {
        return isFile(fs, isDir, p);
      } catch (IOException ioe) {
        // Maybe the file was moved or the fs was disconnected.
        LOG.warn("Skipping file " + p + " due to IOException", ioe);
        return false;
      }
    }
  }

  /**
   * Filter for HFileLinks (StoreFiles and HFiles not included).
   * The filter itself does not consider whether a link is a file or not.
   */
  public static class HFileLinkFilter implements PathFilter {
    @Override
    public boolean accept(Path p) {
      return HFileLink.isHFileLink(p);
    }
  }

  public static class ReferenceFileFilter extends AbstractFileStatusFilter {

    private final FileSystem fs;

    public ReferenceFileFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      if (!StoreFileInfo.isReference(p)) {
        return false;
      }

      try {
        // only files can be references.
        return isFile(fs, isDir, p);
      } catch (IOException ioe) {
        // Maybe the file was moved or the fs was disconnected.
        LOG.warn("Skipping file " + p + " due to IOException", ioe);
        return false;
      }
    }
  }

  /**
   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
   * table StoreFile names to the full Path.
   * <br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   *
   * @param map map to add values. If null, this method will create and populate one to return
   * @param fs The file system to use.
   * @param hbaseRootDir The root directory to scan.
   * @param tableName name of the table to scan.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
   * @throws InterruptedException
   */
  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
      final FileSystem fs, final Path hbaseRootDir, TableName tableName)
      throws IOException, InterruptedException {
    return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null, null);
  }

  /**
   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
   * table StoreFile names to the full Path. Note that because this method can be called
   * on a 'live' HBase system, we will skip files that no longer exist by the time
   * we traverse them, and similarly the user of the result needs to consider that some
   * entries in this map may not exist by the time this call completes.
   * <br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   *
   * @param resultMap map to add values. If null, this method will create and populate one to return
   * @param fs The file system to use.
   * @param hbaseRootDir The root directory to scan.
   * @param tableName name of the table to scan.
   * @param sfFilter optional path filter to apply to store files
   * @param executor optional executor service to parallelize this operation
   * @param errors ErrorReporter instance or null
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
   * @throws InterruptedException
   */
  public static Map<String, Path> getTableStoreFilePathMap(
      Map<String, Path> resultMap,
      final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
      ExecutorService executor, final ErrorReporter errors) throws IOException, InterruptedException {

    final Map<String, Path> finalResultMap =
        resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap;

    // only include the directory paths to tables
    Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);
    // Inside a table, there are compaction.dir directories to skip. Otherwise, all else
    // should be regions.
    final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
    final Vector<Exception> exceptions = new Vector<>();

    try {
      List<FileStatus> regionDirs =
          FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
      if (regionDirs == null) {
        return finalResultMap;
      }

      final List<Future<?>> futures = new ArrayList<>(regionDirs.size());

      for (FileStatus regionDir : regionDirs) {
        if (null != errors) {
          errors.progress();
        }
        final Path dd = regionDir.getPath();

        if (!exceptions.isEmpty()) {
          break;
        }

        Runnable getRegionStoreFileMapCall = new Runnable() {
          @Override
          public void run() {
            try {
              HashMap<String, Path> regionStoreFileMap = new HashMap<>();
              List<FileStatus> familyDirs = FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
              if (familyDirs == null) {
                if (!fs.exists(dd)) {
                  LOG.warn("Skipping region because it no longer exists: " + dd);
                } else {
                  LOG.warn("Skipping region because it has no family dirs: " + dd);
                }
                return;
              }
              for (FileStatus familyDir : familyDirs) {
                if (null != errors) {
                  errors.progress();
                }
                Path family = familyDir.getPath();
                if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
                  continue;
                }
                // now in family, iterate over the StoreFiles and
                // put in map
                FileStatus[] familyStatus = fs.listStatus(family);
                for (FileStatus sfStatus : familyStatus) {
                  if (null != errors) {
                    errors.progress();
                  }
                  Path sf = sfStatus.getPath();
                  if (sfFilter == null || sfFilter.accept(sf)) {
                    regionStoreFileMap.put(sf.getName(), sf);
                  }
                }
              }
              finalResultMap.putAll(regionStoreFileMap);
            } catch (Exception e) {
              LOG.error("Could not get region store file map for region: " + dd, e);
              exceptions.add(e);
            }
          }
        };

        // If executor is available, submit async tasks to exec concurrently, otherwise
        // just do serial sync execution
        if (executor != null) {
          Future<?> future = executor.submit(getRegionStoreFileMapCall);
          futures.add(future);
        } else {
          FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null);
          future.run();
          futures.add(future);
        }
      }

      // Ensure all pending tasks are complete (or that we run into an exception)
      for (Future<?> f : futures) {
        if (!exceptions.isEmpty()) {
          break;
        }
        try {
          f.get();
        } catch (ExecutionException e) {
          LOG.error("Unexpected exec exception! Should've been caught already. (Bug?)", e);
          // Shouldn't happen, we already logged/caught any exceptions in the Runnable
        }
      }
    } catch (IOException e) {
      LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
      exceptions.add(e);
    } finally {
      if (!exceptions.isEmpty()) {
        // Just throw the first exception as an indication something bad happened
        // Don't need to propagate all the exceptions, we already logged them all anyway
        Throwables.propagateIfInstanceOf(exceptions.firstElement(), IOException.class);
        throw Throwables.propagate(exceptions.firstElement());
      }
    }

    return finalResultMap;
  }

  public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
    int result = 0;
    try {
      for (Path familyDir : getFamilyDirs(fs, p)) {
        result += getReferenceFilePaths(fs, familyDir).size();
      }
    } catch (IOException e) {
      LOG.warn("Error counting reference files.", e);
    }
    return result;
  }

  /**
   * Runs through the HBase rootdir and creates a reverse lookup map for
   * table StoreFile names to the full Path.
   * <br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   *
   * @param fs The file system to use.
   * @param hbaseRootDir The root directory to scan.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
   * @throws InterruptedException
   */
  public static Map<String, Path> getTableStoreFilePathMap(
      final FileSystem fs, final Path hbaseRootDir)
      throws IOException, InterruptedException {
    return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, null);
  }

  /**
   * Runs through the HBase rootdir and creates a reverse lookup map for
   * table StoreFile names to the full Path.
   * <br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   *
   * @param fs The file system to use.
   * @param hbaseRootDir The root directory to scan.
   * @param sfFilter optional path filter to apply to store files
   * @param executor optional executor service to parallelize this operation
   * @param errors ErrorReporter instance or null
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
   * @throws InterruptedException
   */
  public static Map<String, Path> getTableStoreFilePathMap(
      final FileSystem fs, final Path hbaseRootDir, PathFilter sfFilter,
      ExecutorService executor, ErrorReporter errors)
      throws IOException, InterruptedException {
    ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);

    // if this method looks similar to 'getTableFragmentation' that is because
    // it was borrowed from it.

    // only include the directory paths to tables
    for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
      getTableStoreFilePathMap(map, fs, hbaseRootDir,
          FSUtils.getTableName(tableDir), sfFilter, executor, errors);
    }
    return map;
  }

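  // Illustrative usage (not part of the upstream class): building the store file name -> Path map
  // for the whole rootdir with a small worker pool. The pool size is arbitrary.
  //
  //   ExecutorService exec = Executors.newFixedThreadPool(4);
  //   try {
  //     Map<String, Path> storeFiles = FSUtils.getTableStoreFilePathMap(fs, rootDir, null, exec, null);
  //     LOG.info("Found " + storeFiles.size() + " store files");
  //   } finally {
  //     exec.shutdown();
  //   }
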
  /**
   * Filters FileStatuses in an array and returns a list
   *
   * @param input An array of FileStatuses
   * @param filter A required filter to filter the array
   * @return A list of FileStatuses
   */
  public static List<FileStatus> filterFileStatuses(FileStatus[] input,
      FileStatusFilter filter) {
    if (input == null) return null;
    return filterFileStatuses(Iterators.forArray(input), filter);
  }

  /**
   * Filters FileStatuses in an iterator and returns a list
   *
   * @param input An iterator of FileStatuses
   * @param filter A required filter to filter the array
   * @return A list of FileStatuses
   */
  public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
      FileStatusFilter filter) {
    if (input == null) return null;
    ArrayList<FileStatus> results = new ArrayList<>();
    while (input.hasNext()) {
      FileStatus f = input.next();
      if (filter.accept(f)) {
        results.add(f);
      }
    }
    return results;
  }

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
   * This accommodates differences between hadoop versions, where hadoop 1
   * does not throw a FileNotFoundException and returns an empty FileStatus[],
   * while Hadoop 2 will throw FileNotFoundException.
   *
   * @param fs file system
   * @param dir directory
   * @param filter file status filter
   * @return null if dir is empty or doesn't exist, otherwise FileStatus list
   */
  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs,
      final Path dir, final FileStatusFilter filter) throws IOException {
    FileStatus[] status = null;
    try {
      status = fs.listStatus(dir);
    } catch (FileNotFoundException fnfe) {
      // if directory doesn't exist, return null
      if (LOG.isTraceEnabled()) {
        LOG.trace(dir + " doesn't exist");
      }
    }

    if (status == null || status.length < 1) {
      return null;
    }

    if (filter == null) {
      return Arrays.asList(status);
    } else {
      List<FileStatus> status2 = filterFileStatuses(status, filter);
      if (status2 == null || status2.isEmpty()) {
        return null;
      } else {
        return status2;
      }
    }
  }

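  // Illustrative usage (not part of the upstream class): listing only valid region directories
  // under a table directory, tolerating a directory that is missing or empty.
  //
  //   List<FileStatus> regionDirs =
  //       FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
  //   if (regionDirs == null) {
  //     // table dir does not exist or holds no region dirs
  //   }
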
  /**
   * Throw an exception if an action is not permitted by a user on a file.
   *
   * @param ugi the user
   * @param file the file
   * @param action the action being checked
   */
  public static void checkAccess(UserGroupInformation ugi, FileStatus file,
      FsAction action) throws AccessDeniedException {
    if (ugi.getShortUserName().equals(file.getOwner())) {
      if (file.getPermission().getUserAction().implies(action)) {
        return;
      }
    } else if (contains(ugi.getGroupNames(), file.getGroup())) {
      if (file.getPermission().getGroupAction().implies(action)) {
        return;
      }
    } else if (file.getPermission().getOtherAction().implies(action)) {
      return;
    }
    throw new AccessDeniedException("Permission denied:" + " action=" + action
        + " path=" + file.getPath() + " user=" + ugi.getShortUserName());
  }

  private static boolean contains(String[] groups, String user) {
    for (String group : groups) {
      if (group.equals(user)) {
        return true;
      }
    }
    return false;
  }

  /**
   * This function is to scan the root path of the file system to get the
   * degree of locality for each region on each of the servers having at least
   * one block of that region.
   * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}
   *
   * @param conf the configuration to use
   * @return the mapping from region encoded name to a map of server names to
   *         locality fraction
   * @throws IOException in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
      final Configuration conf) throws IOException {
    return getRegionDegreeLocalityMappingFromFS(
        conf, null,
        conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
  }

  /**
   * This function is to scan the root path of the file system to get the
   * degree of locality for each region on each of the servers having at least
   * one block of that region.
   *
   * @param conf the configuration to use
   * @param desiredTable the table you wish to scan locality for
   * @param threadPoolSize the thread pool size to use
   * @return the mapping from region encoded name to a map of server names to
   *         locality fraction
   * @throws IOException in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
      final Configuration conf, final String desiredTable, int threadPoolSize)
      throws IOException {
    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, null,
        regionDegreeLocalityMapping);
    return regionDegreeLocalityMapping;
  }

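  // Illustrative usage (not part of the upstream class): per-region locality for one table; the
  // table name, thread pool size and region name below are placeholders.
  //
  //   Map<String, Map<String, Float>> locality =
  //       FSUtils.getRegionDegreeLocalityMappingFromFS(conf, "usertable", 8);
  //   Map<String, Float> localityByHost = locality.get(regionEncodedName);
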
  /**
   * This function is to scan the root path of the file system to get either the
   * mapping between the region name and its best locality region server or the
   * degree of locality of each region on each of the servers having at least
   * one block of that region. The output map parameters are both optional.
   *
   * @param conf the configuration to use
   * @param desiredTable the table you wish to scan locality for
   * @param threadPoolSize the thread pool size to use
   * @param regionToBestLocalityRSMapping the map into which to put the best locality mapping or null
   * @param regionDegreeLocalityMapping the map into which to put the locality degree mapping or
   *          null, must be a thread-safe implementation
   * @throws IOException in case of file system errors or interrupts
   */
  private static void getRegionLocalityMappingFromFS(
      final Configuration conf, final String desiredTable,
      int threadPoolSize,
      Map<String, String> regionToBestLocalityRSMapping,
      Map<String, Map<String, Float>> regionDegreeLocalityMapping)
      throws IOException {
    FileSystem fs = FileSystem.get(conf);
    Path rootPath = FSUtils.getRootDir(conf);
    long startTime = EnvironmentEdgeManager.currentTime();
    Path queryPath;
    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
    if (null == desiredTable) {
      queryPath = new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
    } else {
      queryPath = new Path(FSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
    }

    // reject all paths that are not appropriate
    PathFilter pathFilter = new PathFilter() {
      @Override
      public boolean accept(Path path) {
        // this is the region name; it may get some noise data
        if (null == path) {
          return false;
        }

        // no parent?
        Path parent = path.getParent();
        if (null == parent) {
          return false;
        }

        String regionName = path.getName();
        if (null == regionName) {
          return false;
        }

        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
          return false;
        }
        return true;
      }
    };

    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);

    if (null == statusList) {
      return;
    } else {
      LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
          statusList.length);
    }

    // lower the number of threads in case we have very few expected regions
    threadPoolSize = Math.min(threadPoolSize, statusList.length);

    // run in multiple threads
    ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
        threadPoolSize, 60, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(statusList.length));
    try {
      // ignore all file status items that are not of interest
      for (FileStatus regionStatus : statusList) {
        if (null == regionStatus) {
          continue;
        }

        if (!regionStatus.isDirectory()) {
          continue;
        }

        Path regionPath = regionStatus.getPath();
        if (null == regionPath) {
          continue;
        }

        tpe.execute(new FSRegionScanner(fs, regionPath,
            regionToBestLocalityRSMapping, regionDegreeLocalityMapping));
      }
    } finally {
      tpe.shutdown();
      int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
          HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
      try {
        // here we wait until TPE terminates, which is either naturally or by
        // exceptions in the execution of the threads
        while (!tpe.awaitTermination(threadWakeFrequency,
            TimeUnit.MILLISECONDS)) {
          // printing out rough estimate, so as to not introduce
          // a too-much-checking overhead between each check
          LOG.info("Locality checking is underway: { Scanned Regions : "
              + tpe.getCompletedTaskCount() + "/"
              + tpe.getTaskCount() + " }");
        }
      } catch (InterruptedException e) {
        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
      }
    }

    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
    String overheadMsg = "Scan DFS for locality info takes " + overhead + " ms";

    LOG.info(overheadMsg);
  }

  /**
   * Do our short circuit read setup.
   * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
   * @param conf the configuration to check and update
   */
  public static void setupShortCircuitRead(final Configuration conf) {
    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
    boolean shortCircuitSkipChecksum =
        conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    if (shortCircuitSkipChecksum) {
      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
          "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
          "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
      assert !shortCircuitSkipChecksum; //this will fail if assertions are on
    }
    checkShortCircuitReadBufferSize(conf);
  }

  /**
   * Check if short circuit read buffer size is set and if not, set it to hbase value.
   * @param conf the configuration to check and update
   */
  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
    final int notSet = -1;
    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
    int size = conf.getInt(dfsKey, notSet);
    // If a size is set, return -- we will use it.
    if (size != notSet) return;
    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
  }

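  // Illustrative usage (not part of the upstream class): applying the short-circuit read defaults
  // to a fresh configuration before opening HFiles.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   FSUtils.setupShortCircuitRead(conf);
  //   // "dfs.client.read.shortcircuit.buffer.size" now defaults to 2 * HConstants.DEFAULT_BLOCKSIZE
  //   // unless the deployer already set it; a warning is logged if checksum skipping was misconfigured.
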
  /**
   * @return The DFSClient DFSHedgedReadMetrics instance or null if can't be found or not on hdfs.
   * @throws IOException
   */
  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
      throws IOException {
    if (!isHDFS(c)) return null;
    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
    final String name = "getHedgedReadMetrics";
    DFSClient dfsclient = ((DistributedFileSystem) FileSystem.get(c)).getClient();
    Method m;
    try {
      m = dfsclient.getClass().getDeclaredMethod(name);
    } catch (NoSuchMethodException e) {
      LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " +
          e.getMessage());
      return null;
    } catch (SecurityException e) {
      LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " +
          e.getMessage());
      return null;
    }
    m.setAccessible(true);
    try {
      return (DFSHedgedReadMetrics) m.invoke(dfsclient);
    } catch (IllegalAccessException e) {
      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
          e.getMessage());
      return null;
    } catch (IllegalArgumentException e) {
      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
          e.getMessage());
      return null;
    } catch (InvocationTargetException e) {
      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
          e.getMessage());
      return null;
    }
  }

  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
      Configuration conf, int threads) throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    List<Future<Void>> futures = new ArrayList<>();
    List<Path> traversedPaths;
    try {
      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (ExecutionException | InterruptedException | IOException e) {
      throw new IOException("copy snapshot reference files failed", e);
    } finally {
      pool.shutdownNow();
    }
    return traversedPaths;
  }

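  // Illustrative usage (not part of the upstream class): copying a directory tree with several
  // copy threads; the source/destination paths and thread count are placeholders.
  //
  //   List<Path> copied = FSUtils.copyFilesParallel(srcFs, new Path("/hbase/.hbase-snapshot/s1"),
  //       dstFs, new Path("/backup/s1"), conf, 8);
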
  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
      Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
    List<Path> traversedPaths = new ArrayList<>();
    traversedPaths.add(dst);
    FileStatus currentFileStatus = srcFS.getFileStatus(src);
    if (currentFileStatus.isDirectory()) {
      if (!dstFS.mkdirs(dst)) {
        throw new IOException("create dir failed: " + dst);
      }
      FileStatus[] subPaths = srcFS.listStatus(src);
      for (FileStatus subPath : subPaths) {
        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
            new Path(dst, subPath.getPath().getName()), conf, pool, futures));
      }
    } else {
      Future<Void> future = pool.submit(() -> {
        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
        return null;
      });
      futures.add(future);
    }
    return traversedPaths;
  }