HBASE-26304 Reflect out of band locality improvements in metrics and balancer (#3803)
[hbase.git] hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
blob 461170de6a15a9b9cb1219ff30e20cffa54c7525
1 /**
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org.apache.hadoop.hbase.util;
21 import static org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET;
23 import edu.umd.cs.findbugs.annotations.CheckForNull;
24 import java.io.ByteArrayInputStream;
25 import java.io.DataInputStream;
26 import java.io.EOFException;
27 import java.io.FileNotFoundException;
28 import java.io.IOException;
29 import java.io.InterruptedIOException;
30 import java.lang.reflect.InvocationTargetException;
31 import java.lang.reflect.Method;
32 import java.net.InetSocketAddress;
33 import java.net.URI;
34 import java.util.ArrayList;
35 import java.util.Arrays;
36 import java.util.Collection;
37 import java.util.Collections;
38 import java.util.HashMap;
39 import java.util.HashSet;
40 import java.util.Iterator;
41 import java.util.List;
42 import java.util.Locale;
43 import java.util.Map;
44 import java.util.Optional;
45 import java.util.Set;
46 import java.util.Vector;
47 import java.util.concurrent.ConcurrentHashMap;
48 import java.util.concurrent.ExecutionException;
49 import java.util.concurrent.ExecutorService;
50 import java.util.concurrent.Executors;
51 import java.util.concurrent.Future;
52 import java.util.concurrent.FutureTask;
53 import java.util.concurrent.ThreadPoolExecutor;
54 import java.util.concurrent.TimeUnit;
55 import java.util.regex.Pattern;
56 import org.apache.commons.lang3.ArrayUtils;
57 import org.apache.hadoop.conf.Configuration;
58 import org.apache.hadoop.fs.BlockLocation;
59 import org.apache.hadoop.fs.FSDataInputStream;
60 import org.apache.hadoop.fs.FSDataOutputStream;
61 import org.apache.hadoop.fs.FileStatus;
62 import org.apache.hadoop.fs.FileSystem;
63 import org.apache.hadoop.fs.FileUtil;
64 import org.apache.hadoop.fs.Path;
65 import org.apache.hadoop.fs.PathFilter;
66 import org.apache.hadoop.fs.StorageType;
67 import org.apache.hadoop.fs.permission.FsPermission;
68 import org.apache.hadoop.hbase.ClusterId;
69 import org.apache.hadoop.hbase.HConstants;
70 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
71 import org.apache.hadoop.hbase.TableName;
72 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
73 import org.apache.hadoop.hbase.client.RegionInfo;
74 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
75 import org.apache.hadoop.hbase.exceptions.DeserializationException;
76 import org.apache.hadoop.hbase.fs.HFileSystem;
77 import org.apache.hadoop.hbase.io.HFileLink;
78 import org.apache.hadoop.hbase.master.HMaster;
79 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
80 import org.apache.hadoop.hdfs.DFSClient;
81 import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
82 import org.apache.hadoop.hdfs.DFSUtil;
83 import org.apache.hadoop.hdfs.DistributedFileSystem;
84 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
85 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
86 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
87 import org.apache.hadoop.io.IOUtils;
88 import org.apache.hadoop.ipc.RemoteException;
89 import org.apache.hadoop.util.Progressable;
90 import org.apache.yetus.audience.InterfaceAudience;
91 import org.slf4j.Logger;
92 import org.slf4j.LoggerFactory;
94 import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
95 import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
96 import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
97 import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
98 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
100 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
101 import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
104 * Utility methods for interacting with the underlying file system.
106 @InterfaceAudience.Private
107 public final class FSUtils {
108 private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);
110 private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
111 private static final int DEFAULT_THREAD_POOLSIZE = 2;
113 /** Set to true on Windows platforms */
114 // currently only used in testing. TODO refactor into a test class
115 public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
117 private FSUtils() {
121 * @return True if <code>fs</code> is an instance of DistributedFileSystem
122 * @throws IOException
124 public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
125 FileSystem fileSystem = fs;
126 // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
127 // Check its backing fs for dfs-ness.
128 if (fs instanceof HFileSystem) {
129 fileSystem = ((HFileSystem)fs).getBackingFs();
131 return fileSystem instanceof DistributedFileSystem;
135 * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
136 * '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true. Does not consider
137 * the scheme; i.e. if the schemes differ but the path or subpath matches, the two will equate.
138 * @param pathToSearch Path we will be trying to match.
139 * @param pathTail Path to check as the tail of <code>pathToSearch</code>.
140 * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
142 public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
143 Path tailPath = pathTail;
144 String tailName;
145 Path toSearch = pathToSearch;
146 String toSearchName;
147 boolean result = false;
149 if (pathToSearch.depth() != pathTail.depth()) {
150 return false;
153 do {
154 tailName = tailPath.getName();
155 if (tailName == null || tailName.isEmpty()) {
156 result = true;
157 break;
159 toSearchName = toSearch.getName();
160 if (toSearchName == null || toSearchName.isEmpty()) {
161 break;
163 // Move up a parent on each path for next go around. Path doesn't let us go off the end.
164 tailPath = tailPath.getParent();
165 toSearch = toSearch.getParent();
166 } while(tailName.equals(toSearchName));
167 return result;
171 * Delete the region directory if it exists.
172 * @return True if the region directory was deleted.
173 * @throws IOException
175 public static boolean deleteRegionDir(final Configuration conf, final RegionInfo hri)
176 throws IOException {
177 Path rootDir = CommonFSUtils.getRootDir(conf);
178 FileSystem fs = rootDir.getFileSystem(conf);
179 return CommonFSUtils.deleteDirectory(fs,
180 new Path(CommonFSUtils.getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
184 * Create the specified file on the filesystem. By default, this will:
185 * <ol>
186 * <li>overwrite the file if it exists</li>
187 * <li>apply the umask in the configuration (if it is enabled)</li>
188 * <li>use the fs configured buffer size (or 4096 if not set)</li>
189 * <li>use the configured column family replication or default replication if
190 * {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li>
191 * <li>use the default block size</li>
192 * <li>not track progress</li>
193 * </ol>
194 * @param conf configurations
195 * @param fs {@link FileSystem} on which to write the file
196 * @param path {@link Path} to the file to write
197 * @param perm permissions
198 * @param favoredNodes favored data nodes
199 * @return output stream to the created file
200 * @throws IOException if the file cannot be created
202 public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
203 FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
204 if (fs instanceof HFileSystem) {
205 FileSystem backingFs = ((HFileSystem) fs).getBackingFs();
206 if (backingFs instanceof DistributedFileSystem) {
207 // Try to use the favoredNodes version via reflection to allow backwards-
208 // compatibility.
209 short replication = Short.parseShort(conf.get(ColumnFamilyDescriptorBuilder.DFS_REPLICATION,
210 String.valueOf(ColumnFamilyDescriptorBuilder.DEFAULT_DFS_REPLICATION)));
211 try {
212 return (FSDataOutputStream) (DistributedFileSystem.class
213 .getDeclaredMethod("create", Path.class, FsPermission.class, boolean.class, int.class,
214 short.class, long.class, Progressable.class, InetSocketAddress[].class)
215 .invoke(backingFs, path, perm, true, CommonFSUtils.getDefaultBufferSize(backingFs),
216 replication > 0 ? replication : CommonFSUtils.getDefaultReplication(backingFs, path),
217 CommonFSUtils.getDefaultBlockSize(backingFs, path), null, favoredNodes));
218 } catch (InvocationTargetException ite) {
219 // Function was properly called, but threw its own exception.
220 throw new IOException(ite.getCause());
221 } catch (NoSuchMethodException e) {
222 LOG.debug("DFS Client does not support most favored nodes create; using default create");
223 LOG.trace("Ignoring; use default create", e);
224 } catch (IllegalArgumentException | SecurityException | IllegalAccessException e) {
225 LOG.debug("Ignoring (most likely Reflection related exception) " + e);
229 return CommonFSUtils.create(fs, path, perm, true);
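// Illustrative usage sketch (not part of the original file): how a caller might request a file
// whose replicas land on specific DataNodes so block locality stays predictable. The names
// 'conf', 'fs', 'storeFilePath' and 'favoredNodes' are hypothetical placeholders.
//
//   InetSocketAddress[] favoredNodes = ...; // e.g. addresses of the region's favored nodes
//   FsPermission perms = FsPermission.getFileDefault();
//   try (FSDataOutputStream out = FSUtils.create(conf, fs, storeFilePath, perms, favoredNodes)) {
//     // write file contents here
//   }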
233 * Checks to see if the specified file system is available
235 * @param fs filesystem
236 * @throws IOException e
238 public static void checkFileSystemAvailable(final FileSystem fs)
239 throws IOException {
240 if (!(fs instanceof DistributedFileSystem)) {
241 return;
243 IOException exception = null;
244 DistributedFileSystem dfs = (DistributedFileSystem) fs;
245 try {
246 if (dfs.exists(new Path("/"))) {
247 return;
249 } catch (IOException e) {
250 exception = e instanceof RemoteException ?
251 ((RemoteException)e).unwrapRemoteException() : e;
253 try {
254 fs.close();
255 } catch (Exception e) {
256 LOG.error("file system close failed: ", e);
258 throw new IOException("File system is not available", exception);
262 * Inquire the Active NameNode's safe mode status.
264 * @param dfs A DistributedFileSystem object representing the underlying HDFS.
265 * @return whether we're in safe mode
266 * @throws IOException
268 private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
269 return dfs.setSafeMode(SAFEMODE_GET, true);
273 * Check whether dfs is in safemode.
274 * @param conf
275 * @throws IOException
277 public static void checkDfsSafeMode(final Configuration conf)
278 throws IOException {
279 boolean isInSafeMode = false;
280 FileSystem fs = FileSystem.get(conf);
281 if (fs instanceof DistributedFileSystem) {
282 DistributedFileSystem dfs = (DistributedFileSystem)fs;
283 isInSafeMode = isInSafeMode(dfs);
285 if (isInSafeMode) {
286 throw new IOException("File system is in safemode, it can't be written now");
291 * Verifies current version of file system
293 * @param fs filesystem object
294 * @param rootdir root hbase directory
295 * @return null if no version file exists, version string otherwise
296 * @throws IOException if the version file fails to open
297 * @throws DeserializationException if the version data cannot be translated into a version
299 public static String getVersion(FileSystem fs, Path rootdir)
300 throws IOException, DeserializationException {
301 final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
302 FileStatus[] status = null;
303 try {
304 // hadoop 2.0 throws FNFE if directory does not exist.
305 // hadoop 1.0 returns null if directory does not exist.
306 status = fs.listStatus(versionFile);
307 } catch (FileNotFoundException fnfe) {
308 return null;
310 if (ArrayUtils.getLength(status) == 0) {
311 return null;
313 String version = null;
314 byte [] content = new byte [(int)status[0].getLen()];
315 FSDataInputStream s = fs.open(versionFile);
316 try {
317 IOUtils.readFully(s, content, 0, content.length);
318 if (ProtobufUtil.isPBMagicPrefix(content)) {
319 version = parseVersionFrom(content);
320 } else {
321 // Presume it is in pre-pb format.
322 try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) {
323 version = dis.readUTF();
326 } catch (EOFException eof) {
327 LOG.warn("Version file was empty, odd, will try to set it.");
328 } finally {
329 s.close();
331 return version;
335 * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
336 * @param bytes The byte content of the hbase.version file
337 * @return The version found in the file as a String
338 * @throws DeserializationException if the version data cannot be translated into a version
340 static String parseVersionFrom(final byte [] bytes)
341 throws DeserializationException {
342 ProtobufUtil.expectPBMagicPrefix(bytes);
343 int pblen = ProtobufUtil.lengthOfPBMagic();
344 FSProtos.HBaseVersionFileContent.Builder builder =
345 FSProtos.HBaseVersionFileContent.newBuilder();
346 try {
347 ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
348 return builder.getVersion();
349 } catch (IOException e) {
350 // Convert
351 throw new DeserializationException(e);
356 * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
357 * @param version Version to persist
358 * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix.
360 static byte [] toVersionByteArray(final String version) {
361 FSProtos.HBaseVersionFileContent.Builder builder =
362 FSProtos.HBaseVersionFileContent.newBuilder();
363 return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
367 * Verifies current version of file system
369 * @param fs file system
370 * @param rootdir root directory of HBase installation
371 * @param message if true, issues a message on System.out
372 * @throws IOException if the version file cannot be opened
373 * @throws DeserializationException if the contents of the version file cannot be parsed
375 public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
376 throws IOException, DeserializationException {
377 checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
381 * Verifies current version of file system
383 * @param fs file system
384 * @param rootdir root directory of HBase installation
385 * @param message if true, issues a message on System.out
386 * @param wait wait interval
387 * @param retries number of times to retry
389 * @throws IOException if the version file cannot be opened
390 * @throws DeserializationException if the contents of the version file cannot be parsed
392 public static void checkVersion(FileSystem fs, Path rootdir,
393 boolean message, int wait, int retries)
394 throws IOException, DeserializationException {
395 String version = getVersion(fs, rootdir);
396 String msg;
397 if (version == null) {
398 if (!metaRegionExists(fs, rootdir)) {
399 // rootDir is empty (no version file and no root region)
400 // just create new version file (HBASE-1195)
401 setVersion(fs, rootdir, wait, retries);
402 return;
403 } else {
404 msg = "hbase.version file is missing. Is your hbase.rootdir valid? " +
405 "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. " +
406 "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2";
408 } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) {
409 return;
410 } else {
411 msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version +
412 " but software requires version " + HConstants.FILE_SYSTEM_VERSION +
413 ". Consult http://hbase.apache.org/book.html for further information about " +
414 "upgrading HBase.";
417 // version is deprecated, require migration
418 // Output on stdout so user sees it in terminal.
419 if (message) {
420 System.out.println("WARNING! " + msg);
422 throw new FileSystemVersionException(msg);
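// Illustrative usage sketch (not part of the original file): verifying the hbase.version layout
// file during startup; 'conf' is assumed to be an already-built Configuration.
//
//   Path rootDir = CommonFSUtils.getRootDir(conf);
//   FileSystem fs = rootDir.getFileSystem(conf);
//   FSUtils.checkVersion(fs, rootDir, true); // warns on stdout and throws if the layout is stale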
426 * Sets version of file system
428 * @param fs filesystem object
429 * @param rootdir hbase root
430 * @throws IOException e
432 public static void setVersion(FileSystem fs, Path rootdir)
433 throws IOException {
434 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
435 HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
439 * Sets version of file system
441 * @param fs filesystem object
442 * @param rootdir hbase root
443 * @param wait time to wait for retry
444 * @param retries number of times to retry before failing
445 * @throws IOException e
447 public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
448 throws IOException {
449 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
454 * Sets version of file system
456 * @param fs filesystem object
457 * @param rootdir hbase root directory
458 * @param version version to set
459 * @param wait time to wait for retry
460 * @param retries number of times to retry before throwing an IOException
461 * @throws IOException e
463 public static void setVersion(FileSystem fs, Path rootdir, String version,
464 int wait, int retries) throws IOException {
465 Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
466 Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
467 HConstants.VERSION_FILE_NAME);
468 while (true) {
469 try {
470 // Write the version to a temporary file
471 FSDataOutputStream s = fs.create(tempVersionFile);
472 try {
473 s.write(toVersionByteArray(version));
474 s.close();
475 s = null;
476 // Move the temp version file to its normal location. Returns false
477 // if the rename failed. Throw an IOE in that case.
478 if (!fs.rename(tempVersionFile, versionFile)) {
479 throw new IOException("Unable to move temp version file to " + versionFile);
481 } finally {
482 // Cleaning up the temporary file if the rename failed would be trying
483 // too hard. We'll unconditionally create it again the next time
484 // through anyway; files are overwritten by default by create().
486 // Attempt to close the stream on the way out if it is still open.
487 try {
488 if (s != null) s.close();
489 } catch (IOException ignore) { }
491 LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
492 return;
493 } catch (IOException e) {
494 if (retries > 0) {
495 LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
496 fs.delete(versionFile, false);
497 try {
498 if (wait > 0) {
499 Thread.sleep(wait);
501 } catch (InterruptedException ie) {
502 throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
504 retries--;
505 } else {
506 throw e;
513 * Checks that a cluster ID file exists in the HBase root directory
514 * @param fs the root directory FileSystem
515 * @param rootdir the HBase root directory in HDFS
516 * @param wait how long to wait between retries
517 * @return <code>true</code> if the file exists, otherwise <code>false</code>
518 * @throws IOException if checking the FileSystem fails
520 public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
521 long wait) throws IOException {
522 while (true) {
523 try {
524 Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
525 return fs.exists(filePath);
526 } catch (IOException ioe) {
527 if (wait > 0L) {
528 LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe);
529 try {
530 Thread.sleep(wait);
531 } catch (InterruptedException e) {
532 Thread.currentThread().interrupt();
533 throw (InterruptedIOException) new InterruptedIOException().initCause(e);
535 } else {
536 throw ioe;
543 * Returns the value of the unique cluster ID stored for this HBase instance.
544 * @param fs the root directory FileSystem
545 * @param rootdir the path to the HBase root directory
546 * @return the unique cluster identifier
547 * @throws IOException if reading the cluster ID file fails
549 public static ClusterId getClusterId(FileSystem fs, Path rootdir)
550 throws IOException {
551 Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
552 ClusterId clusterId = null;
553 FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath): null;
554 if (status != null) {
555 int len = Ints.checkedCast(status.getLen());
556 byte [] content = new byte[len];
557 FSDataInputStream in = fs.open(idPath);
558 try {
559 in.readFully(content);
560 } catch (EOFException eof) {
561 LOG.warn("Cluster ID file {} is empty", idPath);
562 } finally{
563 in.close();
565 try {
566 clusterId = ClusterId.parseFrom(content);
567 } catch (DeserializationException e) {
568 throw new IOException("content=" + Bytes.toString(content), e);
570 // If not pb'd, make it so.
571 if (!ProtobufUtil.isPBMagicPrefix(content)) {
572 String cid = null;
573 in = fs.open(idPath);
574 try {
575 cid = in.readUTF();
576 clusterId = new ClusterId(cid);
577 } catch (EOFException eof) {
578 LOG.warn("Cluster ID file {} is empty", idPath);
579 } finally {
580 in.close();
582 rewriteAsPb(fs, rootdir, idPath, clusterId);
584 return clusterId;
585 } else {
586 LOG.warn("Cluster ID file does not exist at {}", idPath);
588 return clusterId;
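// Illustrative usage sketch (not part of the original file): reading the cluster ID and writing a
// fresh one when the hbase.id file is absent; 'fs' and 'rootDir' are placeholders, and the no-arg
// ClusterId() constructor generating a new unique ID is an assumption here.
//
//   ClusterId id = FSUtils.getClusterId(fs, rootDir);
//   if (id == null) {
//     id = new ClusterId();
//     FSUtils.setClusterId(fs, rootDir, id, 10 * 1000);
//   }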
592 * @param cid
593 * @throws IOException
595 private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
596 final ClusterId cid)
597 throws IOException {
598 // Rewrite the file as pb. Move aside the old one first, write new
599 // then delete the moved-aside file.
600 Path movedAsideName = new Path(p + "." + EnvironmentEdgeManager.currentTime());
601 if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
602 setClusterId(fs, rootdir, cid, 100);
603 if (!fs.delete(movedAsideName, false)) {
604 throw new IOException("Failed delete of " + movedAsideName);
606 LOG.debug("Rewrote the hbase.id file as pb");
610 * Writes a new unique identifier for this cluster to the "hbase.id" file in the HBase root
611 * directory. If any operation on the ID file fails, and {@code wait} is a positive value, the
612 * method will retry to produce the ID file until the thread is forcibly interrupted.
614 * @param fs the root directory FileSystem
615 * @param rootdir the path to the HBase root directory
616 * @param clusterId the unique identifier to store
617 * @param wait how long (in milliseconds) to wait between retries
618 * @throws IOException if writing to the FileSystem fails and no positive wait value was given
620 public static void setClusterId(final FileSystem fs, final Path rootdir,
621 final ClusterId clusterId, final long wait) throws IOException {
623 final Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
624 final Path tempDir = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY);
625 final Path tempIdFile = new Path(tempDir, HConstants.CLUSTER_ID_FILE_NAME);
627 LOG.debug("Create cluster ID file [{}] with ID: {}", idFile, clusterId);
629 while (true) {
630 Optional<IOException> failure = Optional.empty();
632 LOG.debug("Write the cluster ID file to a temporary location: {}", tempIdFile);
633 try (FSDataOutputStream s = fs.create(tempIdFile)) {
634 s.write(clusterId.toByteArray());
635 } catch (IOException ioe) {
636 failure = Optional.of(ioe);
639 if (!failure.isPresent()) {
640 try {
641 LOG.debug("Move the temporary cluster ID file to its target location [{}]:[{}]",
642 tempIdFile, idFile);
644 if (!fs.rename(tempIdFile, idFile)) {
645 failure =
646 Optional.of(new IOException("Unable to move temp cluster ID file to " + idFile));
648 } catch (IOException ioe) {
649 failure = Optional.of(ioe);
653 if (failure.isPresent()) {
654 final IOException cause = failure.get();
655 if (wait > 0L) {
656 LOG.warn("Unable to create cluster ID file in {}, retrying in {}ms", rootdir, wait,
657 cause);
658 try {
659 Thread.sleep(wait);
660 } catch (InterruptedException e) {
661 Thread.currentThread().interrupt();
662 throw (InterruptedIOException) new InterruptedIOException().initCause(e);
664 continue;
665 } else {
666 throw cause;
668 } else {
669 return;
675 * If DFS, check safe mode and if so, wait until we clear it.
676 * @param conf configuration
677 * @param wait Sleep between retries
678 * @throws IOException e
680 public static void waitOnSafeMode(final Configuration conf,
681 final long wait)
682 throws IOException {
683 FileSystem fs = FileSystem.get(conf);
684 if (!(fs instanceof DistributedFileSystem)) return;
685 DistributedFileSystem dfs = (DistributedFileSystem)fs;
686 // Make sure dfs is not in safe mode
687 while (isInSafeMode(dfs)) {
688 LOG.info("Waiting for dfs to exit safe mode...");
689 try {
690 Thread.sleep(wait);
691 } catch (InterruptedException e) {
692 Thread.currentThread().interrupt();
693 throw (InterruptedIOException) new InterruptedIOException().initCause(e);
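// Illustrative usage sketch (not part of the original file): blocking until HDFS leaves safe
// mode, polling once per second; 'conf' is a placeholder Configuration.
//
//   FSUtils.waitOnSafeMode(conf, 1000L);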
699 * Checks if meta region exists
700 * @param fs file system
701 * @param rootDir root directory of HBase installation
702 * @return true if exists
704 public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException {
705 Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO);
706 return fs.exists(metaRegionDir);
710 * Compute HDFS block distribution of a given HdfsDataInputStream. All HdfsDataInputStreams
711 * are backed by a series of LocatedBlocks, which are fetched periodically from the namenode.
712 * This method retrieves those blocks from the input stream and uses them to calculate
713 * an HDFSBlocksDistribution.
715 * The underlying method in DFSInputStream does attempt to use locally cached blocks, but
716 * may hit the namenode if the cache is determined to be incomplete. The method also involves
717 * making copies of all LocatedBlocks rather than returning the underlying blocks themselves.
719 static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
720 HdfsDataInputStream inputStream) throws IOException {
721 List<LocatedBlock> blocks = inputStream.getAllBlocks();
722 HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
723 for (LocatedBlock block : blocks) {
724 String[] hosts = getHostsForLocations(block);
725 long len = block.getBlockSize();
726 StorageType[] storageTypes = block.getStorageTypes();
727 blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
729 return blocksDistribution;
732 private static String[] getHostsForLocations(LocatedBlock block) {
733 DatanodeInfo[] locations = block.getLocations();
734 String[] hosts = new String[locations.length];
735 for (int i = 0; i < hosts.length; i++) {
736 hosts[i] = locations[i].getHostName();
738 return hosts;
742 * Compute HDFS blocks distribution of a given file, or a portion of the file
743 * @param fs file system
744 * @param status file status of the file
745 * @param start start position of the portion
746 * @param length length of the portion
747 * @return The HDFS blocks distribution
749 static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
750 final FileSystem fs, FileStatus status, long start, long length)
751 throws IOException {
752 HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
753 BlockLocation [] blockLocations =
754 fs.getFileBlockLocations(status, start, length);
755 addToHDFSBlocksDistribution(blocksDistribution, blockLocations);
756 return blocksDistribution;
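// Illustrative usage sketch (not part of the original file): computing the block distribution
// over an entire file; 'fs' and 'storeFilePath' are hypothetical placeholders.
//
//   FileStatus status = fs.getFileStatus(storeFilePath);
//   HDFSBlocksDistribution dist =
//       FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());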
760 * Update blocksDistribution with blockLocations
761 * @param blocksDistribution the hdfs blocks distribution
762 * @param blockLocations an array containing block locations
764 static public void addToHDFSBlocksDistribution(
765 HDFSBlocksDistribution blocksDistribution, BlockLocation[] blockLocations)
766 throws IOException {
767 for (BlockLocation bl : blockLocations) {
768 String[] hosts = bl.getHosts();
769 long len = bl.getLength();
770 StorageType[] storageTypes = bl.getStorageTypes();
771 blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
775 // TODO move this method OUT of FSUtils. No dependencies to HMaster
777 * Returns the total overall fragmentation percentage. Includes hbase:meta and
778 * -ROOT- as well.
780 * @param master The master defining the HBase root and file system
781 * @return The overall fragmentation percentage across all tables, or -1 if no table directories were found
782 * @throws IOException When scanning the directory fails
784 public static int getTotalTableFragmentation(final HMaster master)
785 throws IOException {
786 Map<String, Integer> map = getTableFragmentation(master);
787 return map.isEmpty() ? -1 : map.get("-TOTAL-");
791 * Runs through the HBase rootdir and checks how many stores for each table
792 * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
793 * percentage across all tables is stored under the special key "-TOTAL-".
795 * @param master The master defining the HBase root and file system.
796 * @return A map for each table and its percentage (never null).
798 * @throws IOException When scanning the directory fails.
800 public static Map<String, Integer> getTableFragmentation(final HMaster master)
801 throws IOException {
802 Path path = CommonFSUtils.getRootDir(master.getConfiguration());
803 // since HMaster.getFileSystem() is package private
804 FileSystem fs = path.getFileSystem(master.getConfiguration());
805 return getTableFragmentation(fs, path);
809 * Runs through the HBase rootdir and checks how many stores for each table
810 * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
811 * percentage across all tables is stored under the special key "-TOTAL-".
813 * @param fs The file system to use
814 * @param hbaseRootDir The root directory to scan
815 * @return A map for each table and its percentage (never null)
816 * @throws IOException When scanning the directory fails
818 public static Map<String, Integer> getTableFragmentation(
819 final FileSystem fs, final Path hbaseRootDir)
820 throws IOException {
821 Map<String, Integer> frags = new HashMap<>();
822 int cfCountTotal = 0;
823 int cfFragTotal = 0;
824 PathFilter regionFilter = new RegionDirFilter(fs);
825 PathFilter familyFilter = new FamilyDirFilter(fs);
826 List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
827 for (Path d : tableDirs) {
828 int cfCount = 0;
829 int cfFrag = 0;
830 FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
831 for (FileStatus regionDir : regionDirs) {
832 Path dd = regionDir.getPath();
833 // else it's a region name, now look in region for families
834 FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
835 for (FileStatus familyDir : familyDirs) {
836 cfCount++;
837 cfCountTotal++;
838 Path family = familyDir.getPath();
839 // now in family make sure only one file
840 FileStatus[] familyStatus = fs.listStatus(family);
841 if (familyStatus.length > 1) {
842 cfFrag++;
843 cfFragTotal++;
847 // compute percentage per table and store in result list
848 frags.put(CommonFSUtils.getTableName(d).getNameAsString(),
849 cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100));
851 // set overall percentage for all tables
852 frags.put("-TOTAL-",
853 cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100));
854 return frags;
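// Illustrative usage sketch (not part of the original file): reading per-table fragmentation
// percentages, with the cluster-wide figure stored under the special "-TOTAL-" key.
//
//   Map<String, Integer> frags = FSUtils.getTableFragmentation(fs, rootDir);
//   int overallPct = frags.getOrDefault("-TOTAL-", 0);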
857 public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException {
858 if (fs.exists(dst) && !fs.delete(dst, false)) {
859 throw new IOException("Can not delete " + dst);
861 if (!fs.rename(src, dst)) {
862 throw new IOException("Can not rename from " + src + " to " + dst);
867 * A {@link PathFilter} that returns only regular files.
869 static class FileFilter extends AbstractFileStatusFilter {
870 private final FileSystem fs;
872 public FileFilter(final FileSystem fs) {
873 this.fs = fs;
876 @Override
877 protected boolean accept(Path p, @CheckForNull Boolean isDir) {
878 try {
879 return isFile(fs, isDir, p);
880 } catch (IOException e) {
881 LOG.warn("Unable to verify if path={} is a regular file", p, e);
882 return false;
888 * Directory filter that doesn't include any of the directories in the specified blacklist
890 public static class BlackListDirFilter extends AbstractFileStatusFilter {
891 private final FileSystem fs;
892 private List<String> blacklist;
895 * Create a filter on the given filesystem with the specified blacklist
896 * @param fs filesystem to filter
897 * @param directoryNameBlackList list of the names of the directories to filter. If
898 * <tt>null</tt>, all directories are returned
900 @SuppressWarnings("unchecked")
901 public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
902 this.fs = fs;
903 blacklist =
904 (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
905 : directoryNameBlackList);
908 @Override
909 protected boolean accept(Path p, @CheckForNull Boolean isDir) {
910 if (!isValidName(p.getName())) {
911 return false;
914 try {
915 return isDirectory(fs, isDir, p);
916 } catch (IOException e) {
917 LOG.warn("An error occurred while verifying if [{}] is a valid directory."
918 + " Returning 'not valid' and continuing.", p, e);
919 return false;
923 protected boolean isValidName(final String name) {
924 return !blacklist.contains(name);
929 * A {@link PathFilter} that only allows directories.
931 public static class DirFilter extends BlackListDirFilter {
933 public DirFilter(FileSystem fs) {
934 super(fs, null);
939 * A {@link PathFilter} that returns usertable directories. To get all directories use the
940 * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
942 public static class UserTableDirFilter extends BlackListDirFilter {
943 public UserTableDirFilter(FileSystem fs) {
944 super(fs, HConstants.HBASE_NON_TABLE_DIRS);
947 @Override
948 protected boolean isValidName(final String name) {
949 if (!super.isValidName(name))
950 return false;
952 try {
953 TableName.isLegalTableQualifierName(Bytes.toBytes(name));
954 } catch (IllegalArgumentException e) {
955 LOG.info("Invalid table name: {}", name);
956 return false;
958 return true;
962 public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
963 throws IOException {
964 List<Path> tableDirs = new ArrayList<>();
965 Path baseNamespaceDir = new Path(rootdir, HConstants.BASE_NAMESPACE_DIR);
966 if (fs.exists(baseNamespaceDir)) {
967 for (FileStatus status : fs.globStatus(new Path(baseNamespaceDir, "*"))) {
968 tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
971 return tableDirs;
975 * @param fs
976 * @param rootdir
977 * @return All the table directories under <code>rootdir</code>. Ignores non-table HBase folders such as
978 * .logs, .oldlogs and .corrupt.
979 * @throws IOException
981 public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
982 throws IOException {
983 // presumes any directory under hbase.rootdir is a table
984 FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
985 List<Path> tabledirs = new ArrayList<>(dirs.length);
986 for (FileStatus dir: dirs) {
987 tabledirs.add(dir.getPath());
989 return tabledirs;
993 * Filter for region directories (hex-named region dirs, plus older numeric region dir names).
995 public static class RegionDirFilter extends AbstractFileStatusFilter {
996 // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
997 final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
998 final FileSystem fs;
1000 public RegionDirFilter(FileSystem fs) {
1001 this.fs = fs;
1004 @Override
1005 protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1006 if (!regionDirPattern.matcher(p.getName()).matches()) {
1007 return false;
1010 try {
1011 return isDirectory(fs, isDir, p);
1012 } catch (IOException ioe) {
1013 // Maybe the file was moved or the fs was disconnected.
1014 LOG.warn("Skipping file {} due to IOException", p, ioe);
1015 return false;
1021 * Given a particular table dir, return all the regiondirs inside it, excluding files such as
1022 * .tableinfo
1023 * @param fs A file system for the Path
1024 * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
1025 * @return List of paths to valid region directories in table dir.
1026 * @throws IOException
1028 public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
1029 // assumes we are in a table dir.
1030 List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1031 if (rds == null) {
1032 return Collections.emptyList();
1034 List<Path> regionDirs = new ArrayList<>(rds.size());
1035 for (FileStatus rdfs: rds) {
1036 Path rdPath = rdfs.getPath();
1037 regionDirs.add(rdPath);
1039 return regionDirs;
1042 public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) {
1043 return getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, region.getTable()), region);
1046 public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
1047 return getRegionDirFromTableDir(tableDir,
1048 ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
1051 public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) {
1052 return new Path(tableDir, encodedRegionName);
1056 * Filter for all dirs that are legal column family names. This is generally used for colfam
1057 * dirs &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
1059 public static class FamilyDirFilter extends AbstractFileStatusFilter {
1060 final FileSystem fs;
1062 public FamilyDirFilter(FileSystem fs) {
1063 this.fs = fs;
1066 @Override
1067 protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1068 try {
1069 // throws IAE if invalid
1070 ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(Bytes.toBytes(p.getName()));
1071 } catch (IllegalArgumentException iae) {
1072 // path name is an invalid family name and thus is excluded.
1073 return false;
1076 try {
1077 return isDirectory(fs, isDir, p);
1078 } catch (IOException ioe) {
1079 // Maybe the file was moved or the fs was disconnected.
1080 LOG.warn("Skipping file {} due to IOException", p, ioe);
1081 return false;
1087 * Given a particular region dir, return all the familydirs inside it
1089 * @param fs A file system for the Path
1090 * @param regionDir Path to a specific region directory
1091 * @return List of paths to valid family directories in region dir.
1092 * @throws IOException
1094 public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir)
1095 throws IOException {
1096 // assumes we are in a region dir.
1097 return getFilePaths(fs, regionDir, new FamilyDirFilter(fs));
1100 public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir)
1101 throws IOException {
1102 return getFilePaths(fs, familyDir, new ReferenceFileFilter(fs));
1105 public static List<Path> getReferenceAndLinkFilePaths(final FileSystem fs, final Path familyDir)
1106 throws IOException {
1107 return getFilePaths(fs, familyDir, new ReferenceAndLinkFileFilter(fs));
1110 private static List<Path> getFilePaths(final FileSystem fs, final Path dir,
1111 final PathFilter pathFilter) throws IOException {
1112 FileStatus[] fds = fs.listStatus(dir, pathFilter);
1113 List<Path> files = new ArrayList<>(fds.length);
1114 for (FileStatus fdfs: fds) {
1115 Path fdPath = fdfs.getPath();
1116 files.add(fdPath);
1118 return files;
1121 public static int getRegionReferenceAndLinkFileCount(final FileSystem fs, final Path p) {
1122 int result = 0;
1123 try {
1124 for (Path familyDir : getFamilyDirs(fs, p)) {
1125 result += getReferenceAndLinkFilePaths(fs, familyDir).size();
1127 } catch (IOException e) {
1128 LOG.warn("Error Counting reference files.", e);
1130 return result;
1133 public static class ReferenceAndLinkFileFilter implements PathFilter {
1135 private final FileSystem fs;
1137 public ReferenceAndLinkFileFilter(FileSystem fs) {
1138 this.fs = fs;
1141 @Override
1142 public boolean accept(Path rd) {
1143 try {
1144 // only files can be references.
1145 return !fs.getFileStatus(rd).isDirectory() && (StoreFileInfo.isReference(rd) ||
1146 HFileLink.isHFileLink(rd));
1147 } catch (IOException ioe) {
1148 // Maybe the file was moved or the fs was disconnected.
1149 LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1150 return false;
1156 * Filter for HFiles that excludes reference files.
1158 public static class HFileFilter extends AbstractFileStatusFilter {
1159 final FileSystem fs;
1161 public HFileFilter(FileSystem fs) {
1162 this.fs = fs;
1165 @Override
1166 protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1167 if (!StoreFileInfo.isHFile(p) && !StoreFileInfo.isMobFile(p)) {
1168 return false;
1171 try {
1172 return isFile(fs, isDir, p);
1173 } catch (IOException ioe) {
1174 // Maybe the file was moved or the fs was disconnected.
1175 LOG.warn("Skipping file {} due to IOException", p, ioe);
1176 return false;
1182 * Filter for HFileLinks (StoreFiles and HFiles not included).
1183 * The filter itself does not consider whether the link target is a file or not.
1185 public static class HFileLinkFilter implements PathFilter {
1187 @Override
1188 public boolean accept(Path p) {
1189 return HFileLink.isHFileLink(p);
1193 public static class ReferenceFileFilter extends AbstractFileStatusFilter {
1195 private final FileSystem fs;
1197 public ReferenceFileFilter(FileSystem fs) {
1198 this.fs = fs;
1201 @Override
1202 protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1203 if (!StoreFileInfo.isReference(p)) {
1204 return false;
1207 try {
1208 // only files can be references.
1209 return isFile(fs, isDir, p);
1210 } catch (IOException ioe) {
1211 // Maybe the file was moved or the fs was disconnected.
1212 LOG.warn("Skipping file {} due to IOException", p, ioe);
1213 return false;
1219 * Called every so often by the storefile map builder getTableStoreFilePathMap to
1220 * report progress.
1222 interface ProgressReporter {
1224 * @param status File or directory we are about to process.
1226 void progress(FileStatus status);
1230 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1231 * table StoreFile names to the full Path.
1232 * <br>
1233 * Example...<br>
1234 * Key = 3944417774205889744 <br>
1235 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1237 * @param map map to add values. If null, this method will create and populate one to return
1238 * @param fs The file system to use.
1239 * @param hbaseRootDir The root directory to scan.
1240 * @param tableName name of the table to scan.
1241 * @return Map keyed by StoreFile name with a value of the full Path.
1242 * @throws IOException When scanning the directory fails.
1243 * @throws InterruptedException
1245 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1246 final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1247 throws IOException, InterruptedException {
1248 return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null,
1249 (ProgressReporter)null);
1253 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1254 * table StoreFile names to the full Path. Note that because this method can be called
1255 * on a 'live' HBase system, we will skip files that no longer exist by the time
1256 * we traverse them, and similarly the user of the result needs to consider that some
1257 * entries in this map may no longer exist by the time this call completes.
1258 * <br>
1259 * Example...<br>
1260 * Key = 3944417774205889744 <br>
1261 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1263 * @param resultMap map to add values. If null, this method will create and populate one to return
1264 * @param fs The file system to use.
1265 * @param hbaseRootDir The root directory to scan.
1266 * @param tableName name of the table to scan.
1267 * @param sfFilter optional path filter to apply to store files
1268 * @param executor optional executor service to parallelize this operation
1269 * @param progressReporter Instance or null; gets called every time we move to new region of
1270 * family dir and for each store file.
1271 * @return Map keyed by StoreFile name with a value of the full Path.
1272 * @throws IOException When scanning the directory fails.
1273 * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead.
1275 @Deprecated
1276 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1277 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1278 ExecutorService executor, final HbckErrorReporter progressReporter)
1279 throws IOException, InterruptedException {
1280 return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor,
1281 new ProgressReporter() {
1282 @Override
1283 public void progress(FileStatus status) {
1284 // status is not used in this implementation.
1285 progressReporter.progress();
1291 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1292 * table StoreFile names to the full Path. Note that because this method can be called
1293 * on a 'live' HBase system, we will skip files that no longer exist by the time
1294 * we traverse them, and similarly the user of the result needs to consider that some
1295 * entries in this map may no longer exist by the time this call completes.
1296 * <br>
1297 * Example...<br>
1298 * Key = 3944417774205889744 <br>
1299 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1301 * @param resultMap map to add values. If null, this method will create and populate one
1302 * to return
1303 * @param fs The file system to use.
1304 * @param hbaseRootDir The root directory to scan.
1305 * @param tableName name of the table to scan.
1306 * @param sfFilter optional path filter to apply to store files
1307 * @param executor optional executor service to parallelize this operation
1308 * @param progressReporter Instance or null; gets called every time we move to new region of
1309 * family dir and for each store file.
1310 * @return Map keyed by StoreFile name with a value of the full Path.
1311 * @throws IOException When scanning the directory fails.
1312 * @throws InterruptedException the thread is interrupted, either before or during the activity.
1314 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1315 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1316 ExecutorService executor, final ProgressReporter progressReporter)
1317 throws IOException, InterruptedException {
1319 final Map<String, Path> finalResultMap =
1320 resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap;
1322 // only include the directory paths to tables
1323 Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
1324 // Inside a table, there are compaction.dir directories to skip. Otherwise, all else
1325 // should be regions.
1326 final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
1327 final Vector<Exception> exceptions = new Vector<>();
1329 try {
1330 List<FileStatus> regionDirs = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1331 if (regionDirs == null) {
1332 return finalResultMap;
1335 final List<Future<?>> futures = new ArrayList<>(regionDirs.size());
1337 for (FileStatus regionDir : regionDirs) {
1338 if (null != progressReporter) {
1339 progressReporter.progress(regionDir);
1341 final Path dd = regionDir.getPath();
1343 if (!exceptions.isEmpty()) {
1344 break;
1347 Runnable getRegionStoreFileMapCall = new Runnable() {
1348 @Override
1349 public void run() {
1350 try {
1351 HashMap<String,Path> regionStoreFileMap = new HashMap<>();
1352 List<FileStatus> familyDirs = FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
1353 if (familyDirs == null) {
1354 if (!fs.exists(dd)) {
1355 LOG.warn("Skipping region because it no longer exists: " + dd);
1356 } else {
1357 LOG.warn("Skipping region because it has no family dirs: " + dd);
1359 return;
1361 for (FileStatus familyDir : familyDirs) {
1362 if (null != progressReporter) {
1363 progressReporter.progress(familyDir);
1365 Path family = familyDir.getPath();
1366 if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1367 continue;
1369 // now in family, iterate over the StoreFiles and
1370 // put in map
1371 FileStatus[] familyStatus = fs.listStatus(family);
1372 for (FileStatus sfStatus : familyStatus) {
1373 if (null != progressReporter) {
1374 progressReporter.progress(sfStatus);
1376 Path sf = sfStatus.getPath();
1377 if (sfFilter == null || sfFilter.accept(sf)) {
1378 regionStoreFileMap.put( sf.getName(), sf);
1382 finalResultMap.putAll(regionStoreFileMap);
1383 } catch (Exception e) {
1384 LOG.error("Could not get region store file map for region: " + dd, e);
1385 exceptions.add(e);
1390 // If executor is available, submit async tasks to exec concurrently, otherwise
1391 // just do serial sync execution
1392 if (executor != null) {
1393 Future<?> future = executor.submit(getRegionStoreFileMapCall);
1394 futures.add(future);
1395 } else {
1396 FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null);
1397 future.run();
1398 futures.add(future);
1402 // Ensure all pending tasks are complete (or that we run into an exception)
1403 for (Future<?> f : futures) {
1404 if (!exceptions.isEmpty()) {
1405 break;
1407 try {
1408 f.get();
1409 } catch (ExecutionException e) {
1410 LOG.error("Unexpected exec exception! Should've been caught already. (Bug?)", e);
1411 // Shouldn't happen, we already logged/caught any exceptions in the Runnable
1414 } catch (IOException e) {
1415 LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
1416 exceptions.add(e);
1417 } finally {
1418 if (!exceptions.isEmpty()) {
1419 // Just throw the first exception as an indication something bad happened
1420 // Don't need to propagate all the exceptions, we already logged them all anyway
1421 Throwables.propagateIfPossible(exceptions.firstElement(), IOException.class);
1422 throw new IOException(exceptions.firstElement());
1426 return finalResultMap;
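// Illustrative usage sketch (not part of the original file): building the store file
// name -> Path map for a single table; 'fs' and 'rootDir' are placeholders. Passing null
// for the map argument lets the method create and return a fresh one.
//
//   Map<String, Path> sfMap =
//       FSUtils.getTableStoreFilePathMap(null, fs, rootDir, TableName.valueOf("my_table"));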
1429 public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1430 int result = 0;
1431 try {
1432 for (Path familyDir:getFamilyDirs(fs, p)){
1433 result += getReferenceFilePaths(fs, familyDir).size();
1435 } catch (IOException e) {
1436 LOG.warn("Error counting reference files", e);
1438 return result;
1442 * Runs through the HBase rootdir and creates a reverse lookup map for
1443 * table StoreFile names to the full Path.
1444 * <br>
1445 * Example...<br>
1446 * Key = 3944417774205889744 <br>
1447 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1449 * @param fs The file system to use.
1450 * @param hbaseRootDir The root directory to scan.
1451 * @return Map keyed by StoreFile name with a value of the full Path.
1452 * @throws IOException When scanning the directory fails.
1454 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1455 final Path hbaseRootDir)
1456 throws IOException, InterruptedException {
1457 return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter)null);
1461 * Runs through the HBase rootdir and creates a reverse lookup map for
1462 * table StoreFile names to the full Path.
1463 * <br>
1464 * Example...<br>
1465 * Key = 3944417774205889744 <br>
1466 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1468 * @param fs The file system to use.
1469 * @param hbaseRootDir The root directory to scan.
1470 * @param sfFilter optional path filter to apply to store files
1471 * @param executor optional executor service to parallelize this operation
1472 * @param progressReporter Instance or null; gets called every time we move to new region of
1473 * family dir and for each store file.
1474 * @return Map keyed by StoreFile name with a value of the full Path.
1475 * @throws IOException When scanning the directory fails.
1476 * @deprecated Since 2.3.0. Will be removed in hbase4. Use {@link
1477 * #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)}
1479 @Deprecated
1480 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1481 final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
1482 HbckErrorReporter progressReporter)
1483 throws IOException, InterruptedException {
1484 return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor,
1485 new ProgressReporter() {
1486 @Override
1487 public void progress(FileStatus status) {
1488 // status is not used in this implementation.
1489 progressReporter.progress();
1495 * Runs through the HBase rootdir and creates a reverse lookup map for
1496 * table StoreFile names to the full Path.
1497 * <br>
1498 * Example...<br>
1499 * Key = 3944417774205889744 <br>
1500 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1502 * @param fs The file system to use.
1503 * @param hbaseRootDir The root directory to scan.
1504 * @param sfFilter optional path filter to apply to store files
1505 * @param executor optional executor service to parallelize this operation
1506 * @param progressReporter Instance or null; gets called every time we move to new region of
1507 * family dir and for each store file.
1508 * @return Map keyed by StoreFile name with a value of the full Path.
1509 * @throws IOException When scanning the directory fails.
1510 * @throws InterruptedException
1512 public static Map<String, Path> getTableStoreFilePathMap(
1513 final FileSystem fs, final Path hbaseRootDir, PathFilter sfFilter,
1514 ExecutorService executor, ProgressReporter progressReporter)
1515 throws IOException, InterruptedException {
1516 ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);
1518 // if this method looks similar to 'getTableFragmentation' that is because
1519 // it was borrowed from it.
1521 // only include the directory paths to tables
1522 for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1523 getTableStoreFilePathMap(map, fs, hbaseRootDir, CommonFSUtils.getTableName(tableDir),
1524 sfFilter, executor, progressReporter);
1526 return map;
1530 * Filters FileStatuses in an array and returns a list
1532 * @param input An array of FileStatuses
1533 * @param filter A required filter to filter the array
1534 * @return A list of FileStatuses
1536 public static List<FileStatus> filterFileStatuses(FileStatus[] input,
1537 FileStatusFilter filter) {
1538 if (input == null) return null;
1539 return filterFileStatuses(Iterators.forArray(input), filter);
1543 * Filters FileStatuses in an iterator and returns a list
1545 * @param input An iterator of FileStatuses
1546 * @param filter A required filter to filter the array
1547 * @return A list of FileStatuses
1549 public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
1550 FileStatusFilter filter) {
1551 if (input == null) return null;
1552 ArrayList<FileStatus> results = new ArrayList<>();
1553 while (input.hasNext()) {
1554 FileStatus f = input.next();
1555 if (filter.accept(f)) {
1556 results.add(f);
1559 return results;
1563 * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1564 * This accommodates differences between Hadoop versions, where Hadoop 1
1565 * does not throw a FileNotFoundException but returns an empty FileStatus[],
1566 * while Hadoop 2 will throw FileNotFoundException.
   * @param fs file system
   * @param dir directory
   * @param filter file status filter
   * @return null if dir is empty or doesn't exist, otherwise FileStatus list
   */
  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs,
      final Path dir, final FileStatusFilter filter) throws IOException {
    FileStatus[] status = null;
    try {
      status = fs.listStatus(dir);
    } catch (FileNotFoundException fnfe) {
      LOG.trace("{} does not exist", dir);
      return null;
    }

    if (ArrayUtils.getLength(status) == 0) {
      return null;
    }

    if (filter == null) {
      return Arrays.asList(status);
    } else {
      List<FileStatus> status2 = filterFileStatuses(status, filter);
      if (status2 == null || status2.isEmpty()) {
        return null;
      } else {
        return status2;
      }
    }
  }
  /**
   * This function is to scan the root path of the file system to get the
   * degree of locality for each region on each of the servers having at least
   * one block of that region.
   * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}.
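   * <p>
   * A minimal usage sketch (illustrative; the encoded region name and server name below are
   * placeholders):
   * <pre>{@code
   * Map<String, Map<String, Float>> locality =
   *     FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
   * Map<String, Float> perServer = locality.get("<encodedRegionName>");
   * if (perServer != null) {
   *   Float fraction = perServer.get("<hostname>");
   * }
   * }</pre>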
   * @param conf the configuration to use
   * @return the mapping from region encoded name to a map of server names to
   *   locality fraction
   * @throws IOException in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
      final Configuration conf) throws IOException {
    return getRegionDegreeLocalityMappingFromFS(
      conf, null,
      conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
  }
  /**
   * This function is to scan the root path of the file system to get the
   * degree of locality for each region on each of the servers having at least
   * one block of that region.
   *
   * @param conf the configuration to use
   * @param desiredTable the table you wish to scan locality for
   * @param threadPoolSize the thread pool size to use
   * @return the mapping from region encoded name to a map of server names to
   *   locality fraction
   * @throws IOException in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
      final Configuration conf, final String desiredTable, int threadPoolSize)
      throws IOException {
    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
    return regionDegreeLocalityMapping;
  }
  /**
   * This function is to scan the root path of the file system to get either the
   * mapping between the region name and its best locality region server or the
   * degree of locality of each region on each of the servers having at least
   * one block of that region. The output map parameter is optional.
   *
   * @param conf the configuration to use
   * @param desiredTable the table you wish to scan locality for
   * @param threadPoolSize the thread pool size to use
   * @param regionDegreeLocalityMapping the map into which to put the locality degree mapping or
   *   null, must be a thread-safe implementation
   * @throws IOException in case of file system errors or interrupts
   */
  private static void getRegionLocalityMappingFromFS(final Configuration conf,
      final String desiredTable, int threadPoolSize,
      final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
    final FileSystem fs = FileSystem.get(conf);
    final Path rootPath = CommonFSUtils.getRootDir(conf);
    final long startTime = EnvironmentEdgeManager.currentTime();
    final Path queryPath;
    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
    if (null == desiredTable) {
      queryPath =
        new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
    } else {
      queryPath = new Path(
        CommonFSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
    }

    // reject all paths that are not appropriate
    PathFilter pathFilter = new PathFilter() {
      @Override
      public boolean accept(Path path) {
        // this is the region name; it may get some noise data
        if (null == path) {
          return false;
        }

        // no parent?
        Path parent = path.getParent();
        if (null == parent) {
          return false;
        }

        String regionName = path.getName();
        if (null == regionName) {
          return false;
        }

        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
          return false;
        }
        return true;
      }
    };
    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
    }

    if (null == statusList) {
      return;
    }

    // lower the number of threads in case we have very few expected regions
    threadPoolSize = Math.min(threadPoolSize, statusList.length);

    // run in multiple threads
    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
      new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d").setDaemon(true)
        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
    try {
      // ignore all file status items that are not of interest
      for (FileStatus regionStatus : statusList) {
        if (null == regionStatus || !regionStatus.isDirectory()) {
          continue;
        }

        final Path regionPath = regionStatus.getPath();
        if (null != regionPath) {
          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
        }
      }
    } finally {
      tpe.shutdown();
      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
      try {
        // here we wait until TPE terminates, which is either naturally or by
        // exceptions in the execution of the threads
        while (!tpe.awaitTermination(threadWakeFrequency,
            TimeUnit.MILLISECONDS)) {
          // printing out a rough estimate, so as to not introduce
          // an AtomicInteger
          LOG.info("Locality checking is underway: { Scanned Regions : "
            + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
            + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
      }
    }

    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
    LOG.info("Scanning DFS for locality info took {} ms", overhead);
  }
  /**
   * Do our short circuit read setup.
   * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
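   * <p>
   * For example (an illustrative sketch): with HBase checksums left at their default of enabled,
   * "dfs.client.read.shortcircuit.skip.checksum" should stay false; this method warns if it is
   * set to true and then applies the buffer-size default described in
   * {@link #checkShortCircuitReadBufferSize(Configuration)}:
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * conf.setBoolean("dfs.client.read.shortcircuit", true);
   * FSUtils.setupShortCircuitRead(conf);
   * }</pre>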
   * @param conf the configuration to check and update
   */
  public static void setupShortCircuitRead(final Configuration conf) {
    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
    boolean shortCircuitSkipChecksum =
      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    if (shortCircuitSkipChecksum) {
      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
        "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
        "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
      assert !shortCircuitSkipChecksum; // this will fail if assertions are on
    }
    checkShortCircuitReadBufferSize(conf);
  }
  /**
   * Check if short circuit read buffer size is set and if not, set it to hbase value.
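   * <p>
   * In other words, after the call the DFS key holds twice the default HBase block size unless
   * either key was already set (an illustrative sketch):
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * FSUtils.checkShortCircuitReadBufferSize(conf);
   * // "dfs.client.read.shortcircuit.buffer.size" is now 2 * HConstants.DEFAULT_BLOCKSIZE
   * // unless it, or "hbase.dfs.client.read.shortcircuit.buffer.size", was already set.
   * }</pre>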
   * @param conf the configuration to check and update
   */
  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
    final int notSet = -1;
    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
    int size = conf.getInt(dfsKey, notSet);
    // If a size is set, return -- we will use it.
    if (size != notSet) return;
    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
  }
  /**
   * Returns the DFSHedgedReadMetrics instance shared by the DFSClient of the filesystem named
   * in the passed configuration.
   * @param c the configuration used to look up the filesystem
   * @return The DFSClient DFSHedgedReadMetrics instance or null if it can't be found or we are
   *   not on hdfs.
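   * <p>
   * A minimal usage sketch (illustrative; hedged reads must be enabled via
   * "dfs.client.hedged.read.threadpool.size" for the counters to move):
   * <pre>{@code
   * DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
   * if (metrics != null) {
   *   long ops = metrics.getHedgedReadOps();
   * }
   * }</pre>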
   * @throws IOException if the filesystem cannot be obtained
   */
  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
      throws IOException {
    if (!CommonFSUtils.isHDFS(c)) {
      return null;
    }
    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
    final String name = "getHedgedReadMetrics";
    DFSClient dfsclient = ((DistributedFileSystem) FileSystem.get(c)).getClient();
    Method m;
    try {
      m = dfsclient.getClass().getDeclaredMethod(name);
    } catch (NoSuchMethodException e) {
      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
        e.getMessage());
      return null;
    } catch (SecurityException e) {
      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
        e.getMessage());
      return null;
    }
    m.setAccessible(true);
    try {
      return (DFSHedgedReadMetrics) m.invoke(dfsclient);
    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
        e.getMessage());
      return null;
    }
  }
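  /**
   * Copies {@code src} on {@code srcFS} to {@code dst} on {@code dstFS}, descending into
   * directories and running the individual file copies on a fixed-size thread pool.
   * @param srcFS filesystem holding the source tree
   * @param src source file or directory
   * @param dstFS filesystem to copy into
   * @param dst destination path
   * @param conf configuration passed to the underlying {@code FileUtil.copy} calls
   * @param threads number of copy threads to use
   * @return all destination paths created under (and including) {@code dst}
   * @throws IOException if a directory cannot be created or any copy task fails or is interrupted
   */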
  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
      Configuration conf, int threads) throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    List<Future<Void>> futures = new ArrayList<>();
    List<Path> traversedPaths;
    try {
      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (ExecutionException | InterruptedException | IOException e) {
      throw new IOException("Copy snapshot reference files failed", e);
    } finally {
      pool.shutdownNow();
    }
    return traversedPaths;
  }
  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
      Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
    List<Path> traversedPaths = new ArrayList<>();
    traversedPaths.add(dst);
    FileStatus currentFileStatus = srcFS.getFileStatus(src);
    if (currentFileStatus.isDirectory()) {
      if (!dstFS.mkdirs(dst)) {
        throw new IOException("Create directory failed: " + dst);
      }
      FileStatus[] subPaths = srcFS.listStatus(src);
      for (FileStatus subPath : subPaths) {
        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
      }
    } else {
      Future<Void> future = pool.submit(() -> {
        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
        return null;
      });
      futures.add(future);
    }
    return traversedPaths;
  }
  /**
   * Returns all namenode addresses of the given filesystem, resolving HA nameservices through
   * the configuration when needed.
   * @param fs the DistributedFileSystem to inspect
   * @param conf the configuration used to resolve HA nameservice addresses
   * @return A set containing all namenode addresses of fs
   */
  private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
      Configuration conf) {
    Set<InetSocketAddress> addresses = new HashSet<>();
    String serviceName = fs.getCanonicalServiceName();

    if (serviceName.startsWith("ha-hdfs")) {
      try {
        Map<String, Map<String, InetSocketAddress>> addressMap =
          DFSUtil.getNNServiceRpcAddressesForCluster(conf);
        String nameService = serviceName.substring(serviceName.indexOf(":") + 1);
        if (addressMap.containsKey(nameService)) {
          Map<String, InetSocketAddress> nnMap = addressMap.get(nameService);
          for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
            InetSocketAddress addr = e2.getValue();
            addresses.add(addr);
          }
        }
      } catch (Exception e) {
        LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
      }
    } else {
      URI uri = fs.getUri();
      int port = uri.getPort();
      if (port < 0) {
        int idx = serviceName.indexOf(':');
        port = Integer.parseInt(serviceName.substring(idx + 1));
      }
      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
      addresses.add(addr);
    }

    return addresses;
  }
  /**
   * Checks whether the two given filesystems refer to the same HDFS instance, comparing their
   * canonical service names and, for mixed HA/non-HA names, their resolved namenode addresses.
   * @param conf the Configuration of HBase
   * @param srcFs the source filesystem
   * @param desFs the destination filesystem
   * @return Whether srcFs and desFs are on the same hdfs or not
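   * <p>
   * A typical use is deciding whether a rename is possible or a full copy is required, e.g. (an
   * illustrative sketch):
   * <pre>{@code
   * FileSystem srcFs = srcPath.getFileSystem(conf);
   * FileSystem dstFs = dstPath.getFileSystem(conf);
   * if (FSUtils.isSameHdfs(conf, srcFs, dstFs)) {
   *   srcFs.rename(srcPath, dstPath);
   * } else {
   *   FileUtil.copy(srcFs, srcPath, dstFs, dstPath, false, conf);
   * }
   * }</pre>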
   */
  public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
    // By getCanonicalServiceName, we could make sure both srcFs and desFs
    // show a unified format which contains scheme, host and port.
    String srcServiceName = srcFs.getCanonicalServiceName();
    String desServiceName = desFs.getCanonicalServiceName();

    if (srcServiceName == null || desServiceName == null) {
      return false;
    }
    if (srcServiceName.equals(desServiceName)) {
      return true;
    }
    if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) {
      Collection<String> internalNameServices =
        conf.getTrimmedStringCollection("dfs.internal.nameservices");
      if (!internalNameServices.isEmpty()) {
        if (internalNameServices.contains(srcServiceName.split(":")[1])) {
          return true;
        } else {
          return false;
        }
      }
    }
    if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
      // If one serviceName is an HA format while the other is a non-HA format,
      // maybe they refer to the same FileSystem.
      // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port"
      Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
      Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
      if (Sets.intersection(srcAddrs, desAddrs).size() > 0) {
        return true;
      }
    }

    return false;