/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test {@link FSUtils}.
 */
@Category({MiscTests.class, MediumTests.class})
public class TestFSUtils {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestFSUtils.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestFSUtils.class);

  private HBaseTestingUtil htu;
  private FileSystem fs;
  private Configuration conf;

  @Before
  public void setUp() throws IOException {
    htu = new HBaseTestingUtil();
    fs = htu.getTestFileSystem();
    conf = htu.getConfiguration();
  }

  @Test
  public void testIsHDFS() throws Exception {
    assertFalse(CommonFSUtils.isHDFS(conf));
    MiniDFSCluster cluster = null;
    try {
      cluster = htu.startMiniDFSCluster(1);
      assertTrue(CommonFSUtils.isHDFS(conf));
    } finally {
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }
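
  /**
   * Writes {@code dataSize} zero bytes to {@code file} and closes the stream.
   */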
  private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize) throws Exception {
    FSDataOutputStream out = fs.create(file);
    byte[] data = new byte[dataSize];
    out.write(data, 0, dataSize);
    out.close();
  }

  @Test
  public void testComputeHDFSBlocksDistributionByInputStream() throws Exception {
    testComputeHDFSBlocksDistribution((fs, testFile) -> {
      try (FSDataInputStream open = fs.open(testFile)) {
        assertTrue(open instanceof HdfsDataInputStream);
        return FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) open);
      }
    });
  }

  @Test
  public void testComputeHDFSBlockDistribution() throws Exception {
    testComputeHDFSBlocksDistribution((fs, testFile) -> {
      FileStatus status = fs.getFileStatus(testFile);
      return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
    });
  }

  interface HDFSBlockDistributionFunction {
    HDFSBlocksDistribution getForPath(FileSystem fs, Path path) throws IOException;
  }
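
  /**
   * Shared driver for the block distribution tests above: brings up mini DFS clusters of varying
   * size, writes a test file, and polls the supplied function until the NameNode reports the
   * expected locality or a two-second deadline expires.
   */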
  private void testComputeHDFSBlocksDistribution(
    HDFSBlockDistributionFunction fileToBlockDistribution) throws Exception {
    final int DEFAULT_BLOCK_SIZE = 1024;
    conf.setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
    MiniDFSCluster cluster = null;
    Path testFile = null;

    try {
      // set up a cluster with 3 nodes
      String hosts[] = new String[] { "host1", "host2", "host3" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with two blocks
      testFile = new Path("/test1.txt");
      WriteDataToHDFS(fs, testFile, 2 * DEFAULT_BLOCK_SIZE);

      // given the default replication factor is 3, the same as the number of
      // datanodes; the locality index for each host should be 100%,
      // or getWeight for each host should be the same as getUniqueBlocksWeights
      final long maxTime = EnvironmentEdgeManager.currentTime() + 2000;
      boolean ok;
      do {
        ok = true;

        HDFSBlocksDistribution blocksDistribution =
          fileToBlockDistribution.getForPath(fs, testFile);

        long uniqueBlocksTotalWeight =
          blocksDistribution.getUniqueBlocksTotalWeight();
        for (String host : hosts) {
          long weight = blocksDistribution.getWeight(host);
          ok = (ok && uniqueBlocksTotalWeight == weight);
        }
      } while (!ok && EnvironmentEdgeManager.currentTime() < maxTime);
      assertTrue(ok);
    } finally {
      htu.shutdownMiniDFSCluster();
    }

    try {
      // set up a cluster with 4 nodes
      String hosts[] = new String[] { "host1", "host2", "host3", "host4" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with three blocks
      testFile = new Path("/test2.txt");
      WriteDataToHDFS(fs, testFile, 3 * DEFAULT_BLOCK_SIZE);

      // given the default replication factor is 3, we will have total of 9
      // replica of blocks; thus the host with the highest weight should have
      // weight == 3 * DEFAULT_BLOCK_SIZE
      final long maxTime = EnvironmentEdgeManager.currentTime() + 2000;
      long weight;
      long uniqueBlocksTotalWeight;
      do {
        HDFSBlocksDistribution blocksDistribution =
          fileToBlockDistribution.getForPath(fs, testFile);
        uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight();

        String tophost = blocksDistribution.getTopHosts().get(0);
        weight = blocksDistribution.getWeight(tophost);

        // NameNode is informed asynchronously, so we may have a delay. See HBASE-6175
      } while (uniqueBlocksTotalWeight != weight
        && EnvironmentEdgeManager.currentTime() < maxTime);
      assertTrue(uniqueBlocksTotalWeight == weight);
    } finally {
      htu.shutdownMiniDFSCluster();
    }

    try {
      // set up a cluster with 4 nodes
      String hosts[] = new String[] { "host1", "host2", "host3", "host4" };
      cluster = htu.startMiniDFSCluster(hosts);
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();

      // create a file with one block
      testFile = new Path("/test3.txt");
      WriteDataToHDFS(fs, testFile, DEFAULT_BLOCK_SIZE);

      // given the default replication factor is 3, we will have total of 3
      // replica of blocks; thus there is one host without weight
      final long maxTime = EnvironmentEdgeManager.currentTime() + 2000;
      HDFSBlocksDistribution blocksDistribution;
      do {
        blocksDistribution = fileToBlockDistribution.getForPath(fs, testFile);
        // NameNode is informed asynchronously, so we may have a delay. See HBASE-6175
      } while (blocksDistribution.getTopHosts().size() != 3
        && EnvironmentEdgeManager.currentTime() < maxTime);
      assertEquals("Wrong number of hosts distributing blocks.", 3,
        blocksDistribution.getTopHosts().size());
    } finally {
      htu.shutdownMiniDFSCluster();
    }
  }
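
  /**
   * Writes {@code version} into the given version file, deleting any existing copy first.
   */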
  private void writeVersionFile(Path versionFile, String version) throws IOException {
    if (CommonFSUtils.isExists(fs, versionFile)) {
      assertTrue(CommonFSUtils.delete(fs, versionFile, true));
    }
    try (FSDataOutputStream s = fs.create(versionFile)) {
      s.writeUTF(version);
    }
    assertTrue(fs.exists(versionFile));
  }

  @Test
  public void testVersion() throws DeserializationException, IOException {
    final Path rootdir = htu.getDataTestDir();
    final FileSystem fs = rootdir.getFileSystem(conf);
    assertNull(FSUtils.getVersion(fs, rootdir));
    // No meta dir so no complaint from checkVersion.
    // Presumes it a new install. Will create version file.
    FSUtils.checkVersion(fs, rootdir, true);
    // Now remove the version file and create a metadir so checkVersion fails.
    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
    assertTrue(CommonFSUtils.isExists(fs, versionFile));
    assertTrue(CommonFSUtils.delete(fs, versionFile, true));
    Path metaRegionDir =
      FSUtils.getRegionDirFromRootDir(rootdir, RegionInfoBuilder.FIRST_META_REGIONINFO);
    FsPermission defaultPerms = CommonFSUtils.getFilePermissions(fs, this.conf,
      HConstants.DATA_FILE_UMASK_KEY);
    CommonFSUtils.create(fs, metaRegionDir, defaultPerms, false);
    boolean thrown = false;
    try {
      FSUtils.checkVersion(fs, rootdir, true);
    } catch (FileSystemVersionException e) {
      thrown = true;
    }
    assertTrue("Expected FileSystemVersionException", thrown);
    // Write out a good version file. See if we can read it in and convert.
    String version = HConstants.FILE_SYSTEM_VERSION;
    writeVersionFile(versionFile, version);
    FileStatus[] status = fs.listStatus(versionFile);
    assertNotNull(status);
    assertTrue(status.length > 0);
    String newVersion = FSUtils.getVersion(fs, rootdir);
    assertEquals(version.length(), newVersion.length());
    assertEquals(version, newVersion);
    // File will have been converted. Exercise the pb format
    assertEquals(version, FSUtils.getVersion(fs, rootdir));
    FSUtils.checkVersion(fs, rootdir, true);
    // Write an old version file.
    String oldVersion = "1";
    writeVersionFile(versionFile, oldVersion);
    newVersion = FSUtils.getVersion(fs, rootdir);
    assertNotEquals(version, newVersion);
    thrown = false;
    try {
      FSUtils.checkVersion(fs, rootdir, true);
    } catch (FileSystemVersionException e) {
      thrown = true;
    }
    assertTrue("Expected FileSystemVersionException", thrown);
  }

  @Test
  public void testPermMask() throws Exception {
    final Path rootdir = htu.getDataTestDir();
    final FileSystem fs = rootdir.getFileSystem(conf);
    // default fs permission
    FsPermission defaultFsPerm = CommonFSUtils.getFilePermissions(fs, conf,
      HConstants.DATA_FILE_UMASK_KEY);
    // 'hbase.data.umask.enable' is false. We will get default fs permission.
    assertEquals(FsPermission.getFileDefault(), defaultFsPerm);

    conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
    // first check that we don't crash if we don't have perms set
    FsPermission defaultStartPerm = CommonFSUtils.getFilePermissions(fs, conf,
      HConstants.DATA_FILE_UMASK_KEY);
    // default 'hbase.data.umask' is 000, and this umask will be used when
    // 'hbase.data.umask.enable' is true.
    // Therefore we will not get the real fs default in this case.
    // Instead we will get the starting point FULL_RWX_PERMISSIONS
    assertEquals(new FsPermission(CommonFSUtils.FULL_RWX_PERMISSIONS), defaultStartPerm);

    conf.setStrings(HConstants.DATA_FILE_UMASK_KEY, "077");
    // now check that we get the right perms
    FsPermission filePerm = CommonFSUtils.getFilePermissions(fs, conf,
      HConstants.DATA_FILE_UMASK_KEY);
    assertEquals(new FsPermission("700"), filePerm);

    // then that the correct file is created
    Path p = new Path("target" + File.separator + HBaseTestingUtil.getRandomUUID().toString());
    try {
      FSDataOutputStream out = FSUtils.create(conf, fs, p, filePerm, null);
      out.close();
      FileStatus stat = fs.getFileStatus(p);
      assertEquals(new FsPermission("700"), stat.getPermission());
    } finally {
      fs.delete(p, true);
    }
  }

  @Test
  public void testDeleteAndExists() throws Exception {
    final Path rootdir = htu.getDataTestDir();
    final FileSystem fs = rootdir.getFileSystem(conf);
    conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
    FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    // then that the correct file is created
    String file = HBaseTestingUtil.getRandomUUID().toString();
    Path p = new Path(htu.getDataTestDir(), "temptarget" + File.separator + file);
    Path p1 = new Path(htu.getDataTestDir(), "temppath" + File.separator + file);
    try {
      FSDataOutputStream out = FSUtils.create(conf, fs, p, perms, null);
      out.close();
      assertTrue("The created file should be present", CommonFSUtils.isExists(fs, p));
      // delete the file with recursion as false. Only the file will be deleted.
      CommonFSUtils.delete(fs, p, false);
      // Create another file
      FSDataOutputStream out1 = FSUtils.create(conf, fs, p1, perms, null);
      out1.close();
      // delete the file with recursion as true. Still only the file will be deleted.
      CommonFSUtils.delete(fs, p1, true);
      assertFalse("The deleted file should not be present", CommonFSUtils.isExists(fs, p1));
    } finally {
      CommonFSUtils.delete(fs, p, true);
      CommonFSUtils.delete(fs, p1, true);
    }
  }

  @Test
  public void testFilteredStatusDoesNotThrowOnNotFound() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    try {
      assertNull(FSUtils.listStatusWithStatusFilter(cluster.getFileSystem(),
        new Path("definitely/doesn't/exist"), null));
    } finally {
      cluster.shutdown();
    }
  }

  @Test
  public void testRenameAndSetModifyTime() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    assertTrue(CommonFSUtils.isHDFS(conf));

    FileSystem fs = FileSystem.get(conf);
    Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile");

    String file = HBaseTestingUtil.getRandomUUID().toString();
    Path p = new Path(testDir, file);

    FSDataOutputStream out = fs.create(p);
    out.close();
    assertTrue("The created file should be present", CommonFSUtils.isExists(fs, p));

    long expect = EnvironmentEdgeManager.currentTime() + 1000;
    assertNotEquals(expect, fs.getFileStatus(p).getModificationTime());

    ManualEnvironmentEdge mockEnv = new ManualEnvironmentEdge();
    mockEnv.setValue(expect);
    EnvironmentEdgeManager.injectEdge(mockEnv);
    try {
      String dstFile = HBaseTestingUtil.getRandomUUID().toString();
      Path dst = new Path(testDir, dstFile);

      assertTrue(CommonFSUtils.renameAndSetModifyTime(fs, p, dst));
      assertFalse("The moved file should not be present", CommonFSUtils.isExists(fs, p));
      assertTrue("The dst file should be present", CommonFSUtils.isExists(fs, dst));

      assertEquals(expect, fs.getFileStatus(dst).getModificationTime());
      cluster.shutdown();
    } finally {
      EnvironmentEdgeManager.reset();
    }
  }

  @Test
  public void testSetStoragePolicyDefault() throws Exception {
    verifyNoHDFSApiInvocationForDefaultPolicy();
    verifyFileInDirWithStoragePolicy(HConstants.DEFAULT_WAL_STORAGE_POLICY);
  }

  /**
   * Note: currently the default policy is set to defer to HDFS, and this case verifies that
   * logic; the check will need to be removed if the default policy is changed.
   */
  private void verifyNoHDFSApiInvocationForDefaultPolicy() {
    FileSystem testFs = new AlwaysFailSetStoragePolicyFileSystem();
    // There should be no exception thrown when setting to default storage policy, which indicates
    // the HDFS API hasn't been called
    try {
      CommonFSUtils.setStoragePolicy(testFs, new Path("non-exist"),
        HConstants.DEFAULT_WAL_STORAGE_POLICY, true);
    } catch (IOException e) {
      Assert.fail("Should have bypassed the FS API when setting default storage policy");
    }
    // There should be an exception thrown when given a non-default storage policy, which indicates
    // the HDFS API has been called
    try {
      CommonFSUtils.setStoragePolicy(testFs, new Path("non-exist"), "HOT", true);
      Assert.fail("Should have invoked the FS API but haven't");
    } catch (IOException e) {
      // expected given an invalid path
    }
  }
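
  /**
   * {@link DistributedFileSystem} stub whose {@code setStoragePolicy} always throws, used above to
   * detect whether the real HDFS API was invoked.
   */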
  class AlwaysFailSetStoragePolicyFileSystem extends DistributedFileSystem {
    @Override
    public void setStoragePolicy(final Path src, final String policyName) throws IOException {
      throw new IOException("The setStoragePolicy method is invoked");
    }
  }

  /* might log a warning, but still work. (always warning on Hadoop < 2.6.0) */
  @Test
  public void testSetStoragePolicyValidButMaybeNotPresent() throws Exception {
    verifyFileInDirWithStoragePolicy("ALL_SSD");
  }

  final String INVALID_STORAGE_POLICY = "1772";

  /* should log a warning, but still work. (different warning on Hadoop < 2.6.0) */
  @Test
  public void testSetStoragePolicyInvalid() throws Exception {
    verifyFileInDirWithStoragePolicy(INVALID_STORAGE_POLICY);
  }

  // Here instead of TestCommonFSUtils because we need a minicluster
  private void verifyFileInDirWithStoragePolicy(final String policy) throws Exception {
    conf.set(HConstants.WAL_STORAGE_POLICY, policy);

    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    try {
      assertTrue(CommonFSUtils.isHDFS(conf));

      FileSystem fs = FileSystem.get(conf);
      Path testDir = htu.getDataTestDirOnTestFS("testArchiveFile");
      fs.mkdirs(testDir);

      String storagePolicy =
        conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
      CommonFSUtils.setStoragePolicy(fs, testDir, storagePolicy);

      String file = HBaseTestingUtil.getRandomUUID().toString();
      Path p = new Path(testDir, file);
      WriteDataToHDFS(fs, p, 4096);
      HFileSystem hfs = new HFileSystem(fs);
      String policySet = hfs.getStoragePolicyName(p);
      LOG.debug("The storage policy of path " + p + " is " + policySet);
      if (policy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)
        || policy.equals(INVALID_STORAGE_POLICY)) {
        String hdfsDefaultPolicy = hfs.getStoragePolicyName(hfs.getHomeDirectory());
        LOG.debug("The default hdfs storage policy (indicated by home path: "
          + hfs.getHomeDirectory() + ") is " + hdfsDefaultPolicy);
        Assert.assertEquals(hdfsDefaultPolicy, policySet);
      } else {
        Assert.assertEquals(policy, policySet);
      }
      // will assert existence before deleting.
      cleanupFile(fs, testDir);
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Ugly test that ensures we can get at the hedged read counters in dfsclient.
   * Does a bit of preading with hedged reads enabled using code taken from hdfs TestPread.
   */
  @Test public void testDFSHedgedReadMetrics() throws Exception {
    // Enable hedged reads and set it so the threshold is really low.
    // Most of this test is taken from HDFS, from TestPread.
    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 0);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
    conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
    // Set short retry timeouts so this test runs faster
    conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 0);
    conf.setBoolean("dfs.datanode.transferTo.allowed", false);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    // Get the metrics. Should be empty.
    DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
    assertEquals(0, metrics.getHedgedReadOps());
    FileSystem fileSys = cluster.getFileSystem();
    try {
      Path p = new Path("preadtest.dat");
      // We need > 1 blocks to test out the hedged reads.
      DFSTestUtil.createFile(fileSys, p, 12 * blockSize, 12 * blockSize,
        blockSize, (short) 3, seed);
      pReadFile(fileSys, p);
      cleanupFile(fileSys, p);
      assertTrue(metrics.getHedgedReadOps() > 0);
    } finally {
      fileSys.close();
      cluster.shutdown();
    }
  }

  @Test
  public void testCopyFilesParallel() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    cluster.waitActive();
    FileSystem fs = cluster.getFileSystem();
    Path src = new Path("/src");
    fs.mkdirs(src);
    for (int i = 0; i < 50; i++) {
      WriteDataToHDFS(fs, new Path(src, String.valueOf(i)), 1024);
    }
    Path sub = new Path(src, "sub");
    fs.mkdirs(sub);
    for (int i = 0; i < 50; i++) {
      WriteDataToHDFS(fs, new Path(sub, String.valueOf(i)), 1024);
    }
    Path dst = new Path("/dst");
    List<Path> allFiles = FSUtils.copyFilesParallel(fs, src, fs, dst, conf, 4);

    assertEquals(102, allFiles.size());
    FileStatus[] list = fs.listStatus(dst);
    assertEquals(51, list.length);
    FileStatus[] sublist = fs.listStatus(new Path(dst, "sub"));
    assertEquals(50, sublist.length);
  }

  // Below is taken from TestPread over in HDFS.
  static final int blockSize = 4096;
  static final long seed = 0xDEADBEEFL;
  private Random rand = new Random(); // This test depends on Random#setSeed
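
  /**
   * Reads the 12-block test file with a mix of sequential reads and positional reads (preads),
   * checking every byte against the expected pseudo-random content.
   */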
  private void pReadFile(FileSystem fileSys, Path name) throws IOException {
    FSDataInputStream stm = fileSys.open(name);
    byte[] expected = new byte[12 * blockSize];
    rand.setSeed(seed);
    rand.nextBytes(expected);
    // do a sanity check. Read first 4K bytes
    byte[] actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 0, expected, "Read Sanity Test");
    // now do a pread for the first 8K bytes
    actual = new byte[8192];
    doPread(stm, 0L, actual, 0, 8192);
    checkAndEraseData(actual, 0, expected, "Pread Test 1");
    // Now check to see if the normal read returns 4K-8K byte range
    actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 4096, expected, "Pread Test 2");
    // Now see if we can cross a single block boundary successfully
    // read 4K bytes from blockSize - 2K offset
    stm.readFully(blockSize - 2048, actual, 0, 4096);
    checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 3");
    // now see if we can cross two block boundaries successfully
    // read blockSize + 4K bytes from blockSize - 2K offset
    actual = new byte[blockSize + 4096];
    stm.readFully(blockSize - 2048, actual);
    checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 4");
    // now see if we can cross two block boundaries that are not cached
    // read blockSize + 4K bytes from 10*blockSize - 2K offset
    actual = new byte[blockSize + 4096];
    stm.readFully(10 * blockSize - 2048, actual);
    checkAndEraseData(actual, (10 * blockSize - 2048), expected, "Pread Test 5");
    // now check that even after all these preads, we can still read
    // bytes 8K-12K
    actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 8192, expected, "Pread Test 6");
    // done
    stm.close();
    // check block location caching
    stm = fileSys.open(name);
    stm.readFully(1, actual, 0, 4096);
    stm.readFully(4 * blockSize, actual, 0, 4096);
    stm.readFully(7 * blockSize, actual, 0, 4096);
    actual = new byte[3 * 4096];
    stm.readFully(0 * blockSize, actual, 0, 3 * 4096);
    checkAndEraseData(actual, 0, expected, "Pread Test 7");
    actual = new byte[8 * 4096];
    stm.readFully(3 * blockSize, actual, 0, 8 * 4096);
    checkAndEraseData(actual, 3 * blockSize, expected, "Pread Test 8");
    // read the tail
    stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize / 2);
    IOException res = null;
    try { // read beyond the end of the file
      stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize);
    } catch (IOException e) {
      // should throw an exception
      res = e;
    }
    assertTrue("Error reading beyond file boundary.", res != null);

    stm.close();
  }
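
  /**
   * Asserts that {@code actual} matches {@code expected} starting at offset {@code from}, then
   * zeroes the buffer so it can be reused by the next read.
   */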
  private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) {
    for (int idx = 0; idx < actual.length; idx++) {
      assertEquals(message + " byte " + (from + idx) + " differs. expected "
        + expected[from + idx] + " actual " + actual[idx],
        actual[idx], expected[from + idx]);
      actual[idx] = 0;
    }
  }
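
  /**
   * Positional read of {@code length} bytes at {@code position}, looping until the requested
   * length has been read.
   */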
  private void doPread(FSDataInputStream stm, long position, byte[] buffer,
    int offset, int length) throws IOException {
    int nread = 0;
    // long totalRead = 0;
    // DFSInputStream dfstm = null;

    /* Disabled. These counts do not add up. Some issue in the original hdfs tests?
    if (stm.getWrappedStream() instanceof DFSInputStream) {
      dfstm = (DFSInputStream) (stm.getWrappedStream());
      totalRead = dfstm.getReadStatistics().getTotalBytesRead();
    }
    */

    while (nread < length) {
      int nbytes =
        stm.read(position + nread, buffer, offset + nread, length - nread);
      assertTrue("Error in pread", nbytes > 0);
      nread += nbytes;
    }

    /* Disabled. These counts do not add up. Some issue in the original hdfs tests?
    assertTrue("Expected read statistic to be incremented",
      length <= dfstm.getReadStatistics().getTotalBytesRead() - totalRead);
    assertEquals("Expected read statistic to be incremented", length, dfstm
      .getReadStatistics().getTotalBytesRead() - totalRead);
    */
  }

  private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
    assertTrue(fileSys.exists(name));
    assertTrue(fileSys.delete(name, true));
    assertTrue(!fileSys.exists(name));
  }
  static {
    try {
      Class.forName("org.apache.hadoop.fs.StreamCapabilities");
      LOG.debug("Test thought StreamCapabilities class was present.");
    } catch (ClassNotFoundException exception) {
      LOG.debug("Test didn't think StreamCapabilities class was present.");
    }
  }

  // Here instead of TestCommonFSUtils because we need a minicluster
  @Test
  public void checkStreamCapabilitiesOnHdfsDataOutputStream() throws Exception {
    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
    try (FileSystem filesystem = cluster.getFileSystem()) {
      FSDataOutputStream stream = filesystem.create(new Path("/tmp/foobar"));
      assertTrue(stream.hasCapability(StreamCapabilities.HSYNC));
      assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH));
      assertFalse(stream.hasCapability("a capability that hopefully HDFS doesn't add."));
    } finally {
      cluster.shutdown();
    }
  }
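
  /**
   * Exercises {@link FSUtils#isSameHdfs(Configuration, FileSystem, FileSystem)} against plain and
   * HA-style namenode addresses, using {@code nnport} as the default namenode port.
   */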
  private void testIsSameHdfs(int nnport) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Path srcPath = new Path("hdfs://localhost:" + nnport + "/");
    Path desPath = new Path("hdfs://127.0.0.1/");
    FileSystem srcFs = srcPath.getFileSystem(conf);
    FileSystem desFs = desPath.getFileSystem(conf);

    assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs));

    desPath = new Path("hdfs://127.0.0.1:8070/");
    desFs = desPath.getFileSystem(conf);
    assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs));

    desPath = new Path("hdfs://127.0.1.1:" + nnport + "/");
    desFs = desPath.getFileSystem(conf);
    assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs));

    conf.set("fs.defaultFS", "hdfs://haosong-hadoop");
    conf.set("dfs.nameservices", "haosong-hadoop");
    conf.set("dfs.ha.namenodes.haosong-hadoop", "nn1,nn2");
    conf.set("dfs.client.failover.proxy.provider.haosong-hadoop",
      "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.0.0.1:" + nnport);
    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.10.2.1:8000");
    desPath = new Path("/");
    desFs = desPath.getFileSystem(conf);
    assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs));

    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.10.2.1:" + nnport);
    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.0.0.1:8000");
    desPath = new Path("/");
    desFs = desPath.getFileSystem(conf);
    assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs));
  }

  @Test
  public void testIsSameHdfs() throws IOException {
    String hadoopVersion = org.apache.hadoop.util.VersionInfo.getVersion();
    LOG.info("hadoop version is: " + hadoopVersion);
    boolean isHadoop3_0_0 = hadoopVersion.startsWith("3.0.0");
    if (isHadoop3_0_0) {
      // Hadoop 3.0.0 alpha1+ ~ 3.0.0 GA changed default nn port to 9820.
      testIsSameHdfs(9820);
    } else {
      // pre hadoop 3.0.0 defaults to port 8020
      // Hadoop 3.0.1 changed it back to port 8020. See HDFS-12990
      testIsSameHdfs(8020);
    }
  }
}