HBASE-23380 General cleanup of FSUtil (#912)
hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
1 /**
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org.apache.hadoop.hbase.util;
21 import edu.umd.cs.findbugs.annotations.CheckForNull;
22 import java.io.ByteArrayInputStream;
23 import java.io.DataInputStream;
24 import java.io.EOFException;
25 import java.io.FileNotFoundException;
26 import java.io.IOException;
27 import java.io.InterruptedIOException;
28 import java.lang.reflect.InvocationTargetException;
29 import java.lang.reflect.Method;
30 import java.net.InetSocketAddress;
31 import java.util.ArrayList;
32 import java.util.Arrays;
33 import java.util.Collections;
34 import java.util.HashMap;
35 import java.util.Iterator;
36 import java.util.List;
37 import java.util.Locale;
38 import java.util.Map;
39 import java.util.Vector;
40 import java.util.concurrent.ConcurrentHashMap;
41 import java.util.concurrent.ExecutionException;
42 import java.util.concurrent.ExecutorService;
43 import java.util.concurrent.Executors;
44 import java.util.concurrent.Future;
45 import java.util.concurrent.FutureTask;
46 import java.util.concurrent.ThreadPoolExecutor;
47 import java.util.concurrent.TimeUnit;
48 import java.util.regex.Pattern;
50 import org.apache.commons.lang3.ArrayUtils;
51 import org.apache.hadoop.conf.Configuration;
52 import org.apache.hadoop.fs.BlockLocation;
53 import org.apache.hadoop.fs.FSDataInputStream;
54 import org.apache.hadoop.fs.FSDataOutputStream;
55 import org.apache.hadoop.fs.FileStatus;
56 import org.apache.hadoop.fs.FileSystem;
57 import org.apache.hadoop.fs.FileUtil;
58 import org.apache.hadoop.fs.Path;
59 import org.apache.hadoop.fs.PathFilter;
60 import org.apache.hadoop.fs.permission.FsAction;
61 import org.apache.hadoop.fs.permission.FsPermission;
62 import org.apache.hadoop.hbase.ClusterId;
63 import org.apache.hadoop.hbase.HColumnDescriptor;
64 import org.apache.hadoop.hbase.HConstants;
65 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
66 import org.apache.hadoop.hbase.HRegionInfo;
67 import org.apache.hadoop.hbase.TableName;
68 import org.apache.hadoop.hbase.client.RegionInfo;
69 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
70 import org.apache.hadoop.hbase.exceptions.DeserializationException;
71 import org.apache.hadoop.hbase.fs.HFileSystem;
72 import org.apache.hadoop.hbase.io.HFileLink;
73 import org.apache.hadoop.hbase.master.HMaster;
74 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
75 import org.apache.hadoop.hbase.security.AccessDeniedException;
76 import org.apache.hadoop.hdfs.DFSClient;
77 import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
78 import org.apache.hadoop.hdfs.DistributedFileSystem;
79 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
80 import org.apache.hadoop.io.IOUtils;
81 import org.apache.hadoop.ipc.RemoteException;
82 import org.apache.hadoop.security.UserGroupInformation;
83 import org.apache.hadoop.util.Progressable;
84 import org.apache.hadoop.util.ReflectionUtils;
85 import org.apache.hadoop.util.StringUtils;
86 import org.apache.yetus.audience.InterfaceAudience;
87 import org.slf4j.Logger;
88 import org.slf4j.LoggerFactory;
90 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
91 import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
92 import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
93 import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
95 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
96 import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
98 /**
99 * Utility methods for interacting with the underlying file system.
101 @InterfaceAudience.Private
102 public abstract class FSUtils extends CommonFSUtils {
103 private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);
105 private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
106 private static final int DEFAULT_THREAD_POOLSIZE = 2;
108 /** Set to true on Windows platforms */
109 @VisibleForTesting // currently only used in testing. TODO refactor into a test class
110 public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
112 protected FSUtils() {
113 super();
117 * @return True if <code>fs</code> is an instance of DistributedFileSystem
118 * @throws IOException
120 public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
121 FileSystem fileSystem = fs;
122 // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
123 // Check its backing fs for dfs-ness.
124 if (fs instanceof HFileSystem) {
125 fileSystem = ((HFileSystem)fs).getBackingFs();
127 return fileSystem instanceof DistributedFileSystem;
131 * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
132 * '/a/b/c' part. If you passed in hdfs://a/b/c and b/c, it would return true. Does not consider
133 * the scheme; i.e. if the schemes differ but the path or subpath matches, the two will equate.
134 * @param pathToSearch Path we will be trying to match.
135 * @param pathTail
136 * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
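* <p>Usage sketch (paths are illustrative; mirrors the scheme-insensitive example above):
* <pre>
*   Path withScheme = new Path("hdfs://namenode:8020/hbase/data/default/t1");
*   Path noScheme = new Path("/hbase/data/default/t1");
*   boolean same = FSUtils.isMatchingTail(withScheme, noScheme); // true, scheme is ignored
* </pre>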
138 public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
139 Path tailPath = pathTail;
140 String tailName;
141 Path toSearch = pathToSearch;
142 String toSearchName;
143 boolean result = false;
145 if (pathToSearch.depth() != pathTail.depth()) {
146 return false;
149 do {
150 tailName = tailPath.getName();
151 if (tailName == null || tailName.isEmpty()) {
152 result = true;
153 break;
155 toSearchName = toSearch.getName();
156 if (toSearchName == null || toSearchName.isEmpty()) {
157 break;
159 // Move up a parent on each path for next go around. Path doesn't let us go off the end.
160 tailPath = tailPath.getParent();
161 toSearch = toSearch.getParent();
162 } while(tailName.equals(toSearchName));
163 return result;
166 public static FSUtils getInstance(FileSystem fs, Configuration conf) {
167 String scheme = fs.getUri().getScheme();
168 if (scheme == null) {
169 LOG.warn("Could not find scheme for uri " +
170 fs.getUri() + ", default to hdfs");
171 scheme = "hdfs";
173 Class<?> fsUtilsClass = conf.getClass("hbase.fsutil." +
174 scheme + ".impl", FSHDFSUtils.class); // Default to HDFS impl
175 FSUtils fsUtils = (FSUtils)ReflectionUtils.newInstance(fsUtilsClass, conf);
176 return fsUtils;
180 * Delete the region directory if it exists.
181 * @param conf
182 * @param hri
183 * @return True if the region directory was deleted.
184 * @throws IOException
186 public static boolean deleteRegionDir(final Configuration conf, final HRegionInfo hri)
187 throws IOException {
188 Path rootDir = getRootDir(conf);
189 FileSystem fs = rootDir.getFileSystem(conf);
190 return deleteDirectory(fs,
191 new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
195 * Create the specified file on the filesystem. By default, this will:
196 * <ol>
197 * <li>overwrite the file if it exists</li>
198 * <li>apply the umask in the configuration (if it is enabled)</li>
199 * <li>use the fs configured buffer size (or 4096 if not set)</li>
200 * <li>use the configured column family replication or default replication if
201 * {@link HColumnDescriptor#DEFAULT_DFS_REPLICATION}</li>
202 * <li>use the default block size</li>
203 * <li>not track progress</li>
204 * </ol>
205 * @param conf configurations
206 * @param fs {@link FileSystem} on which to write the file
207 * @param path {@link Path} to the file to write
208 * @param perm permissions
209 * @param favoredNodes
210 * @return output stream to the created file
211 * @throws IOException if the file cannot be created
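* <p>Usage sketch (illustrative only: <code>conf</code>, <code>fs</code> and <code>payload</code>
* are assumed to be in scope, and the path and datanode address are made up):
* <pre>
*   InetSocketAddress[] favoredNodes =
*       new InetSocketAddress[] { new InetSocketAddress("datanode-1.example.com", 50010) };
*   try (FSDataOutputStream out = FSUtils.create(conf, fs, new Path("/hbase/.tmp/example"),
*       FsPermission.getFileDefault(), favoredNodes)) {
*     out.write(payload);
*   }
* </pre>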
213 public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
214 FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
215 if (fs instanceof HFileSystem) {
216 FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
217 if (backingFs instanceof DistributedFileSystem) {
218 // Try to use the favoredNodes version via reflection to allow backwards-
219 // compatibility.
220 short replication = Short.parseShort(conf.get(HColumnDescriptor.DFS_REPLICATION,
221 String.valueOf(HColumnDescriptor.DEFAULT_DFS_REPLICATION)));
222 try {
223 return (FSDataOutputStream) (DistributedFileSystem.class.getDeclaredMethod("create",
224 Path.class, FsPermission.class, boolean.class, int.class, short.class, long.class,
225 Progressable.class, InetSocketAddress[].class).invoke(backingFs, path, perm, true,
226 getDefaultBufferSize(backingFs),
227 replication > 0 ? replication : getDefaultReplication(backingFs, path),
228 getDefaultBlockSize(backingFs, path), null, favoredNodes));
229 } catch (InvocationTargetException ite) {
230 // Function was properly called, but threw its own exception.
231 throw new IOException(ite.getCause());
232 } catch (NoSuchMethodException e) {
233 LOG.debug("DFS Client does not support most favored nodes create; using default create");
234 LOG.trace("Ignoring; use default create", e);
235 } catch (IllegalArgumentException | SecurityException | IllegalAccessException e) {
236 LOG.debug("Ignoring (most likely Reflection related exception) " + e);
240 return create(fs, path, perm, true);
244 * Checks to see if the specified file system is available
246 * @param fs filesystem
247 * @throws IOException e
249 public static void checkFileSystemAvailable(final FileSystem fs)
250 throws IOException {
251 if (!(fs instanceof DistributedFileSystem)) {
252 return;
254 IOException exception = null;
255 DistributedFileSystem dfs = (DistributedFileSystem) fs;
256 try {
257 if (dfs.exists(new Path("/"))) {
258 return;
260 } catch (IOException e) {
261 exception = e instanceof RemoteException ?
262 ((RemoteException)e).unwrapRemoteException() : e;
264 try {
265 fs.close();
266 } catch (Exception e) {
267 LOG.error("file system close failed: ", e);
269 throw new IOException("File system is not available", exception);
273 * We use reflection because {@link DistributedFileSystem#setSafeMode(
274 * HdfsConstants.SafeModeAction action, boolean isChecked)} is not in hadoop 1.1
276 * @param dfs
277 * @return whether we're in safe mode
278 * @throws IOException
280 private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
281 boolean inSafeMode = false;
282 try {
283 Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{
284 org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.class, boolean.class});
285 inSafeMode = (Boolean) m.invoke(dfs,
286 org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET, true);
287 } catch (Exception e) {
288 if (e instanceof IOException) throw (IOException) e;
290 // Check whether dfs is on safemode.
291 inSafeMode = dfs.setSafeMode(
292 org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET);
294 return inSafeMode;
298 * Check whether dfs is in safemode.
299 * @param conf
300 * @throws IOException
302 public static void checkDfsSafeMode(final Configuration conf)
303 throws IOException {
304 boolean isInSafeMode = false;
305 FileSystem fs = FileSystem.get(conf);
306 if (fs instanceof DistributedFileSystem) {
307 DistributedFileSystem dfs = (DistributedFileSystem)fs;
308 isInSafeMode = isInSafeMode(dfs);
310 if (isInSafeMode) {
311 throw new IOException("File system is in safemode, it can't be written now");
316 * Returns the current version of the file system, as recorded in the hbase.version file.
318 * @param fs filesystem object
319 * @param rootdir root hbase directory
320 * @return null if no version file exists, version string otherwise
321 * @throws IOException if the version file fails to open
322 * @throws DeserializationException if the version data cannot be translated into a version
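* <p>Usage sketch (assumes the HBase root dir is configured in <code>conf</code>):
* <pre>
*   Path rootDir = FSUtils.getRootDir(conf);
*   FileSystem fs = rootDir.getFileSystem(conf);
*   String version = FSUtils.getVersion(fs, rootDir); // null if no hbase.version file exists
* </pre>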
324 public static String getVersion(FileSystem fs, Path rootdir)
325 throws IOException, DeserializationException {
326 final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
327 FileStatus[] status = null;
328 try {
329 // hadoop 2.0 throws FNFE if directory does not exist.
330 // hadoop 1.0 returns null if directory does not exist.
331 status = fs.listStatus(versionFile);
332 } catch (FileNotFoundException fnfe) {
333 return null;
335 if (ArrayUtils.getLength(status) == 0) {
336 return null;
338 String version = null;
339 byte [] content = new byte [(int)status[0].getLen()];
340 FSDataInputStream s = fs.open(versionFile);
341 try {
342 IOUtils.readFully(s, content, 0, content.length);
343 if (ProtobufUtil.isPBMagicPrefix(content)) {
344 version = parseVersionFrom(content);
345 } else {
346 // Presume it is in pre-pb format.
347 try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) {
348 version = dis.readUTF();
351 } catch (EOFException eof) {
352 LOG.warn("Version file was empty, odd, will try to set it.");
353 } finally {
354 s.close();
356 return version;
360 * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
361 * @param bytes The byte content of the hbase.version file
362 * @return The version found in the file as a String
363 * @throws DeserializationException if the version data cannot be translated into a version
365 static String parseVersionFrom(final byte [] bytes)
366 throws DeserializationException {
367 ProtobufUtil.expectPBMagicPrefix(bytes);
368 int pblen = ProtobufUtil.lengthOfPBMagic();
369 FSProtos.HBaseVersionFileContent.Builder builder =
370 FSProtos.HBaseVersionFileContent.newBuilder();
371 try {
372 ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
373 return builder.getVersion();
374 } catch (IOException e) {
375 // Convert
376 throw new DeserializationException(e);
381 * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
382 * @param version Version to persist
383 * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix.
385 static byte [] toVersionByteArray(final String version) {
386 FSProtos.HBaseVersionFileContent.Builder builder =
387 FSProtos.HBaseVersionFileContent.newBuilder();
388 return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
392 * Verifies current version of file system
394 * @param fs file system
395 * @param rootdir root directory of HBase installation
396 * @param message if true, issues a message on System.out
397 * @throws IOException if the version file cannot be opened
398 * @throws DeserializationException if the contents of the version file cannot be parsed
400 public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
401 throws IOException, DeserializationException {
402 checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
406 * Verifies current version of file system
408 * @param fs file system
409 * @param rootdir root directory of HBase installation
410 * @param message if true, issues a message on System.out
411 * @param wait wait interval
412 * @param retries number of times to retry
414 * @throws IOException if the version file cannot be opened
415 * @throws DeserializationException if the contents of the version file cannot be parsed
417 public static void checkVersion(FileSystem fs, Path rootdir,
418 boolean message, int wait, int retries)
419 throws IOException, DeserializationException {
420 String version = getVersion(fs, rootdir);
421 String msg;
422 if (version == null) {
423 if (!metaRegionExists(fs, rootdir)) {
424 // rootDir is empty (no version file and no root region)
425 // just create new version file (HBASE-1195)
426 setVersion(fs, rootdir, wait, retries);
427 return;
428 } else {
429 msg = "hbase.version file is missing. Is your hbase.rootdir valid? " +
430 "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. " +
431 "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2";
433 } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) {
434 return;
435 } else {
436 msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version +
437 " but software requires version " + HConstants.FILE_SYSTEM_VERSION +
438 ". Consult http://hbase.apache.org/book.html for further information about " +
439 "upgrading HBase.";
442 // version is deprecated; require migration
443 // Output on stdout so user sees it in terminal.
444 if (message) {
445 System.out.println("WARNING! " + msg);
447 throw new FileSystemVersionException(msg);
451 * Sets version of file system
453 * @param fs filesystem object
454 * @param rootdir hbase root
455 * @throws IOException e
457 public static void setVersion(FileSystem fs, Path rootdir)
458 throws IOException {
459 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
460 HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
464 * Sets version of file system
466 * @param fs filesystem object
467 * @param rootdir hbase root
468 * @param wait time to wait for retry
469 * @param retries number of times to retry before failing
470 * @throws IOException e
472 public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
473 throws IOException {
474 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
479 * Sets version of file system
481 * @param fs filesystem object
482 * @param rootdir hbase root directory
483 * @param version version to set
484 * @param wait time to wait for retry
485 * @param retries number of times to retry before throwing an IOException
486 * @throws IOException e
488 public static void setVersion(FileSystem fs, Path rootdir, String version,
489 int wait, int retries) throws IOException {
490 Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
491 Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
492 HConstants.VERSION_FILE_NAME);
493 while (true) {
494 try {
495 // Write the version to a temporary file
496 FSDataOutputStream s = fs.create(tempVersionFile);
497 try {
498 s.write(toVersionByteArray(version));
499 s.close();
500 s = null;
501 // Move the temp version file to its normal location. Returns false
502 // if the rename failed. Throw an IOE in that case.
503 if (!fs.rename(tempVersionFile, versionFile)) {
504 throw new IOException("Unable to move temp version file to " + versionFile);
506 } finally {
507 // Cleaning up the temporary if the rename failed would be trying
508 // too hard. We'll unconditionally create it again the next time
509 // through anyway, files are overwritten by default by create().
511 // Attempt to close the stream on the way out if it is still open.
512 try {
513 if (s != null) s.close();
514 } catch (IOException ignore) { }
516 LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
517 return;
518 } catch (IOException e) {
519 if (retries > 0) {
520 LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
521 fs.delete(versionFile, false);
522 try {
523 if (wait > 0) {
524 Thread.sleep(wait);
526 } catch (InterruptedException ie) {
527 throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
529 retries--;
530 } else {
531 throw e;
538 * Checks that a cluster ID file exists in the HBase root directory
539 * @param fs the root directory FileSystem
540 * @param rootdir the HBase root directory in HDFS
541 * @param wait how long to wait between retries
542 * @return <code>true</code> if the file exists, otherwise <code>false</code>
543 * @throws IOException if checking the FileSystem fails
545 public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
546 long wait) throws IOException {
547 while (true) {
548 try {
549 Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
550 return fs.exists(filePath);
551 } catch (IOException ioe) {
552 if (wait > 0L) {
553 LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe);
554 try {
555 Thread.sleep(wait);
556 } catch (InterruptedException e) {
557 Thread.currentThread().interrupt();
558 throw (InterruptedIOException) new InterruptedIOException().initCause(e);
560 } else {
561 throw ioe;
568 * Returns the value of the unique cluster ID stored for this HBase instance.
569 * @param fs the root directory FileSystem
570 * @param rootdir the path to the HBase root directory
571 * @return the unique cluster identifier
572 * @throws IOException if reading the cluster ID file fails
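* <p>Usage sketch (<code>fs</code> and <code>rootdir</code> obtained as for the version helpers;
* the 100ms retry interval is illustrative):
* <pre>
*   ClusterId clusterId = FSUtils.getClusterId(fs, rootdir);
*   if (clusterId == null) {
*     FSUtils.setClusterId(fs, rootdir, new ClusterId(), 100);
*   }
* </pre>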
574 public static ClusterId getClusterId(FileSystem fs, Path rootdir)
575 throws IOException {
576 Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
577 ClusterId clusterId = null;
578 FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath): null;
579 if (status != null) {
580 int len = Ints.checkedCast(status.getLen());
581 byte [] content = new byte[len];
582 FSDataInputStream in = fs.open(idPath);
583 try {
584 in.readFully(content);
585 } catch (EOFException eof) {
586 LOG.warn("Cluster ID file {} is empty", idPath);
587 } finally{
588 in.close();
590 try {
591 clusterId = ClusterId.parseFrom(content);
592 } catch (DeserializationException e) {
593 throw new IOException("content=" + Bytes.toString(content), e);
595 // If not pb'd, make it so.
596 if (!ProtobufUtil.isPBMagicPrefix(content)) {
597 String cid = null;
598 in = fs.open(idPath);
599 try {
600 cid = in.readUTF();
601 clusterId = new ClusterId(cid);
602 } catch (EOFException eof) {
603 LOG.warn("Cluster ID file {} is empty", idPath);
604 } finally {
605 in.close();
607 rewriteAsPb(fs, rootdir, idPath, clusterId);
609 return clusterId;
610 } else {
611 LOG.warn("Cluster ID file does not exist at {}", idPath);
613 return clusterId;
617 * @param cid
618 * @throws IOException
620 private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
621 final ClusterId cid)
622 throws IOException {
623 // Rewrite the file as pb. Move aside the old one first, write new
624 // then delete the moved-aside file.
625 Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
626 if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
627 setClusterId(fs, rootdir, cid, 100);
628 if (!fs.delete(movedAsideName, false)) {
629 throw new IOException("Failed delete of " + movedAsideName);
631 LOG.debug("Rewrote the hbase.id file as pb");
635 * Writes a new unique identifier for this cluster to the "hbase.id" file
636 * in the HBase root directory
637 * @param fs the root directory FileSystem
638 * @param rootdir the path to the HBase root directory
639 * @param clusterId the unique identifier to store
640 * @param wait how long (in milliseconds) to wait between retries
641 * @throws IOException if writing to the FileSystem fails and no wait value is given for retries
643 public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId,
644 int wait) throws IOException {
645 while (true) {
646 try {
647 Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
648 Path tempIdFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY +
649 Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME);
650 // Write the id file to a temporary location
651 FSDataOutputStream s = fs.create(tempIdFile);
652 try {
653 s.write(clusterId.toByteArray());
654 s.close();
655 s = null;
656 // Move the temporary file to its normal location. Throw an IOE if
657 // the rename failed
658 if (!fs.rename(tempIdFile, idFile)) {
659 throw new IOException("Unable to move temp version file to " + idFile);
661 } finally {
662 // Attempt to close the stream if still open on the way out
663 try {
664 if (s != null) s.close();
665 } catch (IOException ignore) { }
667 if (LOG.isDebugEnabled()) {
668 LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId);
670 return;
671 } catch (IOException ioe) {
672 if (wait > 0) {
673 LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
674 ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
675 try {
676 Thread.sleep(wait);
677 } catch (InterruptedException e) {
678 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
680 } else {
681 throw ioe;
688 * If DFS, check whether it is in safe mode and, if so, wait until safe mode is cleared.
689 * @param conf configuration
690 * @param wait Sleep between retries
691 * @throws IOException e
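* <p>Usage sketch (a typical startup-time call; the 10 second interval is illustrative):
* <pre>
*   FSUtils.waitOnSafeMode(conf, 10 * 1000L); // blocks until HDFS leaves safe mode
* </pre>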
693 public static void waitOnSafeMode(final Configuration conf,
694 final long wait)
695 throws IOException {
696 FileSystem fs = FileSystem.get(conf);
697 if (!(fs instanceof DistributedFileSystem)) return;
698 DistributedFileSystem dfs = (DistributedFileSystem)fs;
699 // Make sure dfs is not in safe mode
700 while (isInSafeMode(dfs)) {
701 LOG.info("Waiting for dfs to exit safe mode...");
702 try {
703 Thread.sleep(wait);
704 } catch (InterruptedException e) {
705 Thread.currentThread().interrupt();
706 throw (InterruptedIOException) new InterruptedIOException().initCause(e);
712 * Checks if meta region exists
713 * @param fs file system
714 * @param rootDir root directory of HBase installation
715 * @return true if exists
717 public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException {
718 Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO);
719 return fs.exists(metaRegionDir);
723 * Compute HDFS blocks distribution of a given file, or a portion of the file
724 * @param fs file system
725 * @param status file status of the file
726 * @param start start position of the portion
727 * @param length length of the portion
728 * @return The HDFS blocks distribution
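* <p>Usage sketch (covers the whole file; the store file path and host name are made up):
* <pre>
*   FileStatus status = fs.getFileStatus(new Path("/hbase/data/default/t1/region/cf/hfile"));
*   HDFSBlocksDistribution dist =
*       FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
*   float locality = dist.getBlockLocalityIndex("datanode-1.example.com");
* </pre>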
730 static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
731 final FileSystem fs, FileStatus status, long start, long length)
732 throws IOException {
733 HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
734 BlockLocation [] blockLocations =
735 fs.getFileBlockLocations(status, start, length);
736 for(BlockLocation bl : blockLocations) {
737 String [] hosts = bl.getHosts();
738 long len = bl.getLength();
739 blocksDistribution.addHostsAndBlockWeight(hosts, len);
742 return blocksDistribution;
746 * Update blocksDistribution with blockLocations
747 * @param blocksDistribution the hdfs blocks distribution
748 * @param blockLocations an array containing block location
750 static public void addToHDFSBlocksDistribution(
751 HDFSBlocksDistribution blocksDistribution, BlockLocation[] blockLocations)
752 throws IOException {
753 for (BlockLocation bl : blockLocations) {
754 String[] hosts = bl.getHosts();
755 long len = bl.getLength();
756 blocksDistribution.addHostsAndBlockWeight(hosts, len);
760 // TODO move this method OUT of FSUtils. It should have no dependency on HMaster.
762 * Returns the total overall fragmentation percentage. Includes hbase:meta and
763 * -ROOT- as well.
765 * @param master The master defining the HBase root and file system
766 * @return The overall fragmentation percentage across all tables, or -1 if there are no tables
767 * @throws IOException When scanning the directory fails
769 public static int getTotalTableFragmentation(final HMaster master)
770 throws IOException {
771 Map<String, Integer> map = getTableFragmentation(master);
772 return map.isEmpty() ? -1 : map.get("-TOTAL-");
776 * Runs through the HBase rootdir and checks how many stores for each table
777 * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
778 * percentage across all tables is stored under the special key "-TOTAL-".
780 * @param master The master defining the HBase root and file system.
781 * @return A map for each table and its percentage (never null).
783 * @throws IOException When scanning the directory fails.
785 public static Map<String, Integer> getTableFragmentation(
786 final HMaster master)
787 throws IOException {
788 Path path = getRootDir(master.getConfiguration());
789 // since HMaster.getFileSystem() is package private
790 FileSystem fs = path.getFileSystem(master.getConfiguration());
791 return getTableFragmentation(fs, path);
795 * Runs through the HBase rootdir and checks how many stores for each table
796 * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
797 * percentage across all tables is stored under the special key "-TOTAL-".
799 * @param fs The file system to use
800 * @param hbaseRootDir The root directory to scan
801 * @return A map for each table and its percentage (never null)
802 * @throws IOException When scanning the directory fails
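* <p>Usage sketch (reads the special <code>-TOTAL-</code> key this method adds):
* <pre>
*   Map&lt;String, Integer&gt; frags = FSUtils.getTableFragmentation(fs, hbaseRootDir);
*   int overallPct = frags.getOrDefault("-TOTAL-", -1);
* </pre>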
804 public static Map<String, Integer> getTableFragmentation(
805 final FileSystem fs, final Path hbaseRootDir)
806 throws IOException {
807 Map<String, Integer> frags = new HashMap<>();
808 int cfCountTotal = 0;
809 int cfFragTotal = 0;
810 PathFilter regionFilter = new RegionDirFilter(fs);
811 PathFilter familyFilter = new FamilyDirFilter(fs);
812 List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
813 for (Path d : tableDirs) {
814 int cfCount = 0;
815 int cfFrag = 0;
816 FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
817 for (FileStatus regionDir : regionDirs) {
818 Path dd = regionDir.getPath();
819 // else its a region name, now look in region for families
820 FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
821 for (FileStatus familyDir : familyDirs) {
822 cfCount++;
823 cfCountTotal++;
824 Path family = familyDir.getPath();
825 // now in family make sure only one file
826 FileStatus[] familyStatus = fs.listStatus(family);
827 if (familyStatus.length > 1) {
828 cfFrag++;
829 cfFragTotal++;
833 // compute percentage per table and store in result list
834 frags.put(FSUtils.getTableName(d).getNameAsString(),
835 cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100));
837 // set overall percentage for all tables
838 frags.put("-TOTAL-",
839 cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100));
840 return frags;
843 public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException {
844 if (fs.exists(dst) && !fs.delete(dst, false)) {
845 throw new IOException("Can not delete " + dst);
847 if (!fs.rename(src, dst)) {
848 throw new IOException("Can not rename from " + src + " to " + dst);
853 * A {@link PathFilter} that returns only regular files.
855 static class FileFilter extends AbstractFileStatusFilter {
856 private final FileSystem fs;
858 public FileFilter(final FileSystem fs) {
859 this.fs = fs;
862 @Override
863 protected boolean accept(Path p, @CheckForNull Boolean isDir) {
864 try {
865 return isFile(fs, isDir, p);
866 } catch (IOException e) {
867 LOG.warn("Unable to verify if path={} is a regular file", p, e);
868 return false;
874 * Directory filter that doesn't include any of the directories in the specified blacklist
876 public static class BlackListDirFilter extends AbstractFileStatusFilter {
877 private final FileSystem fs;
878 private List<String> blacklist;
881 * Create a filter on the given filesystem with the specified blacklist
882 * @param fs filesystem to filter
883 * @param directoryNameBlackList list of the names of the directories to filter. If
884 * <tt>null</tt>, all directories are returned
886 @SuppressWarnings("unchecked")
887 public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
888 this.fs = fs;
889 blacklist =
890 (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
891 : directoryNameBlackList);
894 @Override
895 protected boolean accept(Path p, @CheckForNull Boolean isDir) {
896 if (!isValidName(p.getName())) {
897 return false;
900 try {
901 return isDirectory(fs, isDir, p);
902 } catch (IOException e) {
903 LOG.warn("An error occurred while verifying if [{}] is a valid directory."
904 + " Returning 'not valid' and continuing.", p, e);
905 return false;
909 protected boolean isValidName(final String name) {
910 return !blacklist.contains(name);
915 * A {@link PathFilter} that only allows directories.
917 public static class DirFilter extends BlackListDirFilter {
919 public DirFilter(FileSystem fs) {
920 super(fs, null);
925 * A {@link PathFilter} that returns usertable directories. To get all directories use the
926 * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
928 public static class UserTableDirFilter extends BlackListDirFilter {
929 public UserTableDirFilter(FileSystem fs) {
930 super(fs, HConstants.HBASE_NON_TABLE_DIRS);
933 @Override
934 protected boolean isValidName(final String name) {
935 if (!super.isValidName(name))
936 return false;
938 try {
939 TableName.isLegalTableQualifierName(Bytes.toBytes(name));
940 } catch (IllegalArgumentException e) {
941 LOG.info("Invalid table name: {}", name);
942 return false;
944 return true;
948 public void recoverFileLease(final FileSystem fs, final Path p, Configuration conf)
949 throws IOException {
950 recoverFileLease(fs, p, conf, null);
954 * Recover file lease. Used when a file is suspected
955 * to have been left open by another process.
956 * @param fs FileSystem handle
957 * @param p Path of file to recover lease
958 * @param conf Configuration handle
959 * @throws IOException
961 public abstract void recoverFileLease(final FileSystem fs, final Path p,
962 Configuration conf, CancelableProgressable reporter) throws IOException;
964 public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
965 throws IOException {
966 List<Path> tableDirs = new ArrayList<>();
968 for (FileStatus status : fs
969 .globStatus(new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
970 tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
972 return tableDirs;
976 * @param fs
977 * @param rootdir
978 * @return All the table directories under <code>rootdir</code>. Ignores non-table HBase folders such as
979 * .logs, .oldlogs, and .corrupt.
980 * @throws IOException
982 public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
983 throws IOException {
984 // presumes any directory under hbase.rootdir is a table
985 FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
986 List<Path> tabledirs = new ArrayList<>(dirs.length);
987 for (FileStatus dir: dirs) {
988 tabledirs.add(dir.getPath());
990 return tabledirs;
994 * Filter for region directories (directories whose names are hex-encoded region names).
996 public static class RegionDirFilter extends AbstractFileStatusFilter {
997 // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
998 final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
999 final FileSystem fs;
1001 public RegionDirFilter(FileSystem fs) {
1002 this.fs = fs;
1005 @Override
1006 protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1007 if (!regionDirPattern.matcher(p.getName()).matches()) {
1008 return false;
1011 try {
1012 return isDirectory(fs, isDir, p);
1013 } catch (IOException ioe) {
1014 // Maybe the file was moved or the fs was disconnected.
1015 LOG.warn("Skipping file {} due to IOException", p, ioe);
1016 return false;
1022 * Given a particular table dir, return all the regiondirs inside it, excluding files such as
1023 * .tableinfo
1024 * @param fs A file system for the Path
1025 * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
1026 * @return List of paths to valid region directories in table dir.
1027 * @throws IOException
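* <p>Usage sketch (lists the region directories of one table; the table name is illustrative):
* <pre>
*   Path tableDir = FSUtils.getTableDir(rootDir, TableName.valueOf("t1"));
*   for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {
*     LOG.debug("Found region dir {}", regionDir);
*   }
* </pre>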
1029 public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
1030 // assumes we are in a table dir.
1031 List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1032 if (rds == null) {
1033 return Collections.emptyList();
1035 List<Path> regionDirs = new ArrayList<>(rds.size());
1036 for (FileStatus rdfs: rds) {
1037 Path rdPath = rdfs.getPath();
1038 regionDirs.add(rdPath);
1040 return regionDirs;
1043 public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) {
1044 return getRegionDirFromTableDir(getTableDir(rootDir, region.getTable()), region);
1047 public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
1048 return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
1052 * Filter for all dirs that are legal column family names. This is generally used for colfam
1053 * dirs &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
1055 public static class FamilyDirFilter extends AbstractFileStatusFilter {
1056 final FileSystem fs;
1058 public FamilyDirFilter(FileSystem fs) {
1059 this.fs = fs;
1062 @Override
1063 protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1064 try {
1065 // throws IAE if invalid
1066 HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(p.getName()));
1067 } catch (IllegalArgumentException iae) {
1068 // path name is an invalid family name and thus is excluded.
1069 return false;
1072 try {
1073 return isDirectory(fs, isDir, p);
1074 } catch (IOException ioe) {
1075 // Maybe the file was moved or the fs was disconnected.
1076 LOG.warn("Skipping file {} due to IOException", p, ioe);
1077 return false;
1083 * Given a particular region dir, return all the familydirs inside it
1085 * @param fs A file system for the Path
1086 * @param regionDir Path to a specific region directory
1087 * @return List of paths to valid family directories in region dir.
1088 * @throws IOException
1090 public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
1091 // assumes we are in a region dir.
1092 FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
1093 List<Path> familyDirs = new ArrayList<>(fds.length);
1094 for (FileStatus fdfs: fds) {
1095 Path fdPath = fdfs.getPath();
1096 familyDirs.add(fdPath);
1098 return familyDirs;
1101 public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException {
1102 List<FileStatus> fds = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs));
1103 if (fds == null) {
1104 return Collections.emptyList();
1106 List<Path> referenceFiles = new ArrayList<>(fds.size());
1107 for (FileStatus fdfs: fds) {
1108 Path fdPath = fdfs.getPath();
1109 referenceFiles.add(fdPath);
1111 return referenceFiles;
1115 * Filter for HFiles that excludes reference files.
1117 public static class HFileFilter extends AbstractFileStatusFilter {
1118 final FileSystem fs;
1120 public HFileFilter(FileSystem fs) {
1121 this.fs = fs;
1124 @Override
1125 protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1126 if (!StoreFileInfo.isHFile(p)) {
1127 return false;
1130 try {
1131 return isFile(fs, isDir, p);
1132 } catch (IOException ioe) {
1133 // Maybe the file was moved or the fs was disconnected.
1134 LOG.warn("Skipping file {} due to IOException", p, ioe);
1135 return false;
1141 * Filter for HFileLinks (StoreFiles and HFiles not included).
1142 * The filter itself does not check whether a link points to a file or a directory.
1144 public static class HFileLinkFilter implements PathFilter {
1146 @Override
1147 public boolean accept(Path p) {
1148 return HFileLink.isHFileLink(p);
1152 public static class ReferenceFileFilter extends AbstractFileStatusFilter {
1154 private final FileSystem fs;
1156 public ReferenceFileFilter(FileSystem fs) {
1157 this.fs = fs;
1160 @Override
1161 protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1162 if (!StoreFileInfo.isReference(p)) {
1163 return false;
1166 try {
1167 // only files can be references.
1168 return isFile(fs, isDir, p);
1169 } catch (IOException ioe) {
1170 // Maybe the file was moved or the fs was disconnected.
1171 LOG.warn("Skipping file {} due to IOException", p, ioe);
1172 return false;
1178 * Called every so often by storefile map builder getTableStoreFilePathMap to
1179 * report progress.
1181 interface ProgressReporter {
1183 * @param status File or directory we are about to process.
1185 void progress(FileStatus status);
1189 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1190 * table StoreFile names to the full Path.
1191 * <br>
1192 * Example...<br>
1193 * Key = 3944417774205889744 <br>
1194 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1196 * @param map map to add values. If null, this method will create and populate one to return
1197 * @param fs The file system to use.
1198 * @param hbaseRootDir The root directory to scan.
1199 * @param tableName name of the table to scan.
1200 * @return Map keyed by StoreFile name with a value of the full Path.
1201 * @throws IOException When scanning the directory fails.
1202 * @throws InterruptedException
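* <p>Usage sketch (builds the lookup map for one table; the table and store file names are
* illustrative):
* <pre>
*   Map&lt;String, Path&gt; storeFiles =
*       FSUtils.getTableStoreFilePathMap(null, fs, hbaseRootDir, TableName.valueOf("t1"));
*   Path full = storeFiles.get("3944417774205889744"); // full Path for a store file name
* </pre>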
1204 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1205 final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1206 throws IOException, InterruptedException {
1207 return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null,
1208 (ProgressReporter)null);
1212 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1213 * table StoreFile names to the full Path. Note that because this method can be called
1214 * on a 'live' HBase system, we will skip files that no longer exist by the time
1215 * we traverse them; similarly, the user of the result needs to consider that some
1216 * entries in this map may not exist by the time this call completes.
1217 * <br>
1218 * Example...<br>
1219 * Key = 3944417774205889744 <br>
1220 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1222 * @param resultMap map to add values. If null, this method will create and populate one to return
1223 * @param fs The file system to use.
1224 * @param hbaseRootDir The root directory to scan.
1225 * @param tableName name of the table to scan.
1226 * @param sfFilter optional path filter to apply to store files
1227 * @param executor optional executor service to parallelize this operation
1228 * @param progressReporter Instance or null; gets called every time we move to new region of
1229 * family dir and for each store file.
1230 * @return Map keyed by StoreFile name with a value of the full Path.
1231 * @throws IOException When scanning the directory fails.
1232 * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead.
1234 @Deprecated
1235 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1236 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1237 ExecutorService executor, final HbckErrorReporter progressReporter)
1238 throws IOException, InterruptedException {
1239 return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor,
1240 new ProgressReporter() {
1241 @Override
1242 public void progress(FileStatus status) {
1243 // status is not used in this implementation.
1244 progressReporter.progress();
1250 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1251 * table StoreFile names to the full Path. Note that because this method can be called
1252 * on a 'live' HBase system, we will skip files that no longer exist by the time
1253 * we traverse them; similarly, the user of the result needs to consider that some
1254 * entries in this map may not exist by the time this call completes.
1255 * <br>
1256 * Example...<br>
1257 * Key = 3944417774205889744 <br>
1258 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1260 * @param resultMap map to add values. If null, this method will create and populate one
1261 * to return
1262 * @param fs The file system to use.
1263 * @param hbaseRootDir The root directory to scan.
1264 * @param tableName name of the table to scan.
1265 * @param sfFilter optional path filter to apply to store files
1266 * @param executor optional executor service to parallelize this operation
1267 * @param progressReporter Instance or null; gets called every time we move to new region of
1268 * family dir and for each store file.
1269 * @return Map keyed by StoreFile name with a value of the full Path.
1270 * @throws IOException When scanning the directory fails.
1271 * @throws InterruptedException the thread is interrupted, either before or during the activity.
1273 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1274 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1275 ExecutorService executor, final ProgressReporter progressReporter)
1276 throws IOException, InterruptedException {
1278 final Map<String, Path> finalResultMap =
1279 resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap;
1281 // only include the directory paths to tables
1282 Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);
1283 // Inside a table, there are compaction.dir directories to skip. Otherwise, all else
1284 // should be regions.
1285 final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
1286 final Vector<Exception> exceptions = new Vector<>();
1288 try {
1289 List<FileStatus> regionDirs = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1290 if (regionDirs == null) {
1291 return finalResultMap;
1294 final List<Future<?>> futures = new ArrayList<>(regionDirs.size());
1296 for (FileStatus regionDir : regionDirs) {
1297 if (null != progressReporter) {
1298 progressReporter.progress(regionDir);
1300 final Path dd = regionDir.getPath();
1302 if (!exceptions.isEmpty()) {
1303 break;
1306 Runnable getRegionStoreFileMapCall = new Runnable() {
1307 @Override
1308 public void run() {
1309 try {
1310 HashMap<String,Path> regionStoreFileMap = new HashMap<>();
1311 List<FileStatus> familyDirs = FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
1312 if (familyDirs == null) {
1313 if (!fs.exists(dd)) {
1314 LOG.warn("Skipping region because it no longer exists: " + dd);
1315 } else {
1316 LOG.warn("Skipping region because it has no family dirs: " + dd);
1318 return;
1320 for (FileStatus familyDir : familyDirs) {
1321 if (null != progressReporter) {
1322 progressReporter.progress(familyDir);
1324 Path family = familyDir.getPath();
1325 if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1326 continue;
1328 // now in family, iterate over the StoreFiles and
1329 // put in map
1330 FileStatus[] familyStatus = fs.listStatus(family);
1331 for (FileStatus sfStatus : familyStatus) {
1332 if (null != progressReporter) {
1333 progressReporter.progress(sfStatus);
1335 Path sf = sfStatus.getPath();
1336 if (sfFilter == null || sfFilter.accept(sf)) {
1337 regionStoreFileMap.put( sf.getName(), sf);
1341 finalResultMap.putAll(regionStoreFileMap);
1342 } catch (Exception e) {
1343 LOG.error("Could not get region store file map for region: " + dd, e);
1344 exceptions.add(e);
1349 // If executor is available, submit async tasks to exec concurrently, otherwise
1350 // just do serial sync execution
1351 if (executor != null) {
1352 Future<?> future = executor.submit(getRegionStoreFileMapCall);
1353 futures.add(future);
1354 } else {
1355 FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null);
1356 future.run();
1357 futures.add(future);
1361 // Ensure all pending tasks are complete (or that we run into an exception)
1362 for (Future<?> f : futures) {
1363 if (!exceptions.isEmpty()) {
1364 break;
1366 try {
1367 f.get();
1368 } catch (ExecutionException e) {
1369 LOG.error("Unexpected exec exception! Should've been caught already. (Bug?)", e);
1370 // Shouldn't happen, we already logged/caught any exceptions in the Runnable
1373 } catch (IOException e) {
1374 LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
1375 exceptions.add(e);
1376 } finally {
1377 if (!exceptions.isEmpty()) {
1378 // Just throw the first exception as an indication something bad happened
1379 // Don't need to propagate all the exceptions, we already logged them all anyway
1380 Throwables.propagateIfInstanceOf(exceptions.firstElement(), IOException.class);
1381 throw Throwables.propagate(exceptions.firstElement());
1385 return finalResultMap;
1388 public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1389 int result = 0;
1390 try {
1391 for (Path familyDir:getFamilyDirs(fs, p)){
1392 result += getReferenceFilePaths(fs, familyDir).size();
1394 } catch (IOException e) {
1395 LOG.warn("Error counting reference files", e);
1397 return result;
1401 * Runs through the HBase rootdir and creates a reverse lookup map for
1402 * table StoreFile names to the full Path.
1403 * <br>
1404 * Example...<br>
1405 * Key = 3944417774205889744 <br>
1406 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1408 * @param fs The file system to use.
1409 * @param hbaseRootDir The root directory to scan.
1410 * @return Map keyed by StoreFile name with a value of the full Path.
1411 * @throws IOException When scanning the directory fails.
1413 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1414 final Path hbaseRootDir)
1415 throws IOException, InterruptedException {
1416 return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter)null);
1420 * Runs through the HBase rootdir and creates a reverse lookup map for
1421 * table StoreFile names to the full Path.
1422 * <br>
1423 * Example...<br>
1424 * Key = 3944417774205889744 <br>
1425 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1427 * @param fs The file system to use.
1428 * @param hbaseRootDir The root directory to scan.
1429 * @param sfFilter optional path filter to apply to store files
1430 * @param executor optional executor service to parallelize this operation
1431 * @param progressReporter Instance or null; gets called every time we move to new region of
1432 * family dir and for each store file.
1433 * @return Map keyed by StoreFile name with a value of the full Path.
1434 * @throws IOException When scanning the directory fails.
1435 * @deprecated Since 2.3.0. Will be removed in hbase4. Use {@link
1436 * #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)}
1438 @Deprecated
1439 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1440 final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
1441 HbckErrorReporter progressReporter)
1442 throws IOException, InterruptedException {
1443 return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor,
1444 new ProgressReporter() {
1445 @Override
1446 public void progress(FileStatus status) {
1447 // status is not used in this implementation.
1448 progressReporter.progress();
1454 * Runs through the HBase rootdir and creates a reverse lookup map for
1455 * table StoreFile names to the full Path.
1456 * <br>
1457 * Example...<br>
1458 * Key = 3944417774205889744 <br>
1459 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1461 * @param fs The file system to use.
1462 * @param hbaseRootDir The root directory to scan.
1463 * @param sfFilter optional path filter to apply to store files
1464 * @param executor optional executor service to parallelize this operation
1465 * @param progressReporter Instance or null; gets called every time we move to new region of
1466 * family dir and for each store file.
1467 * @return Map keyed by StoreFile name with a value of the full Path.
1468 * @throws IOException When scanning the directory fails.
1469 * @throws InterruptedException
1471 public static Map<String, Path> getTableStoreFilePathMap(
1472 final FileSystem fs, final Path hbaseRootDir, PathFilter sfFilter,
1473 ExecutorService executor, ProgressReporter progressReporter)
1474 throws IOException, InterruptedException {
1475 ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);
1477 // if this method looks similar to 'getTableFragmentation' that is because
1478 // it was borrowed from it.
1480 // only include the directory paths to tables
1481 for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1482 getTableStoreFilePathMap(map, fs, hbaseRootDir,
1483 FSUtils.getTableName(tableDir), sfFilter, executor, progressReporter);
1485 return map;
1489 * Filters FileStatuses in an array and returns a list
1491 * @param input An array of FileStatuses
1492 * @param filter A required filter to filter the array
1493 * @return A list of FileStatuses
1495 public static List<FileStatus> filterFileStatuses(FileStatus[] input,
1496 FileStatusFilter filter) {
1497 if (input == null) return null;
1498 return filterFileStatuses(Iterators.forArray(input), filter);
1502 * Filters FileStatuses in an iterator and returns a list
1504 * @param input An iterator of FileStatuses
1505 * @param filter A required filter to filter the array
1506 * @return A list of FileStatuses
1508 public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
1509 FileStatusFilter filter) {
1510 if (input == null) return null;
1511 ArrayList<FileStatus> results = new ArrayList<>();
1512 while (input.hasNext()) {
1513 FileStatus f = input.next();
1514 if (filter.accept(f)) {
1515 results.add(f);
1518 return results;
1522 * Calls fs.listStatus() and treats FileNotFoundException as non-fatal
1523 * This accommodates differences between Hadoop versions, where Hadoop 1
1524 * does not throw a FileNotFoundException but returns an empty FileStatus[],
1525 * while Hadoop 2 throws FileNotFoundException.
1527 * @param fs file system
1528 * @param dir directory
1529 * @param filter file status filter
1530 * @return null if dir is empty or doesn't exist, otherwise FileStatus list
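* <p>Usage sketch (lists only region directories, treating a missing or empty dir as null):
* <pre>
*   List&lt;FileStatus&gt; regionDirs =
*       FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
*   if (regionDirs == null) {
*     // directory is empty or does not exist
*   }
* </pre>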
1532 public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs,
1533 final Path dir, final FileStatusFilter filter) throws IOException {
1534 FileStatus [] status = null;
1535 try {
1536 status = fs.listStatus(dir);
1537 } catch (FileNotFoundException fnfe) {
1538 LOG.trace("{} does not exist", dir);
1539 return null;
1542 if (ArrayUtils.getLength(status) == 0) {
1543 return null;
1546 if (filter == null) {
1547 return Arrays.asList(status);
1548 } else {
1549 List<FileStatus> status2 = filterFileStatuses(status, filter);
1550 if (status2 == null || status2.isEmpty()) {
1551 return null;
1552 } else {
1553 return status2;
1559 * Throw an exception if an action is not permitted by a user on a file.
1561 * @param ugi
1562 * the user
1563 * @param file
1564 * the file
1565 * @param action
1566 * the action
1568 public static void checkAccess(UserGroupInformation ugi, FileStatus file,
1569 FsAction action) throws AccessDeniedException {
1570 if (ugi.getShortUserName().equals(file.getOwner())) {
1571 if (file.getPermission().getUserAction().implies(action)) {
1572 return;
1574 } else if (ArrayUtils.contains(ugi.getGroupNames(), file.getGroup())) {
1575 if (file.getPermission().getGroupAction().implies(action)) {
1576 return;
1578 } else if (file.getPermission().getOtherAction().implies(action)) {
1579 return;
1581 throw new AccessDeniedException("Permission denied:" + " action=" + action
1582 + " path=" + file.getPath() + " user=" + ugi.getShortUserName());
1586 * This function scans the root path of the file system to get the
1587 * degree of locality for each region on each of the servers having at least
1588 * one block of that region.
1589 * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}
1591 * @param conf
1592 * the configuration to use
1593 * @return the mapping from region encoded name to a map of server names to
1594 * locality fraction
1595 * @throws IOException
1596 * in case of file system errors or interrupts
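* <p>Usage sketch (the encoded region name below is an assumed variable, not defined here):
* <pre>
*   Map&lt;String, Map&lt;String, Float&gt;&gt; locality =
*       FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
*   Map&lt;String, Float&gt; localityByServer = locality.get(encodedRegionName);
* </pre>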
1598 public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1599 final Configuration conf) throws IOException {
1600 return getRegionDegreeLocalityMappingFromFS(
1601 conf, null,
1602 conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1606   /**
1607    * Scans the root path of the file system to get the
1608    * degree of locality for each region on each of the servers having at least
1609    * one block of that region.
1611 * @param conf
1612 * the configuration to use
1613 * @param desiredTable
1614 * the table you wish to scan locality for
1615 * @param threadPoolSize
1616 * the thread pool size to use
1617 * @return the mapping from region encoded name to a map of server names to
1618 * locality fraction
1619 * @throws IOException
1620 * in case of file system errors or interrupts
1621    */
1622   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1623 final Configuration conf, final String desiredTable, int threadPoolSize)
1624 throws IOException {
1625 Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
1626 getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
1627     return regionDegreeLocalityMapping;
1628   }
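  // Illustrative usage sketch, not part of the original class: the returned outer map is keyed
  // by encoded region name and each inner map gives, per hostname, the fraction of the region's
  // blocks stored locally on that host. The table name and thread pool size are examples only.
  //
  //   Map<String, Map<String, Float>> locality =
  //     FSUtils.getRegionDegreeLocalityMappingFromFS(conf, "my_table", 8);
  //   locality.forEach((region, perHost) ->
  //     perHost.forEach((host, fraction) ->
  //       LOG.info("region={} host={} locality={}", region, host, fraction)));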
1630   /**
1631    * Scans the root path of the file system to get either the
1632    * mapping between the region name and its best locality region server or the
1633    * degree of locality of each region on each of the servers having at least
1634    * one block of that region. The output map parameter is optional.
1636 * @param conf
1637 * the configuration to use
1638 * @param desiredTable
1639 * the table you wish to scan locality for
1640 * @param threadPoolSize
1641 * the thread pool size to use
1642 * @param regionDegreeLocalityMapping
1643 * the map into which to put the locality degree mapping or null,
1644 * must be a thread-safe implementation
1645 * @throws IOException
1646 * in case of file system errors or interrupts
1647    */
1648   private static void getRegionLocalityMappingFromFS(final Configuration conf,
1649 final String desiredTable, int threadPoolSize,
1650 final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
1651 final FileSystem fs = FileSystem.get(conf);
1652 final Path rootPath = FSUtils.getRootDir(conf);
1653 final long startTime = EnvironmentEdgeManager.currentTime();
1654 final Path queryPath;
1655 // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1656 if (null == desiredTable) {
1657 queryPath = new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1658 } else {
1659       queryPath = new Path(FSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1660     }
1662 // reject all paths that are not appropriate
1663 PathFilter pathFilter = new PathFilter() {
1664 @Override
1665 public boolean accept(Path path) {
1666 // this is the region name; it may get some noise data
1667 if (null == path) {
1668           return false;
1669         }
1671         // no parent?
1672         Path parent = path.getParent();
1673         if (null == parent) {
1674           return false;
1675         }
1677         String regionName = path.getName();
1678         if (null == regionName) {
1679           return false;
1680         }
1682         if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
1683           return false;
1684         }
1685         return true;
1686       }
1687     };
1689 FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1691 if (LOG.isDebugEnabled()) {
1692       LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
1693     }
1695 if (null == statusList) {
1696       return;
1697     }
1699 // lower the number of threads in case we have very few expected regions
1700 threadPoolSize = Math.min(threadPoolSize, statusList.length);
1702 // run in multiple threads
1703 final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
1704 Threads.newDaemonThreadFactory("FSRegionQuery"));
1705 try {
1706 // ignore all file status items that are not of interest
1707 for (FileStatus regionStatus : statusList) {
1708 if (null == regionStatus || !regionStatus.isDirectory()) {
1709           continue;
1710         }
1712 final Path regionPath = regionStatus.getPath();
1713 if (null != regionPath) {
1714           tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
1715         }
1716       }
1717 } finally {
1718 tpe.shutdown();
1719 final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1720 HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1721 try {
1722 // here we wait until TPE terminates, which is either naturally or by
1723 // exceptions in the execution of the threads
1724 while (!tpe.awaitTermination(threadWakeFrequency,
1725 TimeUnit.MILLISECONDS)) {
1726 // printing out rough estimate, so as to not introduce
1727 // AtomicInteger
1728 LOG.info("Locality checking is underway: { Scanned Regions : "
1729 + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
1730             + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
1731         }
1732 } catch (InterruptedException e) {
1733 Thread.currentThread().interrupt();
1734         throw (InterruptedIOException) new InterruptedIOException().initCause(e);
1735       }
1736     }
1738 long overhead = EnvironmentEdgeManager.currentTime() - startTime;
1739     LOG.info("Scan DFS for locality info takes {}ms", overhead);
1740   }
1742   /**
1743    * Do our short circuit read setup.
1744 * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
1745 * @param conf
1746    */
1747   public static void setupShortCircuitRead(final Configuration conf) {
1748 // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
1749 boolean shortCircuitSkipChecksum =
1750 conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
1751 boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
1752 if (shortCircuitSkipChecksum) {
1753 LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
1754 "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
1755 "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
1756       assert !shortCircuitSkipChecksum; //this will fail if assertions are on
1757     }
1758     checkShortCircuitReadBufferSize(conf);
1759   }
1761   /**
1762    * Check if short circuit read buffer size is set and if not, set it to hbase value.
1763 * @param conf
1764    */
1765   public static void checkShortCircuitReadBufferSize(final Configuration conf) {
1766 final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
1767 final int notSet = -1;
1768 // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
1769 final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
1770 int size = conf.getInt(dfsKey, notSet);
1771 // If a size is set, return -- we will use it.
1772 if (size != notSet) return;
1773 // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
1774 int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
1775     conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
1776   }
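  // Illustrative configuration sketch, not part of the original class: enabling short-circuit
  // reads before calling setupShortCircuitRead(). The socket path is deployment-specific and
  // shown only as an example; HBase checksums stay on, so skip.checksum is left at false.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setBoolean("dfs.client.read.shortcircuit", true);
  //   conf.set("dfs.domain.socket.path", "/var/lib/hadoop-hdfs/dn_socket");
  //   FSUtils.setupShortCircuitRead(conf);   // also applies the HBase default buffer size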
1778   /**
1779    * @param c
1780 * @return The DFSClient DFSHedgedReadMetrics instance or null if can't be found or not on hdfs.
1781 * @throws IOException
1782    */
1783   public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
1784 throws IOException {
1785 if (!isHDFS(c)) return null;
1786 // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
1787 // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
1788 // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
1789 final String name = "getHedgedReadMetrics";
1790 DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient();
1791 Method m;
1792 try {
1793 m = dfsclient.getClass().getDeclaredMethod(name);
1794 } catch (NoSuchMethodException e) {
1795       LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
1796 e.getMessage());
1797 return null;
1798 } catch (SecurityException e) {
1799       LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
1800 e.getMessage());
1801       return null;
1802     }
1803 m.setAccessible(true);
1804 try {
1805 return (DFSHedgedReadMetrics)m.invoke(dfsclient);
1806 } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
1807 LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
1808 e.getMessage());
1809       return null;
1810     }
1811   }
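  // Illustrative usage sketch, not part of the original class: a null return means the
  // filesystem is not HDFS or the reflective lookup failed, so the metrics must be null-checked.
  // The getter names on DFSHedgedReadMetrics come from upstream HDFS.
  //
  //   DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
  //   if (metrics != null) {
  //     LOG.info("hedged reads={}, wins={}",
  //       metrics.getHedgedReadOps(), metrics.getHedgedReadWins());
  //   }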
1813 public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1814 Configuration conf, int threads) throws IOException {
1815 ExecutorService pool = Executors.newFixedThreadPool(threads);
1816 List<Future<Void>> futures = new ArrayList<>();
1817 List<Path> traversedPaths;
1818 try {
1819 traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
1820 for (Future<Void> future : futures) {
1821         future.get();
1822       }
1823 } catch (ExecutionException | InterruptedException | IOException e) {
1824 throw new IOException("Copy snapshot reference files failed", e);
1825 } finally {
1826       pool.shutdownNow();
1827     }
1828     return traversedPaths;
1829   }
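  // Illustrative usage sketch, not part of the original class: copying a directory tree (for
  // example snapshot reference files) between filesystems with a bounded thread pool. The
  // source/destination paths and the thread count are placeholders.
  //
  //   FileSystem srcFS = srcPath.getFileSystem(conf);
  //   FileSystem dstFS = dstPath.getFileSystem(conf);
  //   List<Path> traversed = FSUtils.copyFilesParallel(srcFS, srcPath, dstFS, dstPath, conf, 16);
  //   LOG.debug("Copied {} paths", traversed.size());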
1831 private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1832 Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
1833 List<Path> traversedPaths = new ArrayList<>();
1834 traversedPaths.add(dst);
1835 FileStatus currentFileStatus = srcFS.getFileStatus(src);
1836 if (currentFileStatus.isDirectory()) {
1837 if (!dstFS.mkdirs(dst)) {
1838         throw new IOException("Create directory failed: " + dst);
1839       }
1840 FileStatus[] subPaths = srcFS.listStatus(src);
1841 for (FileStatus subPath : subPaths) {
1842 traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
1843             new Path(dst, subPath.getPath().getName()), conf, pool, futures));
1844       }
1845 } else {
1846 Future<Void> future = pool.submit(() -> {
1847 FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
1848         return null;
1849       });
1850       futures.add(future);
1851     }
1852 return traversedPaths;