From f3ee9b8aa37dd30d34ff54cd39fb9b4b6d22e683 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Thu, 26 Mar 2020 17:05:31 +0800 Subject: [PATCH] HBASE-24000 Simplify CommonFSUtils after upgrading to hadoop 2.10.0 (#1335) Signed-off-by: stack Signed-off-by: Nick Dimiduk --- .../apache/hadoop/hbase/util/CommonFSUtils.java | 217 ++++----------------- .../hadoop/hbase/util/TestCommonFSUtils.java | 41 +--- .../procedure2/store/wal/WALProcedureStore.java | 10 +- .../store/wal/TestWALProcedureStore.java | 2 +- .../hbase/io/asyncfs/AsyncFSOutputHelper.java | 18 +- .../hbase/regionserver/wal/ProtobufLogWriter.java | 10 +- .../hbase/regionserver/wal/TestHBaseWalOnEC.java | 8 +- .../org/apache/hadoop/hbase/util/TestFSUtils.java | 11 +- 8 files changed, 68 insertions(+), 249 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java index ea0cb2bc3c..ea655a25d5 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java @@ -27,11 +27,11 @@ import java.net.URISyntaxException; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSDataOutputStreamBuilder; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; @@ -490,26 +490,19 @@ public abstract class CommonFSUtils { } String trimmedStoragePolicy = storagePolicy.trim(); if (trimmedStoragePolicy.isEmpty()) { - if (LOG.isTraceEnabled()) { - LOG.trace("We were passed an empty storagePolicy, exiting early."); - } + LOG.trace("We were passed an empty storagePolicy, exiting early."); return; } else { trimmedStoragePolicy = trimmedStoragePolicy.toUpperCase(Locale.ROOT); } if (trimmedStoragePolicy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)) { - if (LOG.isTraceEnabled()) { - LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.", - trimmedStoragePolicy); - } + LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.", trimmedStoragePolicy); return; } try { invokeSetStoragePolicy(fs, path, trimmedStoragePolicy); } catch (IOException e) { - if (LOG.isTraceEnabled()) { - LOG.trace("Failed to invoke set storage policy API on FS", e); - } + LOG.trace("Failed to invoke set storage policy API on FS", e); if (throwException) { throw e; } @@ -525,10 +518,7 @@ public abstract class CommonFSUtils { try { fs.setStoragePolicy(path, storagePolicy); - - if (LOG.isDebugEnabled()) { - LOG.debug("Set storagePolicy={} for path={}", storagePolicy, path); - } + LOG.debug("Set storagePolicy={} for path={}", storagePolicy, path); } catch (Exception e) { toThrow = e; // This swallows FNFE, should we be throwing it? seems more likely to indicate dev @@ -541,19 +531,9 @@ public abstract class CommonFSUtils { LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e); } - // check for lack of HDFS-7228 - if (e instanceof RemoteException && - HadoopIllegalArgumentException.class.getName().equals( - ((RemoteException)e).getClassName())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Given storage policy, '" +storagePolicy +"', was rejected and probably " + - "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " + - "trying to use SSD related policies then you're likely missing HDFS-7228. For " + - "more information see the 'ArchivalStorage' docs for your Hadoop release."); - } // Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation // that throws UnsupportedOperationException - } else if (e instanceof UnsupportedOperationException) { + if (e instanceof UnsupportedOperationException) { if (LOG.isDebugEnabled()) { LOG.debug("The underlying FileSystem implementation doesn't support " + "setStoragePolicy. This is probably intentional on their part, since HDFS-9345 " + @@ -759,200 +739,75 @@ public abstract class CommonFSUtils { conf.setIfUnset(dfsKey, Integer.toString(hbaseSize)); } - private static class DfsBuilderUtility { - static Class dfsClass = null; - static Method createMethod; - static Method overwriteMethod; - static Method bufferSizeMethod; - static Method blockSizeMethod; - static Method recursiveMethod; - static Method replicateMethod; - static Method replicationMethod; - static Method buildMethod; - static boolean allMethodsPresent = false; + private static final class DfsBuilderUtility { + private static final Class BUILDER; + private static final Method REPLICATE; static { - String dfsName = "org.apache.hadoop.hdfs.DistributedFileSystem"; - String builderName = dfsName + "$HdfsDataOutputStreamBuilder"; + String builderName = "org.apache.hadoop.hdfs.DistributedFileSystem$HdfsDataOutputStreamBuilder"; Class builderClass = null; - - try { - dfsClass = Class.forName(dfsName); - } catch (ClassNotFoundException e) { - LOG.debug("{} not available, will not use builder API for file creation.", dfsName); - } try { builderClass = Class.forName(builderName); } catch (ClassNotFoundException e) { - LOG.debug("{} not available, will not use builder API for file creation.", builderName); + LOG.debug("{} not available, will not set replicate when creating output stream", builderName); } - - if (dfsClass != null && builderClass != null) { + Method replicateMethod = null; + if (builderClass != null) { try { - createMethod = dfsClass.getMethod("createFile", Path.class); - overwriteMethod = builderClass.getMethod("overwrite", boolean.class); - bufferSizeMethod = builderClass.getMethod("bufferSize", int.class); - blockSizeMethod = builderClass.getMethod("blockSize", long.class); - recursiveMethod = builderClass.getMethod("recursive"); replicateMethod = builderClass.getMethod("replicate"); - replicationMethod = builderClass.getMethod("replication", short.class); - buildMethod = builderClass.getMethod("build"); - - allMethodsPresent = true; LOG.debug("Using builder API via reflection for DFS file creation."); } catch (NoSuchMethodException e) { - LOG.debug("Could not find method on builder; will use old DFS API for file creation {}", - e.getMessage()); - } - } - } - - /** - * Attempt to use builder API via reflection to create a file with the given parameters and - * replication enabled. - */ - static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable, - int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException { - if (allMethodsPresent && dfsClass.isInstance(fs)) { - try { - Object builder; - - builder = createMethod.invoke(fs, path); - builder = overwriteMethod.invoke(builder, overwritable); - builder = bufferSizeMethod.invoke(builder, bufferSize); - builder = blockSizeMethod.invoke(builder, blockSize); - if (isRecursive) { - builder = recursiveMethod.invoke(builder); - } - builder = replicateMethod.invoke(builder); - builder = replicationMethod.invoke(builder, replication); - return (FSDataOutputStream) buildMethod.invoke(builder); - } catch (IllegalAccessException | InvocationTargetException e) { - // Should have caught this failure during initialization, so log full trace here - LOG.warn("Couldn't use reflection with builder API", e); + LOG.debug("Could not find replicate method on builder; will not set replicate when" + + " creating output stream", e); } } - - if (isRecursive) { - return fs.create(path, overwritable, bufferSize, replication, blockSize, null); - } - return fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null); + BUILDER = builderClass; + REPLICATE = replicateMethod; } /** - * Attempt to use builder API via reflection to create a file with the given parameters and - * replication enabled. + * Attempt to use builder API via reflection to call the replicate method on the given builder. */ - static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable) - throws IOException { - if (allMethodsPresent && dfsClass.isInstance(fs)) { + static void replicate(FSDataOutputStreamBuilder builder) { + if (BUILDER != null && REPLICATE != null && BUILDER.isAssignableFrom(builder.getClass())) { try { - Object builder; - - builder = createMethod.invoke(fs, path); - builder = overwriteMethod.invoke(builder, overwritable); - builder = replicateMethod.invoke(builder); - return (FSDataOutputStream) buildMethod.invoke(builder); + REPLICATE.invoke(builder); } catch (IllegalAccessException | InvocationTargetException e) { // Should have caught this failure during initialization, so log full trace here LOG.warn("Couldn't use reflection with builder API", e); } } - - return fs.create(path, overwritable); } } /** * Attempt to use builder API via reflection to create a file with the given parameters and * replication enabled. - *

+ *

* Will not attempt to enable replication when passed an HFileSystem. */ - public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable) - throws IOException { - return DfsBuilderUtility.createHelper(fs, path, overwritable); + public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite) + throws IOException { + FSDataOutputStreamBuilder builder = fs.createFile(path).overwrite(overwrite); + DfsBuilderUtility.replicate(builder); + return builder.build(); } /** * Attempt to use builder API via reflection to create a file with the given parameters and * replication enabled. - *

+ *

* Will not attempt to enable replication when passed an HFileSystem. */ - public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable, - int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException { - return DfsBuilderUtility.createHelper(fs, path, overwritable, bufferSize, replication, - blockSize, isRecursive); - } - - // Holder singleton idiom. JVM spec ensures this will be run at most once per Classloader, and - // not until we attempt to reference it. - private static class StreamCapabilities { - public static final boolean PRESENT; - public static final Class CLASS; - public static final Method METHOD; - static { - boolean tmp = false; - Class clazz = null; - Method method = null; - try { - clazz = Class.forName("org.apache.hadoop.fs.StreamCapabilities"); - method = clazz.getMethod("hasCapability", String.class); - tmp = true; - } catch(ClassNotFoundException|NoSuchMethodException|SecurityException exception) { - LOG.warn("Your Hadoop installation does not include the StreamCapabilities class from " + - "HDFS-11644, so we will skip checking if any FSDataOutputStreams actually " + - "support hflush/hsync. If you are running on top of HDFS this probably just " + - "means you have an older version and this can be ignored. If you are running on " + - "top of an alternate FileSystem implementation you should manually verify that " + - "hflush and hsync are implemented; otherwise you risk data loss and hard to " + - "diagnose errors when our assumptions are violated."); - LOG.debug("The first request to check for StreamCapabilities came from this stacktrace.", - exception); - } finally { - PRESENT = tmp; - CLASS = clazz; - METHOD = method; - } + public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite, + int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException { + FSDataOutputStreamBuilder builder = fs.createFile(path).overwrite(overwrite) + .bufferSize(bufferSize).replication(replication).blockSize(blockSize); + if (isRecursive) { + builder.recursive(); } - } - - /** - * If our FileSystem version includes the StreamCapabilities class, check if the given stream has - * a particular capability. - * @param stream capabilities are per-stream instance, so check this one specifically. must not be - * null - * @param capability what to look for, per Hadoop Common's FileSystem docs - * @return true if there are no StreamCapabilities. false if there are, but this stream doesn't - * implement it. return result of asking the stream otherwise. - * @throws NullPointerException if {@code stream} is {@code null} - */ - public static boolean hasCapability(FSDataOutputStream stream, String capability) { - // be consistent whether or not StreamCapabilities is present - Objects.requireNonNull(stream, "stream cannot be null"); - // If o.a.h.fs.StreamCapabilities doesn't exist, assume everyone does everything - // otherwise old versions of Hadoop will break. - boolean result = true; - if (StreamCapabilities.PRESENT) { - // if StreamCapabilities is present, but the stream doesn't implement it - // or we run into a problem invoking the method, - // we treat that as equivalent to not declaring anything - result = false; - if (StreamCapabilities.CLASS.isAssignableFrom(stream.getClass())) { - try { - result = ((Boolean)StreamCapabilities.METHOD.invoke(stream, capability)).booleanValue(); - } catch (IllegalAccessException|IllegalArgumentException|InvocationTargetException - exception) { - LOG.warn("Your Hadoop installation's StreamCapabilities implementation doesn't match " + - "our understanding of how it's supposed to work. Please file a JIRA and include " + - "the following stack trace. In the mean time we're interpreting this behavior " + - "difference as a lack of capability support, which will probably cause a failure.", - exception); - } - } - } - return result; + DfsBuilderUtility.replicate(builder); + return builder.build(); } /** diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java index e2dbd413a5..e6a427edee 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java @@ -19,9 +19,8 @@ package org.apache.hadoop.hbase.util; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; -import java.io.ByteArrayOutputStream; + import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -35,8 +34,6 @@ import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Test {@link CommonFSUtils}. @@ -48,8 +45,6 @@ public class TestCommonFSUtils { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestCommonFSUtils.class); - private static final Logger LOG = LoggerFactory.getLogger(TestCommonFSUtils.class); - private HBaseCommonTestingUtility htu; private Configuration conf; @@ -131,38 +126,4 @@ public class TestCommonFSUtils { Path logFile = new Path(CommonFSUtils.getWALRootDir(conf), "test/testlog"); assertEquals("test/testlog", CommonFSUtils.removeWALRootPath(logFile, conf)); } - - @Test(expected=NullPointerException.class) - public void streamCapabilitiesDoesNotAllowNullStream() { - CommonFSUtils.hasCapability(null, "hopefully any string"); - } - - private static final boolean STREAM_CAPABILITIES_IS_PRESENT; - static { - boolean tmp = false; - try { - Class.forName("org.apache.hadoop.fs.StreamCapabilities"); - tmp = true; - LOG.debug("Test thought StreamCapabilities class was present."); - } catch (ClassNotFoundException exception) { - LOG.debug("Test didn't think StreamCapabilities class was present."); - } finally { - STREAM_CAPABILITIES_IS_PRESENT = tmp; - } - } - - @Test - public void checkStreamCapabilitiesOnKnownNoopStream() throws IOException { - FSDataOutputStream stream = new FSDataOutputStream(new ByteArrayOutputStream(), null); - assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " + - "class is not defined.", STREAM_CAPABILITIES_IS_PRESENT, - CommonFSUtils.hasCapability(stream, "hsync")); - assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " + - "class is not defined.", STREAM_CAPABILITIES_IS_PRESENT, - CommonFSUtils.hasCapability(stream, "hflush")); - assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " + - "class is not defined.", STREAM_CAPABILITIES_IS_PRESENT, - CommonFSUtils.hasCapability(stream, "a capability that hopefully no filesystem will " + - "implement.")); - } } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index b99ed35087..b0301c6760 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -35,11 +35,13 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSError; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.log.HBaseMarkers; @@ -1084,8 +1086,8 @@ public class WALProcedureStore extends ProcedureStoreBase { // After we create the stream but before we attempt to use it at all // ensure that we can provide the level of data safety we're configured // to provide. - final String durability = useHsync ? "hsync" : "hflush"; - if (enforceStreamCapability && !(CommonFSUtils.hasCapability(newStream, durability))) { + final String durability = useHsync ? StreamCapabilities.HSYNC : StreamCapabilities.HFLUSH; + if (enforceStreamCapability && !newStream.hasCapability(durability)) { throw new IllegalStateException("The procedure WAL relies on the ability to " + durability + " for proper operation during component failures, but the underlying filesystem does " + "not support doing so. Please check the config value of '" + USE_HSYNC_CONF_KEY + @@ -1151,12 +1153,12 @@ public class WALProcedureStore extends ProcedureStoreBase { log.addToSize(trailerSize); } } - } catch (IOException e) { + } catch (IOException | FSError e) { LOG.warn("Unable to write the trailer", e); } try { stream.close(); - } catch (IOException e) { + } catch (IOException | FSError e) { LOG.error("Unable to close the stream", e); } stream = null; diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index c34a210316..c8335eeb7d 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -655,7 +655,7 @@ public class TestWALProcedureStore { } @Test - public void testLogFileAleadExists() throws IOException { + public void testLogFileAlreadyExists() throws IOException { final boolean[] tested = {false}; WALProcedureStore mStore = Mockito.spy(procStore); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java index d1645f8462..452da1ca4f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java @@ -18,15 +18,17 @@ package org.apache.hadoop.hbase.io.asyncfs; import java.io.IOException; - import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; @@ -63,11 +65,15 @@ public final class AsyncFSOutputHelper { // After we create the stream but before we attempt to use it at all // ensure that we can provide the level of data safety we're configured // to provide. - if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true) && - !(CommonFSUtils.hasCapability(out, "hflush") && - CommonFSUtils.hasCapability(out, "hsync"))) { - out.close(); - throw new CommonFSUtils.StreamLacksCapabilityException("hflush and hsync"); + if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) { + if (!out.hasCapability(StreamCapabilities.HFLUSH)) { + Closeables.close(out, true); + throw new StreamLacksCapabilityException(StreamCapabilities.HFLUSH); + } + if (!out.hasCapability(StreamCapabilities.HSYNC)) { + Closeables.close(out, true); + throw new StreamLacksCapabilityException(StreamCapabilities.HSYNC); + } } return new WrapperAsyncFSOutput(f, out); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index 5c8e0d21f8..ff08da8f44 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -22,6 +22,7 @@ import java.io.OutputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; @@ -90,18 +91,17 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter return this.output; } - @SuppressWarnings("deprecation") @Override protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, short replication, long blockSize) throws IOException, StreamLacksCapabilityException { this.output = CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication, blockSize, false); if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) { - if (!CommonFSUtils.hasCapability(output, "hflush")) { - throw new StreamLacksCapabilityException("hflush"); + if (!output.hasCapability(StreamCapabilities.HFLUSH)) { + throw new StreamLacksCapabilityException(StreamCapabilities.HFLUSH); } - if (!CommonFSUtils.hasCapability(output, "hsync")) { - throw new StreamLacksCapabilityException("hsync"); + if (!output.hasCapability(StreamCapabilities.HSYNC)) { + throw new StreamLacksCapabilityException(StreamCapabilities.HSYNC); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java index a7f1624974..96c5729423 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java @@ -24,9 +24,9 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.lang.reflect.Method; - import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; @@ -55,8 +55,6 @@ public class TestHBaseWalOnEC { private static final HBaseTestingUtility util = new HBaseTestingUtility(); - private static final String HFLUSH = "hflush"; - @BeforeClass public static void setup() throws Exception { try { @@ -75,7 +73,7 @@ public class TestHBaseWalOnEC { try (FSDataOutputStream out = fs.create(new Path("/canary"))) { // If this comes back as having hflush then some test setup assumption is wrong. // Fail the test so that a developer has to look and triage - assertFalse("Did not enable EC!", CommonFSUtils.hasCapability(out, HFLUSH)); + assertFalse("Did not enable EC!", out.hasCapability(StreamCapabilities.HFLUSH)); } } catch (NoSuchMethodException e) { // We're not testing anything interesting if EC is not available, so skip the rest of the test @@ -95,7 +93,7 @@ public class TestHBaseWalOnEC { public void testStreamCreate() throws IOException { try (FSDataOutputStream out = CommonFSUtils.createForWal(util.getDFSCluster().getFileSystem(), new Path("/testStreamCreate"), true)) { - assertTrue(CommonFSUtils.hasCapability(out, HFLUSH)); + assertTrue(out.hasCapability(StreamCapabilities.HFLUSH)); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java index 1296c4723d..d425557bdb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -661,15 +662,11 @@ public class TestFSUtils { MiniDFSCluster cluster = htu.startMiniDFSCluster(1); try (FileSystem filesystem = cluster.getFileSystem()) { FSDataOutputStream stream = filesystem.create(new Path("/tmp/foobar")); - assertTrue(FSUtils.hasCapability(stream, "hsync")); - assertTrue(FSUtils.hasCapability(stream, "hflush")); - assertNotEquals("We expect HdfsDataOutputStream to say it has a dummy capability iff the " + - "StreamCapabilities class is not defined.", - STREAM_CAPABILITIES_IS_PRESENT, - FSUtils.hasCapability(stream, "a capability that hopefully HDFS doesn't add.")); + assertTrue(stream.hasCapability(StreamCapabilities.HSYNC)); + assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH)); + assertFalse(stream.hasCapability("a capability that hopefully HDFS doesn't add.")); } finally { cluster.shutdown(); } } - } -- 2.11.4.GIT