HBASE-21843 RegionGroupingProvider breaks the meta wal file name pattern which may...
hbase.git: hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
blob 85ed2ae630fea6266fd9aec51e366c361d363c7d
1 /**
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org.apache.hadoop.hbase.util;
21 import edu.umd.cs.findbugs.annotations.CheckForNull;
22 import java.io.ByteArrayInputStream;
23 import java.io.DataInputStream;
24 import java.io.EOFException;
25 import java.io.FileNotFoundException;
26 import java.io.IOException;
27 import java.io.InputStream;
28 import java.io.InterruptedIOException;
29 import java.lang.reflect.InvocationTargetException;
30 import java.lang.reflect.Method;
31 import java.net.InetSocketAddress;
32 import java.util.ArrayList;
33 import java.util.Arrays;
34 import java.util.Collections;
35 import java.util.HashMap;
36 import java.util.Iterator;
37 import java.util.LinkedList;
38 import java.util.List;
39 import java.util.Locale;
40 import java.util.Map;
41 import java.util.Vector;
42 import java.util.concurrent.ArrayBlockingQueue;
43 import java.util.concurrent.ConcurrentHashMap;
44 import java.util.concurrent.ExecutionException;
45 import java.util.concurrent.ExecutorService;
46 import java.util.concurrent.Executors;
47 import java.util.concurrent.Future;
48 import java.util.concurrent.FutureTask;
49 import java.util.concurrent.ThreadPoolExecutor;
50 import java.util.concurrent.TimeUnit;
51 import java.util.regex.Pattern;
52 import org.apache.hadoop.conf.Configuration;
53 import org.apache.hadoop.fs.BlockLocation;
54 import org.apache.hadoop.fs.FSDataInputStream;
55 import org.apache.hadoop.fs.FSDataOutputStream;
56 import org.apache.hadoop.fs.FileStatus;
57 import org.apache.hadoop.fs.FileSystem;
58 import org.apache.hadoop.fs.FileUtil;
59 import org.apache.hadoop.fs.Path;
60 import org.apache.hadoop.fs.PathFilter;
61 import org.apache.hadoop.fs.permission.FsAction;
62 import org.apache.hadoop.fs.permission.FsPermission;
63 import org.apache.hadoop.hbase.ClusterId;
64 import org.apache.hadoop.hbase.HColumnDescriptor;
65 import org.apache.hadoop.hbase.HConstants;
66 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
67 import org.apache.hadoop.hbase.HRegionInfo;
68 import org.apache.hadoop.hbase.TableName;
69 import org.apache.hadoop.hbase.client.RegionInfo;
70 import org.apache.hadoop.hbase.exceptions.DeserializationException;
71 import org.apache.hadoop.hbase.fs.HFileSystem;
72 import org.apache.hadoop.hbase.io.HFileLink;
73 import org.apache.hadoop.hbase.master.HMaster;
74 import org.apache.hadoop.hbase.regionserver.HRegion;
75 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
76 import org.apache.hadoop.hbase.security.AccessDeniedException;
77 import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
78 import org.apache.hadoop.hdfs.DFSClient;
79 import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
80 import org.apache.hadoop.hdfs.DistributedFileSystem;
81 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
82 import org.apache.hadoop.io.IOUtils;
83 import org.apache.hadoop.ipc.RemoteException;
84 import org.apache.hadoop.security.UserGroupInformation;
85 import org.apache.hadoop.util.Progressable;
86 import org.apache.hadoop.util.ReflectionUtils;
87 import org.apache.hadoop.util.StringUtils;
88 import org.apache.yetus.audience.InterfaceAudience;
89 import org.slf4j.Logger;
90 import org.slf4j.LoggerFactory;
92 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
93 import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
94 import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
95 import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
97 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
98 import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
101 * Utility methods for interacting with the underlying file system.
103 @InterfaceAudience.Private
104 public abstract class FSUtils extends CommonFSUtils {
105 private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);
107 private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
108 private static final int DEFAULT_THREAD_POOLSIZE = 2;
110 /** Set to true on Windows platforms */
111 @VisibleForTesting // currently only used in testing. TODO refactor into a test class
112 public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
114 protected FSUtils() {
115 super();
119 * @return True if <code>fs</code> is an instance of DistributedFileSystem
120 * @throws IOException
122 public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
123 FileSystem fileSystem = fs;
124 // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
125 // Check its backing fs for dfs-ness.
126 if (fs instanceof HFileSystem) {
127 fileSystem = ((HFileSystem)fs).getBackingFs();
129 return fileSystem instanceof DistributedFileSystem;
133 * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
134 * '/a/b/c' part. The two paths must have the same depth. Does not consider
135 * scheme; i.e. if the schemes differ but the path or subpath matches, the two will equate.
136 * @param pathToSearch Path we will be trying to match.
137 * @param pathTail
138 * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
140 public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
141 if (pathToSearch.depth() != pathTail.depth()) return false;
142 Path tailPath = pathTail;
143 String tailName;
144 Path toSearch = pathToSearch;
145 String toSearchName;
146 boolean result = false;
147 do {
148 tailName = tailPath.getName();
149 if (tailName == null || tailName.length() <= 0) {
150 result = true;
151 break;
153 toSearchName = toSearch.getName();
154 if (toSearchName == null || toSearchName.length() <= 0) break;
155 // Move up a parent on each path for next go around. Path doesn't let us go off the end.
156 tailPath = tailPath.getParent();
157 toSearch = toSearch.getParent();
158 } while(tailName.equals(toSearchName));
159 return result;
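// Illustrative examples for isMatchingTail (a sketch, not from the original source; the
// hostname and paths are hypothetical):
//   isMatchingTail(new Path("hdfs://nn:8020/hbase/data"), new Path("/hbase/data"))  // true: scheme ignored
//   isMatchingTail(new Path("/hbase/data/ns"), new Path("data/ns"))                 // false: depths differ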
162 public static FSUtils getInstance(FileSystem fs, Configuration conf) {
163 String scheme = fs.getUri().getScheme();
164 if (scheme == null) {
165 LOG.warn("Could not find scheme for uri " +
166 fs.getUri() + ", default to hdfs");
167 scheme = "hdfs";
169 Class<?> fsUtilsClass = conf.getClass("hbase.fsutil." +
170 scheme + ".impl", FSHDFSUtils.class); // Default to HDFS impl
171 FSUtils fsUtils = (FSUtils)ReflectionUtils.newInstance(fsUtilsClass, conf);
172 return fsUtils;
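// How the implementation is chosen (illustrative sketch; the property name comes from the code
// above, and FSHDFSUtils is the default implementation):
//   conf.set("hbase.fsutil.hdfs.impl", FSHDFSUtils.class.getName());
//   FSUtils fsUtils = FSUtils.getInstance(FileSystem.get(conf), conf);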
176 * Delete the region directory if it exists.
177 * @param conf
178 * @param hri
179 * @return True if deleted the region directory.
180 * @throws IOException
182 public static boolean deleteRegionDir(final Configuration conf, final HRegionInfo hri)
183 throws IOException {
184 Path rootDir = getRootDir(conf);
185 FileSystem fs = rootDir.getFileSystem(conf);
186 return deleteDirectory(fs,
187 new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
191 * Create the specified file on the filesystem. By default, this will:
192 * <ol>
193 * <li>overwrite the file if it exists</li>
194 * <li>apply the umask in the configuration (if it is enabled)</li>
195 * <li>use the fs configured buffer size (or 4096 if not set)</li>
196 * <li>use the configured column family replication, or the filesystem default replication if
197 * it is {@link HColumnDescriptor#DEFAULT_DFS_REPLICATION}</li>
198 * <li>use the default block size</li>
199 * <li>not track progress</li>
200 * </ol>
201 * @param conf configurations
202 * @param fs {@link FileSystem} on which to write the file
203 * @param path {@link Path} to the file to write
204 * @param perm permissions
205 * @param favoredNodes
206 * @return output stream to the created file
207 * @throws IOException if the file cannot be created
209 public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
210 FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
211 if (fs instanceof HFileSystem) {
212 FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
213 if (backingFs instanceof DistributedFileSystem) {
214 // Try to use the favoredNodes version via reflection to allow backwards-
215 // compatibility.
216 short replication = Short.parseShort(conf.get(HColumnDescriptor.DFS_REPLICATION,
217 String.valueOf(HColumnDescriptor.DEFAULT_DFS_REPLICATION)));
218 try {
219 return (FSDataOutputStream) (DistributedFileSystem.class.getDeclaredMethod("create",
220 Path.class, FsPermission.class, boolean.class, int.class, short.class, long.class,
221 Progressable.class, InetSocketAddress[].class).invoke(backingFs, path, perm, true,
222 getDefaultBufferSize(backingFs),
223 replication > 0 ? replication : getDefaultReplication(backingFs, path),
224 getDefaultBlockSize(backingFs, path), null, favoredNodes));
225 } catch (InvocationTargetException ite) {
226 // Function was properly called, but threw its own exception.
227 throw new IOException(ite.getCause());
228 } catch (NoSuchMethodException e) {
229 LOG.debug("DFS Client does not support most favored nodes create; using default create");
230 if (LOG.isTraceEnabled()) LOG.trace("Ignoring; use default create", e);
231 } catch (IllegalArgumentException e) {
232 LOG.debug("Ignoring (most likely Reflection related exception) " + e);
233 } catch (SecurityException e) {
234 LOG.debug("Ignoring (most likely Reflection related exception) " + e);
235 } catch (IllegalAccessException e) {
236 LOG.debug("Ignoring (most likely Reflection related exception) " + e);
240 return create(fs, path, perm, true);
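// Minimal usage sketch for the favored-nodes create above (the address and path are hypothetical;
// the hint is best-effort, and non-DFS filesystems fall back to a plain create):
//   InetSocketAddress[] favored = { new InetSocketAddress("rs1.example.com", 50010) };
//   FSDataOutputStream out =
//       FSUtils.create(conf, fs, new Path("/hbase/.tmp/example"), FsPermission.getFileDefault(), favored);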
244 * Checks to see if the specified file system is available
246 * @param fs filesystem
247 * @throws IOException e
249 public static void checkFileSystemAvailable(final FileSystem fs)
250 throws IOException {
251 if (!(fs instanceof DistributedFileSystem)) {
252 return;
254 IOException exception = null;
255 DistributedFileSystem dfs = (DistributedFileSystem) fs;
256 try {
257 if (dfs.exists(new Path("/"))) {
258 return;
260 } catch (IOException e) {
261 exception = e instanceof RemoteException ?
262 ((RemoteException)e).unwrapRemoteException() : e;
264 try {
265 fs.close();
266 } catch (Exception e) {
267 LOG.error("file system close failed: ", e);
269 IOException io = new IOException("File system is not available");
270 io.initCause(exception);
271 throw io;
275 * We use reflection because {@link DistributedFileSystem#setSafeMode(
276 * HdfsConstants.SafeModeAction action, boolean isChecked)} is not in hadoop 1.1
278 * @param dfs
279 * @return whether we're in safe mode
280 * @throws IOException
282 private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
283 boolean inSafeMode = false;
284 try {
285 Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{
286 org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.class, boolean.class});
287 inSafeMode = (Boolean) m.invoke(dfs,
288 org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET, true);
289 } catch (Exception e) {
290 if (e instanceof IOException) throw (IOException) e;
292 // Check whether dfs is in safe mode.
293 inSafeMode = dfs.setSafeMode(
294 org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET);
296 return inSafeMode;
300 * Check whether dfs is in safemode.
301 * @param conf
302 * @throws IOException
304 public static void checkDfsSafeMode(final Configuration conf)
305 throws IOException {
306 boolean isInSafeMode = false;
307 FileSystem fs = FileSystem.get(conf);
308 if (fs instanceof DistributedFileSystem) {
309 DistributedFileSystem dfs = (DistributedFileSystem)fs;
310 isInSafeMode = isInSafeMode(dfs);
312 if (isInSafeMode) {
313 throw new IOException("File system is in safemode, it can't be written now");
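// Illustrative sketch: fail fast, or block until the NameNode leaves safe mode (both methods are
// defined in this class; the 1000 ms wait value is arbitrary):
//   FSUtils.checkDfsSafeMode(conf);       // throws IOException while in safe mode
//   FSUtils.waitOnSafeMode(conf, 1000);   // or poll until safe mode clears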
318 * Verifies current version of file system
320 * @param fs filesystem object
321 * @param rootdir root hbase directory
322 * @return null if no version file exists, version string otherwise.
323 * @throws IOException e
324 * @throws org.apache.hadoop.hbase.exceptions.DeserializationException
326 public static String getVersion(FileSystem fs, Path rootdir)
327 throws IOException, DeserializationException {
328 Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
329 FileStatus[] status = null;
330 try {
331 // hadoop 2.0 throws FNFE if directory does not exist.
332 // hadoop 1.0 returns null if directory does not exist.
333 status = fs.listStatus(versionFile);
334 } catch (FileNotFoundException fnfe) {
335 return null;
337 if (status == null || status.length == 0) return null;
338 String version = null;
339 byte [] content = new byte [(int)status[0].getLen()];
340 FSDataInputStream s = fs.open(versionFile);
341 try {
342 IOUtils.readFully(s, content, 0, content.length);
343 if (ProtobufUtil.isPBMagicPrefix(content)) {
344 version = parseVersionFrom(content);
345 } else {
346 // Presume it is in pre-pb format.
347 InputStream is = new ByteArrayInputStream(content);
348 DataInputStream dis = new DataInputStream(is);
349 try {
350 version = dis.readUTF();
351 } finally {
352 dis.close();
355 } catch (EOFException eof) {
356 LOG.warn("Version file was empty, odd, will try to set it.");
357 } finally {
358 s.close();
360 return version;
364 * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
365 * @param bytes The byte content of the hbase.version file.
366 * @return The version found in the file as a String.
367 * @throws DeserializationException
369 static String parseVersionFrom(final byte [] bytes)
370 throws DeserializationException {
371 ProtobufUtil.expectPBMagicPrefix(bytes);
372 int pblen = ProtobufUtil.lengthOfPBMagic();
373 FSProtos.HBaseVersionFileContent.Builder builder =
374 FSProtos.HBaseVersionFileContent.newBuilder();
375 try {
376 ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
377 return builder.getVersion();
378 } catch (IOException e) {
379 // Convert
380 throw new DeserializationException(e);
385 * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
386 * @param version Version to persist
387 * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix.
389 static byte [] toVersionByteArray(final String version) {
390 FSProtos.HBaseVersionFileContent.Builder builder =
391 FSProtos.HBaseVersionFileContent.newBuilder();
392 return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
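// Round-trip sketch of the hbase.version encoding above (PB magic prefix followed by a serialized
// FSProtos.HBaseVersionFileContent; "8" is only an example version string):
//   byte[] raw = FSUtils.toVersionByteArray("8");
//   assert ProtobufUtil.isPBMagicPrefix(raw);
//   assert "8".equals(FSUtils.parseVersionFrom(raw));  // throws DeserializationException on bad input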
396 * Verifies current version of file system
398 * @param fs file system
399 * @param rootdir root directory of HBase installation
400 * @param message if true, issues a message on System.out
402 * @throws IOException e
403 * @throws DeserializationException
405 public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
406 throws IOException, DeserializationException {
407 checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
411 * Verifies current version of file system
413 * @param fs file system
414 * @param rootdir root directory of HBase installation
415 * @param message if true, issues a message on System.out
416 * @param wait wait interval
417 * @param retries number of times to retry
419 * @throws IOException e
420 * @throws DeserializationException
422 public static void checkVersion(FileSystem fs, Path rootdir,
423 boolean message, int wait, int retries)
424 throws IOException, DeserializationException {
425 String version = getVersion(fs, rootdir);
426 if (version == null) {
427 if (!metaRegionExists(fs, rootdir)) {
428 // rootDir is empty (no version file and no root region)
429 // just create new version file (HBASE-1195)
430 setVersion(fs, rootdir, wait, retries);
431 return;
433 } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) return;
435 // version is deprecated, requires migration
436 // Output on stdout so user sees it in terminal.
437 String msg = "HBase file layout needs to be upgraded."
438 + " You have version " + version
439 + " and I want version " + HConstants.FILE_SYSTEM_VERSION
440 + ". Consult http://hbase.apache.org/book.html for further information about upgrading HBase."
441 + " Is your hbase.rootdir valid? If so, you may need to run "
442 + "'hbase hbck -fixVersionFile'.";
443 if (message) {
444 System.out.println("WARNING! " + msg);
446 throw new FileSystemVersionException(msg);
450 * Sets version of file system
452 * @param fs filesystem object
453 * @param rootdir hbase root
454 * @throws IOException e
456 public static void setVersion(FileSystem fs, Path rootdir)
457 throws IOException {
458 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
459 HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
463 * Sets version of file system
465 * @param fs filesystem object
466 * @param rootdir hbase root
467 * @param wait time to wait for retry
468 * @param retries number of times to retry before failing
469 * @throws IOException e
471 public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
472 throws IOException {
473 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
478 * Sets version of file system
480 * @param fs filesystem object
481 * @param rootdir hbase root directory
482 * @param version version to set
483 * @param wait time to wait for retry
484 * @param retries number of times to retry before throwing an IOException
485 * @throws IOException e
487 public static void setVersion(FileSystem fs, Path rootdir, String version,
488 int wait, int retries) throws IOException {
489 Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
490 Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
491 HConstants.VERSION_FILE_NAME);
492 while (true) {
493 try {
494 // Write the version to a temporary file
495 FSDataOutputStream s = fs.create(tempVersionFile);
496 try {
497 s.write(toVersionByteArray(version));
498 s.close();
499 s = null;
500 // Move the temp version file to its normal location. Returns false
501 // if the rename failed. Throw an IOE in that case.
502 if (!fs.rename(tempVersionFile, versionFile)) {
503 throw new IOException("Unable to move temp version file to " + versionFile);
505 } finally {
506 // Cleaning up the temporary file if the rename failed would be trying
507 // too hard. We'll unconditionally create it again the next time
508 // through anyway; files are overwritten by default by create().
510 // Attempt to close the stream on the way out if it is still open.
511 try {
512 if (s != null) s.close();
513 } catch (IOException ignore) { }
515 LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
516 return;
517 } catch (IOException e) {
518 if (retries > 0) {
519 LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
520 fs.delete(versionFile, false);
521 try {
522 if (wait > 0) {
523 Thread.sleep(wait);
525 } catch (InterruptedException ie) {
526 throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
528 retries--;
529 } else {
530 throw e;
537 * Checks that a cluster ID file exists in the HBase root directory
538 * @param fs the root directory FileSystem
539 * @param rootdir the HBase root directory in HDFS
540 * @param wait how long to wait between retries
541 * @return <code>true</code> if the file exists, otherwise <code>false</code>
542 * @throws IOException if checking the FileSystem fails
544 public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
545 int wait) throws IOException {
546 while (true) {
547 try {
548 Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
549 return fs.exists(filePath);
550 } catch (IOException ioe) {
551 if (wait > 0) {
552 LOG.warn("Unable to check cluster ID file in " + rootdir.toString() +
553 ", retrying in "+wait+"msec: "+StringUtils.stringifyException(ioe));
554 try {
555 Thread.sleep(wait);
556 } catch (InterruptedException e) {
557 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
559 } else {
560 throw ioe;
567 * Returns the value of the unique cluster ID stored for this HBase instance.
568 * @param fs the root directory FileSystem
569 * @param rootdir the path to the HBase root directory
570 * @return the unique cluster identifier
571 * @throws IOException if reading the cluster ID file fails
573 public static ClusterId getClusterId(FileSystem fs, Path rootdir)
574 throws IOException {
575 Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
576 ClusterId clusterId = null;
577 FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath): null;
578 if (status != null) {
579 int len = Ints.checkedCast(status.getLen());
580 byte [] content = new byte[len];
581 FSDataInputStream in = fs.open(idPath);
582 try {
583 in.readFully(content);
584 } catch (EOFException eof) {
585 LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
586 } finally{
587 in.close();
589 try {
590 clusterId = ClusterId.parseFrom(content);
591 } catch (DeserializationException e) {
592 throw new IOException("content=" + Bytes.toString(content), e);
594 // If not pb'd, make it so.
595 if (!ProtobufUtil.isPBMagicPrefix(content)) {
596 String cid = null;
597 in = fs.open(idPath);
598 try {
599 cid = in.readUTF();
600 clusterId = new ClusterId(cid);
601 } catch (EOFException eof) {
602 LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
603 } finally {
604 in.close();
606 rewriteAsPb(fs, rootdir, idPath, clusterId);
608 return clusterId;
609 } else {
610 LOG.warn("Cluster ID file does not exist at " + idPath.toString());
612 return clusterId;
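// Illustrative sketch: read the cluster ID from the hbase.id file, creating one if it is missing
// (the 1000 ms retry wait is arbitrary):
//   ClusterId id = FSUtils.getClusterId(fs, rootDir);
//   if (id == null) {
//     FSUtils.setClusterId(fs, rootDir, new ClusterId(), 1000);
//   }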
616 * @param cid
617 * @throws IOException
619 private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
620 final ClusterId cid)
621 throws IOException {
622 // Rewrite the file as pb. Move aside the old one first, write new
623 // then delete the moved-aside file.
624 Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
625 if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
626 setClusterId(fs, rootdir, cid, 100);
627 if (!fs.delete(movedAsideName, false)) {
628 throw new IOException("Failed delete of " + movedAsideName);
630 LOG.debug("Rewrote the hbase.id file as pb");
634 * Writes a new unique identifier for this cluster to the "hbase.id" file
635 * in the HBase root directory
636 * @param fs the root directory FileSystem
637 * @param rootdir the path to the HBase root directory
638 * @param clusterId the unique identifier to store
639 * @param wait how long (in milliseconds) to wait between retries
640 * @throws IOException if writing to the FileSystem fails and no wait value
642 public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId,
643 int wait) throws IOException {
644 while (true) {
645 try {
646 Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
647 Path tempIdFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY +
648 Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME);
649 // Write the id file to a temporary location
650 FSDataOutputStream s = fs.create(tempIdFile);
651 try {
652 s.write(clusterId.toByteArray());
653 s.close();
654 s = null;
655 // Move the temporary file to its normal location. Throw an IOE if
656 // the rename failed
657 if (!fs.rename(tempIdFile, idFile)) {
658 throw new IOException("Unable to move temp version file to " + idFile);
660 } finally {
661 // Attempt to close the stream if still open on the way out
662 try {
663 if (s != null) s.close();
664 } catch (IOException ignore) { }
666 if (LOG.isDebugEnabled()) {
667 LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId);
669 return;
670 } catch (IOException ioe) {
671 if (wait > 0) {
672 LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
673 ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
674 try {
675 Thread.sleep(wait);
676 } catch (InterruptedException e) {
677 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
679 } else {
680 throw ioe;
687 * If DFS, check safe mode and if so, wait until we clear it.
688 * @param conf configuration
689 * @param wait Sleep between retries
690 * @throws IOException e
692 public static void waitOnSafeMode(final Configuration conf,
693 final long wait)
694 throws IOException {
695 FileSystem fs = FileSystem.get(conf);
696 if (!(fs instanceof DistributedFileSystem)) return;
697 DistributedFileSystem dfs = (DistributedFileSystem)fs;
698 // Make sure dfs is not in safe mode
699 while (isInSafeMode(dfs)) {
700 LOG.info("Waiting for dfs to exit safe mode...");
701 try {
702 Thread.sleep(wait);
703 } catch (InterruptedException e) {
704 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
710 * Checks if meta region exists
712 * @param fs file system
713 * @param rootdir root directory of HBase installation
714 * @return true if exists
715 * @throws IOException e
717 @SuppressWarnings("deprecation")
718 public static boolean metaRegionExists(FileSystem fs, Path rootdir)
719 throws IOException {
720 Path metaRegionDir =
721 HRegion.getRegionDir(rootdir, HRegionInfo.FIRST_META_REGIONINFO);
722 return fs.exists(metaRegionDir);
726 * Compute HDFS blocks distribution of a given file, or a portion of the file
727 * @param fs file system
728 * @param status file status of the file
729 * @param start start position of the portion
730 * @param length length of the portion
731 * @return The HDFS blocks distribution
733 static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
734 final FileSystem fs, FileStatus status, long start, long length)
735 throws IOException {
736 HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
737 BlockLocation [] blockLocations =
738 fs.getFileBlockLocations(status, start, length);
739 for(BlockLocation bl : blockLocations) {
740 String [] hosts = bl.getHosts();
741 long len = bl.getLength();
742 blocksDistribution.addHostsAndBlockWeight(hosts, len);
745 return blocksDistribution;
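// Illustrative sketch: locality of one store file per host (the path and hostname are hypothetical):
//   FileStatus st = fs.getFileStatus(storeFilePath);
//   HDFSBlocksDistribution dist = FSUtils.computeHDFSBlocksDistribution(fs, st, 0, st.getLen());
//   float locality = dist.getBlockLocalityIndex("rs1.example.com");  // fraction of bytes on that host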
749 * Update blocksDistribution with blockLocations
750 * @param blocksDistribution the hdfs blocks distribution
751 * @param blockLocations an array containing block locations
753 static public void addToHDFSBlocksDistribution(
754 HDFSBlocksDistribution blocksDistribution, BlockLocation[] blockLocations)
755 throws IOException {
756 for (BlockLocation bl : blockLocations) {
757 String[] hosts = bl.getHosts();
758 long len = bl.getLength();
759 blocksDistribution.addHostsAndBlockWeight(hosts, len);
763 // TODO move this method OUT of FSUtils. No dependencies on HMaster
765 * Returns the total overall fragmentation percentage. Includes hbase:meta and
766 * -ROOT- as well.
768 * @param master The master defining the HBase root and file system.
769 * @return A map for each table and its percentage.
770 * @throws IOException When scanning the directory fails.
772 public static int getTotalTableFragmentation(final HMaster master)
773 throws IOException {
774 Map<String, Integer> map = getTableFragmentation(master);
775 return map != null && map.size() > 0 ? map.get("-TOTAL-") : -1;
779 * Runs through the HBase rootdir and checks how many stores for each table
780 * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
781 * percentage across all tables is stored under the special key "-TOTAL-".
783 * @param master The master defining the HBase root and file system.
784 * @return A map for each table and its percentage.
786 * @throws IOException When scanning the directory fails.
788 public static Map<String, Integer> getTableFragmentation(
789 final HMaster master)
790 throws IOException {
791 Path path = getRootDir(master.getConfiguration());
792 // since HMaster.getFileSystem() is package private
793 FileSystem fs = path.getFileSystem(master.getConfiguration());
794 return getTableFragmentation(fs, path);
798 * Runs through the HBase rootdir and checks how many stores for each table
799 * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
800 * percentage across all tables is stored under the special key "-TOTAL-".
802 * @param fs The file system to use.
803 * @param hbaseRootDir The root directory to scan.
804 * @return A map for each table and its percentage.
805 * @throws IOException When scanning the directory fails.
807 public static Map<String, Integer> getTableFragmentation(
808 final FileSystem fs, final Path hbaseRootDir)
809 throws IOException {
810 Map<String, Integer> frags = new HashMap<>();
811 int cfCountTotal = 0;
812 int cfFragTotal = 0;
813 PathFilter regionFilter = new RegionDirFilter(fs);
814 PathFilter familyFilter = new FamilyDirFilter(fs);
815 List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
816 for (Path d : tableDirs) {
817 int cfCount = 0;
818 int cfFrag = 0;
819 FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
820 for (FileStatus regionDir : regionDirs) {
821 Path dd = regionDir.getPath();
822 // else it's a region name; now look in the region for families
823 FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
824 for (FileStatus familyDir : familyDirs) {
825 cfCount++;
826 cfCountTotal++;
827 Path family = familyDir.getPath();
828 // now in family make sure only one file
829 FileStatus[] familyStatus = fs.listStatus(family);
830 if (familyStatus.length > 1) {
831 cfFrag++;
832 cfFragTotal++;
836 // compute percentage per table and store in result list
837 frags.put(FSUtils.getTableName(d).getNameAsString(),
838 cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100));
840 // set overall percentage for all tables
841 frags.put("-TOTAL-",
842 cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100));
843 return frags;
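// Illustrative sketch: the map is keyed by table name, with the overall percentage under "-TOTAL-":
//   Map<String, Integer> frags = FSUtils.getTableFragmentation(fs, rootDir);
//   Integer overallPct = frags.get("-TOTAL-");  // % of column families with more than one store file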
846 public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException {
847 if (fs.exists(dst) && !fs.delete(dst, false)) {
848 throw new IOException("Can not delete " + dst);
850 if (!fs.rename(src, dst)) {
851 throw new IOException("Can not rename from " + src + " to " + dst);
856 * A {@link PathFilter} that returns only regular files.
858 static class FileFilter extends AbstractFileStatusFilter {
859 private final FileSystem fs;
861 public FileFilter(final FileSystem fs) {
862 this.fs = fs;
865 @Override
866 protected boolean accept(Path p, @CheckForNull Boolean isDir) {
867 try {
868 return isFile(fs, isDir, p);
869 } catch (IOException e) {
870 LOG.warn("unable to verify if path=" + p + " is a regular file", e);
871 return false;
877 * Directory filter that doesn't include any of the directories in the specified blacklist
879 public static class BlackListDirFilter extends AbstractFileStatusFilter {
880 private final FileSystem fs;
881 private List<String> blacklist;
884 * Create a filter on the given filesystem with the specified blacklist
885 * @param fs filesystem to filter
886 * @param directoryNameBlackList list of the names of the directories to filter. If
887 * <tt>null</tt>, all directories are returned
889 @SuppressWarnings("unchecked")
890 public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
891 this.fs = fs;
892 blacklist =
893 (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
894 : directoryNameBlackList);
897 @Override
898 protected boolean accept(Path p, @CheckForNull Boolean isDir) {
899 if (!isValidName(p.getName())) {
900 return false;
903 try {
904 return isDirectory(fs, isDir, p);
905 } catch (IOException e) {
906 LOG.warn("An error occurred while verifying if [" + p.toString()
907 + "] is a valid directory. Returning 'not valid' and continuing.", e);
908 return false;
912 protected boolean isValidName(final String name) {
913 return !blacklist.contains(name);
918 * A {@link PathFilter} that only allows directories.
920 public static class DirFilter extends BlackListDirFilter {
922 public DirFilter(FileSystem fs) {
923 super(fs, null);
928 * A {@link PathFilter} that returns usertable directories. To get all directories use the
929 * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
931 public static class UserTableDirFilter extends BlackListDirFilter {
932 public UserTableDirFilter(FileSystem fs) {
933 super(fs, HConstants.HBASE_NON_TABLE_DIRS);
936 @Override
937 protected boolean isValidName(final String name) {
938 if (!super.isValidName(name))
939 return false;
941 try {
942 TableName.isLegalTableQualifierName(Bytes.toBytes(name));
943 } catch (IllegalArgumentException e) {
944 LOG.info("INVALID NAME " + name);
945 return false;
947 return true;
951 public void recoverFileLease(final FileSystem fs, final Path p, Configuration conf)
952 throws IOException {
953 recoverFileLease(fs, p, conf, null);
957 * Recover file lease. Used when a file might be suspected
958 * to have been left open by another process.
959 * @param fs FileSystem handle
960 * @param p Path of file to recover lease
961 * @param conf Configuration handle
962 * @throws IOException
964 public abstract void recoverFileLease(final FileSystem fs, final Path p,
965 Configuration conf, CancelableProgressable reporter) throws IOException;
967 public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
968 throws IOException {
969 List<Path> tableDirs = new LinkedList<>();
971 for(FileStatus status :
972 fs.globStatus(new Path(rootdir,
973 new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
974 tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
976 return tableDirs;
980 * @param fs
981 * @param rootdir
982 * @return All the table directories under <code>rootdir</code>. Ignores non-table hbase folders such as
983 * .logs, .oldlogs, and .corrupt.
984 * @throws IOException
986 public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
987 throws IOException {
988 // presumes any directory under hbase.rootdir is a table
989 FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
990 List<Path> tabledirs = new ArrayList<>(dirs.length);
991 for (FileStatus dir: dirs) {
992 tabledirs.add(dir.getPath());
994 return tabledirs;
998 * Filter for all dirs whose names look like region directory names.
1000 public static class RegionDirFilter extends AbstractFileStatusFilter {
1001 // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
1002 final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
1003 final FileSystem fs;
1005 public RegionDirFilter(FileSystem fs) {
1006 this.fs = fs;
1009 @Override
1010 protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1011 if (!regionDirPattern.matcher(p.getName()).matches()) {
1012 return false;
1015 try {
1016 return isDirectory(fs, isDir, p);
1017 } catch (IOException ioe) {
1018 // Maybe the file was moved or the fs was disconnected.
1019 LOG.warn("Skipping file " + p +" due to IOException", ioe);
1020 return false;
1026 * Given a particular table dir, return all the regiondirs inside it, excluding files such as
1027 * .tableinfo
1028 * @param fs A file system for the Path
1029 * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
1030 * @return List of paths to valid region directories in table dir.
1031 * @throws IOException
1033 public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
1034 // assumes we are in a table dir.
1035 List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1036 if (rds == null) {
1037 return Collections.emptyList();
1039 List<Path> regionDirs = new ArrayList<>(rds.size());
1040 for (FileStatus rdfs: rds) {
1041 Path rdPath = rdfs.getPath();
1042 regionDirs.add(rdPath);
1044 return regionDirs;
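// Illustrative sketch: walking rootdir -> table -> region -> family with the helpers in this class:
//   for (Path tableDir : FSUtils.getTableDirs(fs, rootDir)) {
//     for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {
//       List<Path> familyDirs = FSUtils.getFamilyDirs(fs, regionDir);
//     }
//   }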
1047 public static Path getRegionDir(Path tableDir, RegionInfo region) {
1048 return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
1052 * Filter for all dirs that are legal column family names. This is generally used for colfam
1053 * dirs &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
1055 public static class FamilyDirFilter extends AbstractFileStatusFilter {
1056 final FileSystem fs;
1058 public FamilyDirFilter(FileSystem fs) {
1059 this.fs = fs;
1062 @Override
1063 protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1064 try {
1065 // throws IAE if invalid
1066 HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(p.getName()));
1067 } catch (IllegalArgumentException iae) {
1068 // path name is an invalid family name and thus is excluded.
1069 return false;
1072 try {
1073 return isDirectory(fs, isDir, p);
1074 } catch (IOException ioe) {
1075 // Maybe the file was moved or the fs was disconnected.
1076 LOG.warn("Skipping file " + p +" due to IOException", ioe);
1077 return false;
1083 * Given a particular region dir, return all the familydirs inside it
1085 * @param fs A file system for the Path
1086 * @param regionDir Path to a specific region directory
1087 * @return List of paths to valid family directories in region dir.
1088 * @throws IOException
1090 public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
1091 // assumes we are in a region dir.
1092 FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
1093 List<Path> familyDirs = new ArrayList<>(fds.length);
1094 for (FileStatus fdfs: fds) {
1095 Path fdPath = fdfs.getPath();
1096 familyDirs.add(fdPath);
1098 return familyDirs;
1101 public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException {
1102 List<FileStatus> fds = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs));
1103 if (fds == null) {
1104 return Collections.emptyList();
1106 List<Path> referenceFiles = new ArrayList<>(fds.size());
1107 for (FileStatus fdfs: fds) {
1108 Path fdPath = fdfs.getPath();
1109 referenceFiles.add(fdPath);
1111 return referenceFiles;
1115 * Filter for HFiles that excludes reference files.
1117 public static class HFileFilter extends AbstractFileStatusFilter {
1118 final FileSystem fs;
1120 public HFileFilter(FileSystem fs) {
1121 this.fs = fs;
1124 @Override
1125 protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1126 if (!StoreFileInfo.isHFile(p)) {
1127 return false;
1130 try {
1131 return isFile(fs, isDir, p);
1132 } catch (IOException ioe) {
1133 // Maybe the file was moved or the fs was disconnected.
1134 LOG.warn("Skipping file " + p +" due to IOException", ioe);
1135 return false;
1141 * Filter for HFileLinks (StoreFiles and HFiles not included).
1142 * The filter itself does not consider whether a link is a file or not.
1144 public static class HFileLinkFilter implements PathFilter {
1146 @Override
1147 public boolean accept(Path p) {
1148 return HFileLink.isHFileLink(p);
1152 public static class ReferenceFileFilter extends AbstractFileStatusFilter {
1154 private final FileSystem fs;
1156 public ReferenceFileFilter(FileSystem fs) {
1157 this.fs = fs;
1160 @Override
1161 protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1162 if (!StoreFileInfo.isReference(p)) {
1163 return false;
1166 try {
1167 // only files can be references.
1168 return isFile(fs, isDir, p);
1169 } catch (IOException ioe) {
1170 // Maybe the file was moved or the fs was disconnected.
1171 LOG.warn("Skipping file " + p +" due to IOException", ioe);
1172 return false;
1178 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1179 * table StoreFile names to the full Path.
1180 * <br>
1181 * Example...<br>
1182 * Key = 3944417774205889744 <br>
1183 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1185 * @param map map to add values. If null, this method will create and populate one to return
1186 * @param fs The file system to use.
1187 * @param hbaseRootDir The root directory to scan.
1188 * @param tableName name of the table to scan.
1189 * @return Map keyed by StoreFile name with a value of the full Path.
1190 * @throws IOException When scanning the directory fails.
1191 * @throws InterruptedException
1193 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1194 final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1195 throws IOException, InterruptedException {
1196 return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null, null);
1200 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1201 * table StoreFile names to the full Path. Note that because this method can be called
1202 * on a 'live' HBase system that we will skip files that no longer exist by the time
1203 * we traverse them and similarly the user of the result needs to consider that some
1204 * entries in this map may not exist by the time this call completes.
1205 * <br>
1206 * Example...<br>
1207 * Key = 3944417774205889744 <br>
1208 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1210 * @param resultMap map to add values. If null, this method will create and populate one to return
1211 * @param fs The file system to use.
1212 * @param hbaseRootDir The root directory to scan.
1213 * @param tableName name of the table to scan.
1214 * @param sfFilter optional path filter to apply to store files
1215 * @param executor optional executor service to parallelize this operation
1216 * @param errors ErrorReporter instance or null
1217 * @return Map keyed by StoreFile name with a value of the full Path.
1218 * @throws IOException When scanning the directory fails.
1219 * @throws InterruptedException
1221 public static Map<String, Path> getTableStoreFilePathMap(
1222 Map<String, Path> resultMap,
1223 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1224 ExecutorService executor, final ErrorReporter errors) throws IOException, InterruptedException {
1226 final Map<String, Path> finalResultMap =
1227 resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap;
1229 // only include the directory paths to tables
1230 Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);
1231 // Inside a table, there are compaction.dir directories to skip. Otherwise, all else
1232 // should be regions.
1233 final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
1234 final Vector<Exception> exceptions = new Vector<>();
1236 try {
1237 List<FileStatus> regionDirs = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1238 if (regionDirs == null) {
1239 return finalResultMap;
1242 final List<Future<?>> futures = new ArrayList<>(regionDirs.size());
1244 for (FileStatus regionDir : regionDirs) {
1245 if (null != errors) {
1246 errors.progress();
1248 final Path dd = regionDir.getPath();
1250 if (!exceptions.isEmpty()) {
1251 break;
1254 Runnable getRegionStoreFileMapCall = new Runnable() {
1255 @Override
1256 public void run() {
1257 try {
1258 HashMap<String,Path> regionStoreFileMap = new HashMap<>();
1259 List<FileStatus> familyDirs = FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
1260 if (familyDirs == null) {
1261 if (!fs.exists(dd)) {
1262 LOG.warn("Skipping region because it no longer exists: " + dd);
1263 } else {
1264 LOG.warn("Skipping region because it has no family dirs: " + dd);
1266 return;
1268 for (FileStatus familyDir : familyDirs) {
1269 if (null != errors) {
1270 errors.progress();
1272 Path family = familyDir.getPath();
1273 if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1274 continue;
1276 // now in family, iterate over the StoreFiles and
1277 // put in map
1278 FileStatus[] familyStatus = fs.listStatus(family);
1279 for (FileStatus sfStatus : familyStatus) {
1280 if (null != errors) {
1281 errors.progress();
1283 Path sf = sfStatus.getPath();
1284 if (sfFilter == null || sfFilter.accept(sf)) {
1285 regionStoreFileMap.put(sf.getName(), sf);
1289 finalResultMap.putAll(regionStoreFileMap);
1290 } catch (Exception e) {
1291 LOG.error("Could not get region store file map for region: " + dd, e);
1292 exceptions.add(e);
1297 // If executor is available, submit async tasks to exec concurrently, otherwise
1298 // just do serial sync execution
1299 if (executor != null) {
1300 Future<?> future = executor.submit(getRegionStoreFileMapCall);
1301 futures.add(future);
1302 } else {
1303 FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null);
1304 future.run();
1305 futures.add(future);
1309 // Ensure all pending tasks are complete (or that we run into an exception)
1310 for (Future<?> f : futures) {
1311 if (!exceptions.isEmpty()) {
1312 break;
1314 try {
1315 f.get();
1316 } catch (ExecutionException e) {
1317 LOG.error("Unexpected exec exception! Should've been caught already. (Bug?)", e);
1318 // Shouldn't happen, we already logged/caught any exceptions in the Runnable
1321 } catch (IOException e) {
1322 LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
1323 exceptions.add(e);
1324 } finally {
1325 if (!exceptions.isEmpty()) {
1326 // Just throw the first exception as an indication something bad happened
1327 // Don't need to propagate all the exceptions, we already logged them all anyway
1328 Throwables.propagateIfInstanceOf(exceptions.firstElement(), IOException.class);
1329 throw Throwables.propagate(exceptions.firstElement());
1333 return finalResultMap;
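// Illustrative sketch: build the store-file-name -> Path map for one table using a thread pool
// (the table name and pool size are hypothetical):
//   ExecutorService pool = Executors.newFixedThreadPool(8);
//   Map<String, Path> sfMap = FSUtils.getTableStoreFilePathMap(
//       null, fs, rootDir, TableName.valueOf("t1"), null, pool, null);
//   pool.shutdown();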
1336 public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1337 int result = 0;
1338 try {
1339 for (Path familyDir:getFamilyDirs(fs, p)){
1340 result += getReferenceFilePaths(fs, familyDir).size();
1342 } catch (IOException e) {
1343 LOG.warn("Error Counting reference files.", e);
1345 return result;
1349 * Runs through the HBase rootdir and creates a reverse lookup map for
1350 * table StoreFile names to the full Path.
1351 * <br>
1352 * Example...<br>
1353 * Key = 3944417774205889744 <br>
1354 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1356 * @param fs The file system to use.
1357 * @param hbaseRootDir The root directory to scan.
1358 * @return Map keyed by StoreFile name with a value of the full Path.
1359 * @throws IOException When scanning the directory fails.
1360 * @throws InterruptedException
1362 public static Map<String, Path> getTableStoreFilePathMap(
1363 final FileSystem fs, final Path hbaseRootDir)
1364 throws IOException, InterruptedException {
1365 return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, null);
1369 * Runs through the HBase rootdir and creates a reverse lookup map for
1370 * table StoreFile names to the full Path.
1371 * <br>
1372 * Example...<br>
1373 * Key = 3944417774205889744 <br>
1374 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1376 * @param fs The file system to use.
1377 * @param hbaseRootDir The root directory to scan.
1378 * @param sfFilter optional path filter to apply to store files
1379 * @param executor optional executor service to parallelize this operation
1380 * @param errors ErrorReporter instance or null
1381 * @return Map keyed by StoreFile name with a value of the full Path.
1382 * @throws IOException When scanning the directory fails.
1383 * @throws InterruptedException
1385 public static Map<String, Path> getTableStoreFilePathMap(
1386 final FileSystem fs, final Path hbaseRootDir, PathFilter sfFilter,
1387 ExecutorService executor, ErrorReporter errors)
1388 throws IOException, InterruptedException {
1389 ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);
1391 // if this method looks similar to 'getTableFragmentation' that is because
1392 // it was borrowed from it.
1394 // only include the directory paths to tables
1395 for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1396 getTableStoreFilePathMap(map, fs, hbaseRootDir,
1397 FSUtils.getTableName(tableDir), sfFilter, executor, errors);
1399 return map;
1403 * Filters FileStatuses in an array and returns a list
1405 * @param input An array of FileStatuses
1406 * @param filter A required filter to filter the array
1407 * @return A list of FileStatuses
1409 public static List<FileStatus> filterFileStatuses(FileStatus[] input,
1410 FileStatusFilter filter) {
1411 if (input == null) return null;
1412 return filterFileStatuses(Iterators.forArray(input), filter);
1416 * Filters FileStatuses in an iterator and returns a list
1418 * @param input An iterator of FileStatuses
1419 * @param filter A required filter to filter the iterator
1420 * @return A list of FileStatuses
1422 public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
1423 FileStatusFilter filter) {
1424 if (input == null) return null;
1425 ArrayList<FileStatus> results = new ArrayList<>();
1426 while (input.hasNext()) {
1427 FileStatus f = input.next();
1428 if (filter.accept(f)) {
1429 results.add(f);
1432 return results;
1436 * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1437 * This accommodates differences between hadoop versions, where hadoop 1
1438 * does not throw a FileNotFoundException and returns an empty FileStatus[],
1439 * while hadoop 2 will throw a FileNotFoundException.
1441 * @param fs file system
1442 * @param dir directory
1443 * @param filter file status filter
1444 * @return null if dir is empty or doesn't exist, otherwise FileStatus list
1446 public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs,
1447 final Path dir, final FileStatusFilter filter) throws IOException {
1448 FileStatus [] status = null;
1449 try {
1450 status = fs.listStatus(dir);
1451 } catch (FileNotFoundException fnfe) {
1452 // if directory doesn't exist, return null
1453 if (LOG.isTraceEnabled()) {
1454 LOG.trace(dir + " doesn't exist");
1458 if (status == null || status.length < 1) {
1459 return null;
1462 if (filter == null) {
1463 return Arrays.asList(status);
1464 } else {
1465 List<FileStatus> status2 = filterFileStatuses(status, filter);
1466 if (status2 == null || status2.isEmpty()) {
1467 return null;
1468 } else {
1469 return status2;
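// Illustrative sketch: a null result means the directory is empty or missing, not an error:
//   List<FileStatus> regions = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
//   if (regions == null) {
//     // table dir is empty or no longer exists
//   }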
1475 * Throw an exception if an action is not permitted by a user on a file.
1477 * @param ugi
1478 * the user
1479 * @param file
1480 * the file
1481 * @param action
1482 * the action
1484 public static void checkAccess(UserGroupInformation ugi, FileStatus file,
1485 FsAction action) throws AccessDeniedException {
1486 if (ugi.getShortUserName().equals(file.getOwner())) {
1487 if (file.getPermission().getUserAction().implies(action)) {
1488 return;
1490 } else if (contains(ugi.getGroupNames(), file.getGroup())) {
1491 if (file.getPermission().getGroupAction().implies(action)) {
1492 return;
1494 } else if (file.getPermission().getOtherAction().implies(action)) {
1495 return;
1497 throw new AccessDeniedException("Permission denied:" + " action=" + action
1498 + " path=" + file.getPath() + " user=" + ugi.getShortUserName());
1501 private static boolean contains(String[] groups, String user) {
1502 for (String group : groups) {
1503 if (group.equals(user)) {
1504 return true;
1507 return false;
1511 * This function scans the root path of the file system to get the
1512 * degree of locality for each region on each of the servers having at least
1513 * one block of that region.
1514 * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}
1516 * @param conf
1517 * the configuration to use
1518 * @return the mapping from region encoded name to a map of server names to
1519 * locality fraction
1520 * @throws IOException
1521 * in case of file system errors or interrupts
1523 public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1524 final Configuration conf) throws IOException {
1525 return getRegionDegreeLocalityMappingFromFS(
1526 conf, null,
1527 conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1532 * This function scans the root path of the file system to get the
1533 * degree of locality for each region on each of the servers having at least
1534 * one block of that region.
1536 * @param conf
1537 * the configuration to use
1538 * @param desiredTable
1539 * the table you wish to scan locality for
1540 * @param threadPoolSize
1541 * the thread pool size to use
1542 * @return the mapping from region encoded name to a map of server names to
1543 * locality fraction
1544 * @throws IOException
1545 * in case of file system errors or interrupts
1547 public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1548 final Configuration conf, final String desiredTable, int threadPoolSize)
1549 throws IOException {
1550 Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
1551 getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, null,
1552 regionDegreeLocalityMapping);
1553 return regionDegreeLocalityMapping;
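// Illustrative sketch: region encoded name -> (server name -> locality fraction) for one table
// (the table name and thread pool size are hypothetical):
//   Map<String, Map<String, Float>> locality =
//       FSUtils.getRegionDegreeLocalityMappingFromFS(conf, "t1", 8);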
1557 * This function scans the root path of the file system to get either the
1558 * mapping between the region name and its best locality region server or the
1559 * degree of locality of each region on each of the servers having at least
1560 * one block of that region. The output map parameters are both optional.
1562 * @param conf
1563 * the configuration to use
1564 * @param desiredTable
1565 * the table you wish to scan locality for
1566 * @param threadPoolSize
1567 * the thread pool size to use
1568 * @param regionToBestLocalityRSMapping
1569 * the map into which to put the best locality mapping or null
1570 * @param regionDegreeLocalityMapping
1571 * the map into which to put the locality degree mapping or null,
1572 * must be a thread-safe implementation
1573 * @throws IOException
1574 * in case of file system errors or interrupts
1576 private static void getRegionLocalityMappingFromFS(
1577 final Configuration conf, final String desiredTable,
1578 int threadPoolSize,
1579 Map<String, String> regionToBestLocalityRSMapping,
1580 Map<String, Map<String, Float>> regionDegreeLocalityMapping)
1581 throws IOException {
1582 FileSystem fs = FileSystem.get(conf);
1583 Path rootPath = FSUtils.getRootDir(conf);
1584 long startTime = EnvironmentEdgeManager.currentTime();
1585 Path queryPath;
1586 // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1587 if (null == desiredTable) {
1588 queryPath = new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1589 } else {
1590 queryPath = new Path(FSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1593 // reject all paths that are not appropriate
1594 PathFilter pathFilter = new PathFilter() {
1595 @Override
1596 public boolean accept(Path path) {
1597 // this is the region name; it may get some noise data
1598 if (null == path) {
1599 return false;
1602 // no parent?
1603 Path parent = path.getParent();
1604 if (null == parent) {
1605 return false;
1608 String regionName = path.getName();
1609 if (null == regionName) {
1610 return false;
1613 if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
1614 return false;
1616 return true;
1620 FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1622 if (null == statusList) {
1623 return;
1624 } else {
1625 LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
1626 statusList.length);
1629 // lower the number of threads in case we have very few expected regions
1630 threadPoolSize = Math.min(threadPoolSize, statusList.length);
1632 // run in multiple threads
1633 ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
1634 threadPoolSize, 60, TimeUnit.SECONDS,
1635 new ArrayBlockingQueue<>(statusList.length));
1636 try {
1637 // ignore all file status items that are not of interest
1638 for (FileStatus regionStatus : statusList) {
1639 if (null == regionStatus) {
1640 continue;
1643 if (!regionStatus.isDirectory()) {
1644 continue;
1647 Path regionPath = regionStatus.getPath();
1648 if (null == regionPath) {
1649 continue;
1652 tpe.execute(new FSRegionScanner(fs, regionPath,
1653 regionToBestLocalityRSMapping, regionDegreeLocalityMapping));
1655 } finally {
1656 tpe.shutdown();
1657 int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1658 60 * 1000);
1659 try {
1660 // here we wait until TPE terminates, which is either naturally or by
1661 // exceptions in the execution of the threads
1662 while (!tpe.awaitTermination(threadWakeFrequency,
1663 TimeUnit.MILLISECONDS)) {
1664 // printing out rough estimate, so as to not introduce
1665 // AtomicInteger
1666 LOG.info("Locality checking is underway: { Scanned Regions : "
1667 + tpe.getCompletedTaskCount() + "/"
1668 + tpe.getTaskCount() + " }");
1670 } catch (InterruptedException e) {
1671 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1675 long overhead = EnvironmentEdgeManager.currentTime() - startTime;
1676 String overheadMsg = "Scan DFS for locality info takes " + overhead + " ms";
1678 LOG.info(overheadMsg);
1682 * Do our short circuit read setup.
1683 * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
1684 * @param conf
1686 public static void setupShortCircuitRead(final Configuration conf) {
1687 // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
1688 boolean shortCircuitSkipChecksum =
1689 conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
1690 boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
1691 if (shortCircuitSkipChecksum) {
1692 LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
1693 "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
1694 "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
1695 assert !shortCircuitSkipChecksum; //this will fail if assertions are on
1697 checkShortCircuitReadBufferSize(conf);
1701 * Check if short circuit read buffer size is set and if not, set it to hbase value.
1702 * @param conf
1704 public static void checkShortCircuitReadBufferSize(final Configuration conf) {
1705 final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
1706 final int notSet = -1;
1707 // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
1708 final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
1709 int size = conf.getInt(dfsKey, notSet);
1710 // If a size is set, return -- we will use it.
1711 if (size != notSet) return;
1712 // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
1713 int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
1714 conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
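// Configuration sketch for the checks above (values are illustrative; 131072 is the HBase
// fallback, i.e. HConstants.DEFAULT_BLOCKSIZE * 2):
//   conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
//   conf.setInt("hbase.dfs.client.read.shortcircuit.buffer.size", 131072);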
1718 * @param c
1719 * @return The DFSClient DFSHedgedReadMetrics instance, or null if it can't be found or we are not on hdfs.
1720 * @throws IOException
1722 public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
1723 throws IOException {
1724 if (!isHDFS(c)) return null;
1725 // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
1726 // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
1727 // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
1728 final String name = "getHedgedReadMetrics";
1729 DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient();
1730 Method m;
1731 try {
1732 m = dfsclient.getClass().getDeclaredMethod(name);
1733 } catch (NoSuchMethodException e) {
1734 LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " +
1735 e.getMessage());
1736 return null;
1737 } catch (SecurityException e) {
1738 LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " +
1739 e.getMessage());
1740 return null;
1742 m.setAccessible(true);
1743 try {
1744 return (DFSHedgedReadMetrics)m.invoke(dfsclient);
1745 } catch (IllegalAccessException e) {
1746 LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
1747 e.getMessage());
1748 return null;
1749 } catch (IllegalArgumentException e) {
1750 LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
1751 e.getMessage());
1752 return null;
1753 } catch (InvocationTargetException e) {
1754 LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
1755 e.getMessage());
1756 return null;
1760 public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1761 Configuration conf, int threads) throws IOException {
1762 ExecutorService pool = Executors.newFixedThreadPool(threads);
1763 List<Future<Void>> futures = new ArrayList<>();
1764 List<Path> traversedPaths;
1765 try {
1766 traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
1767 for (Future<Void> future : futures) {
1768 future.get();
1770 } catch (ExecutionException | InterruptedException | IOException e) {
1771 throw new IOException("copy snapshot reference files failed", e);
1772 } finally {
1773 pool.shutdownNow();
1775 return traversedPaths;
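// Illustrative sketch: recursively copy a directory tree, one task per leaf file (the paths and
// thread count are hypothetical):
//   List<Path> copied = FSUtils.copyFilesParallel(
//       srcFs, new Path("/hbase/.hbase-snapshot/s1"), dstFs, new Path("/backup/s1"), conf, 4);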
1778 private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1779 Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
1780 List<Path> traversedPaths = new ArrayList<>();
1781 traversedPaths.add(dst);
1782 FileStatus currentFileStatus = srcFS.getFileStatus(src);
1783 if (currentFileStatus.isDirectory()) {
1784 if (!dstFS.mkdirs(dst)) {
1785 throw new IOException("create dir failed: " + dst);
1787 FileStatus[] subPaths = srcFS.listStatus(src);
1788 for (FileStatus subPath : subPaths) {
1789 traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
1790 new Path(dst, subPath.getPath().getName()), conf, pool, futures));
1792 } else {
1793 Future<Void> future = pool.submit(() -> {
1794 FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
1795 return null;
1797 futures.add(future);
1799 return traversedPaths;