2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.util
;
20 import static org
.junit
.Assert
.assertEquals
;
21 import static org
.junit
.Assert
.assertFalse
;
22 import static org
.junit
.Assert
.assertNotEquals
;
23 import static org
.junit
.Assert
.assertNotNull
;
24 import static org
.junit
.Assert
.assertNull
;
25 import static org
.junit
.Assert
.assertTrue
;
28 import java
.io
.IOException
;
29 import java
.util
.List
;
30 import java
.util
.Random
;
31 import org
.apache
.hadoop
.conf
.Configuration
;
32 import org
.apache
.hadoop
.fs
.FSDataInputStream
;
33 import org
.apache
.hadoop
.fs
.FSDataOutputStream
;
34 import org
.apache
.hadoop
.fs
.FileStatus
;
35 import org
.apache
.hadoop
.fs
.FileSystem
;
36 import org
.apache
.hadoop
.fs
.Path
;
37 import org
.apache
.hadoop
.fs
.StreamCapabilities
;
38 import org
.apache
.hadoop
.fs
.permission
.FsPermission
;
39 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
40 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
41 import org
.apache
.hadoop
.hbase
.HConstants
;
42 import org
.apache
.hadoop
.hbase
.HDFSBlocksDistribution
;
43 import org
.apache
.hadoop
.hbase
.client
.RegionInfoBuilder
;
44 import org
.apache
.hadoop
.hbase
.exceptions
.DeserializationException
;
45 import org
.apache
.hadoop
.hbase
.fs
.HFileSystem
;
46 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
47 import org
.apache
.hadoop
.hbase
.testclassification
.MiscTests
;
48 import org
.apache
.hadoop
.hdfs
.DFSConfigKeys
;
49 import org
.apache
.hadoop
.hdfs
.DFSHedgedReadMetrics
;
50 import org
.apache
.hadoop
.hdfs
.DFSTestUtil
;
51 import org
.apache
.hadoop
.hdfs
.DistributedFileSystem
;
52 import org
.apache
.hadoop
.hdfs
.MiniDFSCluster
;
53 import org
.junit
.Assert
;
54 import org
.junit
.Before
;
55 import org
.junit
.ClassRule
;
56 import org
.junit
.Test
;
57 import org
.junit
.experimental
.categories
.Category
;
58 import org
.slf4j
.Logger
;
59 import org
.slf4j
.LoggerFactory
;
62 * Test {@link FSUtils}.
64 @Category({MiscTests
.class, MediumTests
.class})
65 public class TestFSUtils
{
68 public static final HBaseClassTestRule CLASS_RULE
=
69 HBaseClassTestRule
.forClass(TestFSUtils
.class);
71 private static final Logger LOG
= LoggerFactory
.getLogger(TestFSUtils
.class);
73 private HBaseTestingUtility htu
;
74 private FileSystem fs
;
75 private Configuration conf
;
78 public void setUp() throws IOException
{
79 htu
= new HBaseTestingUtility();
80 fs
= htu
.getTestFileSystem();
81 conf
= htu
.getConfiguration();
84 @Test public void testIsHDFS() throws Exception
{
85 assertFalse(FSUtils
.isHDFS(conf
));
86 MiniDFSCluster cluster
= null;
88 cluster
= htu
.startMiniDFSCluster(1);
89 assertTrue(FSUtils
.isHDFS(conf
));
91 if (cluster
!= null) cluster
.shutdown();
95 private void WriteDataToHDFS(FileSystem fs
, Path file
, int dataSize
)
97 FSDataOutputStream out
= fs
.create(file
);
98 byte [] data
= new byte[dataSize
];
99 out
.write(data
, 0, dataSize
);
103 @Test public void testcomputeHDFSBlocksDistribution() throws Exception
{
104 final int DEFAULT_BLOCK_SIZE
= 1024;
105 conf
.setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE
);
106 MiniDFSCluster cluster
= null;
107 Path testFile
= null;
110 // set up a cluster with 3 nodes
111 String hosts
[] = new String
[] { "host1", "host2", "host3" };
112 cluster
= htu
.startMiniDFSCluster(hosts
);
113 cluster
.waitActive();
114 FileSystem fs
= cluster
.getFileSystem();
116 // create a file with two blocks
117 testFile
= new Path("/test1.txt");
118 WriteDataToHDFS(fs
, testFile
, 2*DEFAULT_BLOCK_SIZE
);
120 // given the default replication factor is 3, the same as the number of
121 // datanodes; the locality index for each host should be 100%,
122 // or getWeight for each host should be the same as getUniqueBlocksWeights
123 final long maxTime
= System
.currentTimeMillis() + 2000;
127 FileStatus status
= fs
.getFileStatus(testFile
);
128 HDFSBlocksDistribution blocksDistribution
=
129 FSUtils
.computeHDFSBlocksDistribution(fs
, status
, 0, status
.getLen());
130 long uniqueBlocksTotalWeight
=
131 blocksDistribution
.getUniqueBlocksTotalWeight();
132 for (String host
: hosts
) {
133 long weight
= blocksDistribution
.getWeight(host
);
134 ok
= (ok
&& uniqueBlocksTotalWeight
== weight
);
136 } while (!ok
&& System
.currentTimeMillis() < maxTime
);
139 htu
.shutdownMiniDFSCluster();
144 // set up a cluster with 4 nodes
145 String hosts
[] = new String
[] { "host1", "host2", "host3", "host4" };
146 cluster
= htu
.startMiniDFSCluster(hosts
);
147 cluster
.waitActive();
148 FileSystem fs
= cluster
.getFileSystem();
150 // create a file with three blocks
151 testFile
= new Path("/test2.txt");
152 WriteDataToHDFS(fs
, testFile
, 3*DEFAULT_BLOCK_SIZE
);
154 // given the default replication factor is 3, we will have total of 9
155 // replica of blocks; thus the host with the highest weight should have
156 // weight == 3 * DEFAULT_BLOCK_SIZE
157 final long maxTime
= System
.currentTimeMillis() + 2000;
159 long uniqueBlocksTotalWeight
;
161 FileStatus status
= fs
.getFileStatus(testFile
);
162 HDFSBlocksDistribution blocksDistribution
=
163 FSUtils
.computeHDFSBlocksDistribution(fs
, status
, 0, status
.getLen());
164 uniqueBlocksTotalWeight
= blocksDistribution
.getUniqueBlocksTotalWeight();
166 String tophost
= blocksDistribution
.getTopHosts().get(0);
167 weight
= blocksDistribution
.getWeight(tophost
);
169 // NameNode is informed asynchronously, so we may have a delay. See HBASE-6175
170 } while (uniqueBlocksTotalWeight
!= weight
&& System
.currentTimeMillis() < maxTime
);
171 assertTrue(uniqueBlocksTotalWeight
== weight
);
174 htu
.shutdownMiniDFSCluster();
179 // set up a cluster with 4 nodes
180 String hosts
[] = new String
[] { "host1", "host2", "host3", "host4" };
181 cluster
= htu
.startMiniDFSCluster(hosts
);
182 cluster
.waitActive();
183 FileSystem fs
= cluster
.getFileSystem();
185 // create a file with one block
186 testFile
= new Path("/test3.txt");
187 WriteDataToHDFS(fs
, testFile
, DEFAULT_BLOCK_SIZE
);
189 // given the default replication factor is 3, we will have total of 3
190 // replica of blocks; thus there is one host without weight
191 final long maxTime
= System
.currentTimeMillis() + 2000;
192 HDFSBlocksDistribution blocksDistribution
;
194 FileStatus status
= fs
.getFileStatus(testFile
);
195 blocksDistribution
= FSUtils
.computeHDFSBlocksDistribution(fs
, status
, 0, status
.getLen());
196 // NameNode is informed asynchronously, so we may have a delay. See HBASE-6175
198 while (blocksDistribution
.getTopHosts().size() != 3 && System
.currentTimeMillis() < maxTime
);
199 assertEquals("Wrong number of hosts distributing blocks.", 3,
200 blocksDistribution
.getTopHosts().size());
202 htu
.shutdownMiniDFSCluster();
206 private void writeVersionFile(Path versionFile
, String version
) throws IOException
{
207 if (FSUtils
.isExists(fs
, versionFile
)) {
208 assertTrue(FSUtils
.delete(fs
, versionFile
, true));
210 try (FSDataOutputStream s
= fs
.create(versionFile
)) {
213 assertTrue(fs
.exists(versionFile
));
217 public void testVersion() throws DeserializationException
, IOException
{
218 final Path rootdir
= htu
.getDataTestDir();
219 final FileSystem fs
= rootdir
.getFileSystem(conf
);
220 assertNull(FSUtils
.getVersion(fs
, rootdir
));
221 // No meta dir so no complaint from checkVersion.
222 // Presumes it a new install. Will create version file.
223 FSUtils
.checkVersion(fs
, rootdir
, true);
224 // Now remove the version file and create a metadir so checkVersion fails.
225 Path versionFile
= new Path(rootdir
, HConstants
.VERSION_FILE_NAME
);
226 assertTrue(FSUtils
.isExists(fs
, versionFile
));
227 assertTrue(FSUtils
.delete(fs
, versionFile
, true));
229 FSUtils
.getRegionDirFromRootDir(rootdir
, RegionInfoBuilder
.FIRST_META_REGIONINFO
);
230 FsPermission defaultPerms
= FSUtils
.getFilePermissions(fs
, this.conf
,
231 HConstants
.DATA_FILE_UMASK_KEY
);
232 FSUtils
.create(fs
, metaRegionDir
, defaultPerms
, false);
233 boolean thrown
= false;
235 FSUtils
.checkVersion(fs
, rootdir
, true);
236 } catch (FileSystemVersionException e
) {
239 assertTrue("Expected FileSystemVersionException", thrown
);
240 // Write out a good version file. See if we can read it in and convert.
241 String version
= HConstants
.FILE_SYSTEM_VERSION
;
242 writeVersionFile(versionFile
, version
);
243 FileStatus
[] status
= fs
.listStatus(versionFile
);
244 assertNotNull(status
);
245 assertTrue(status
.length
> 0);
246 String newVersion
= FSUtils
.getVersion(fs
, rootdir
);
247 assertEquals(version
.length(), newVersion
.length());
248 assertEquals(version
, newVersion
);
249 // File will have been converted. Exercise the pb format
250 assertEquals(version
, FSUtils
.getVersion(fs
, rootdir
));
251 FSUtils
.checkVersion(fs
, rootdir
, true);
252 // Write an old version file.
253 String oldVersion
= "1";
254 writeVersionFile(versionFile
, oldVersion
);
255 newVersion
= FSUtils
.getVersion(fs
, rootdir
);
256 assertNotEquals(version
, newVersion
);
259 FSUtils
.checkVersion(fs
, rootdir
, true);
260 } catch (FileSystemVersionException e
) {
263 assertTrue("Expected FileSystemVersionException", thrown
);
267 public void testPermMask() throws Exception
{
268 final Path rootdir
= htu
.getDataTestDir();
269 final FileSystem fs
= rootdir
.getFileSystem(conf
);
270 // default fs permission
271 FsPermission defaultFsPerm
= FSUtils
.getFilePermissions(fs
, conf
,
272 HConstants
.DATA_FILE_UMASK_KEY
);
273 // 'hbase.data.umask.enable' is false. We will get default fs permission.
274 assertEquals(FsPermission
.getFileDefault(), defaultFsPerm
);
276 conf
.setBoolean(HConstants
.ENABLE_DATA_FILE_UMASK
, true);
277 // first check that we don't crash if we don't have perms set
278 FsPermission defaultStartPerm
= FSUtils
.getFilePermissions(fs
, conf
,
279 HConstants
.DATA_FILE_UMASK_KEY
);
280 // default 'hbase.data.umask'is 000, and this umask will be used when
281 // 'hbase.data.umask.enable' is true.
282 // Therefore we will not get the real fs default in this case.
283 // Instead we will get the starting point FULL_RWX_PERMISSIONS
284 assertEquals(new FsPermission(FSUtils
.FULL_RWX_PERMISSIONS
), defaultStartPerm
);
286 conf
.setStrings(HConstants
.DATA_FILE_UMASK_KEY
, "077");
287 // now check that we get the right perms
288 FsPermission filePerm
= FSUtils
.getFilePermissions(fs
, conf
,
289 HConstants
.DATA_FILE_UMASK_KEY
);
290 assertEquals(new FsPermission("700"), filePerm
);
292 // then that the correct file is created
293 Path p
= new Path("target" + File
.separator
+ htu
.getRandomUUID().toString());
295 FSDataOutputStream out
= FSUtils
.create(conf
, fs
, p
, filePerm
, null);
297 FileStatus stat
= fs
.getFileStatus(p
);
298 assertEquals(new FsPermission("700"), stat
.getPermission());
306 public void testDeleteAndExists() throws Exception
{
307 final Path rootdir
= htu
.getDataTestDir();
308 final FileSystem fs
= rootdir
.getFileSystem(conf
);
309 conf
.setBoolean(HConstants
.ENABLE_DATA_FILE_UMASK
, true);
310 FsPermission perms
= FSUtils
.getFilePermissions(fs
, conf
, HConstants
.DATA_FILE_UMASK_KEY
);
311 // then that the correct file is created
312 String file
= htu
.getRandomUUID().toString();
313 Path p
= new Path(htu
.getDataTestDir(), "temptarget" + File
.separator
+ file
);
314 Path p1
= new Path(htu
.getDataTestDir(), "temppath" + File
.separator
+ file
);
316 FSDataOutputStream out
= FSUtils
.create(conf
, fs
, p
, perms
, null);
318 assertTrue("The created file should be present", FSUtils
.isExists(fs
, p
));
319 // delete the file with recursion as false. Only the file will be deleted.
320 FSUtils
.delete(fs
, p
, false);
321 // Create another file
322 FSDataOutputStream out1
= FSUtils
.create(conf
, fs
, p1
, perms
, null);
324 // delete the file with recursion as false. Still the file only will be deleted
325 FSUtils
.delete(fs
, p1
, true);
326 assertFalse("The created file should be present", FSUtils
.isExists(fs
, p1
));
329 FSUtils
.delete(fs
, p
, true);
330 FSUtils
.delete(fs
, p1
, true);
335 public void testFilteredStatusDoesNotThrowOnNotFound() throws Exception
{
336 MiniDFSCluster cluster
= htu
.startMiniDFSCluster(1);
338 assertNull(FSUtils
.listStatusWithStatusFilter(cluster
.getFileSystem(), new Path("definitely/doesn't/exist"), null));
346 public void testRenameAndSetModifyTime() throws Exception
{
347 MiniDFSCluster cluster
= htu
.startMiniDFSCluster(1);
348 assertTrue(FSUtils
.isHDFS(conf
));
350 FileSystem fs
= FileSystem
.get(conf
);
351 Path testDir
= htu
.getDataTestDirOnTestFS("testArchiveFile");
353 String file
= htu
.getRandomUUID().toString();
354 Path p
= new Path(testDir
, file
);
356 FSDataOutputStream out
= fs
.create(p
);
358 assertTrue("The created file should be present", FSUtils
.isExists(fs
, p
));
360 long expect
= System
.currentTimeMillis() + 1000;
361 assertNotEquals(expect
, fs
.getFileStatus(p
).getModificationTime());
363 ManualEnvironmentEdge mockEnv
= new ManualEnvironmentEdge();
364 mockEnv
.setValue(expect
);
365 EnvironmentEdgeManager
.injectEdge(mockEnv
);
367 String dstFile
= htu
.getRandomUUID().toString();
368 Path dst
= new Path(testDir
, dstFile
);
370 assertTrue(FSUtils
.renameAndSetModifyTime(fs
, p
, dst
));
371 assertFalse("The moved file should not be present", FSUtils
.isExists(fs
, p
));
372 assertTrue("The dst file should be present", FSUtils
.isExists(fs
, dst
));
374 assertEquals(expect
, fs
.getFileStatus(dst
).getModificationTime());
377 EnvironmentEdgeManager
.reset();
382 public void testSetStoragePolicyDefault() throws Exception
{
383 verifyNoHDFSApiInvocationForDefaultPolicy();
384 verifyFileInDirWithStoragePolicy(HConstants
.DEFAULT_WAL_STORAGE_POLICY
);
388 * Note: currently the default policy is set to defer to HDFS and this case is to verify the
389 * logic, will need to remove the check if the default policy is changed
391 private void verifyNoHDFSApiInvocationForDefaultPolicy() {
392 FileSystem testFs
= new AlwaysFailSetStoragePolicyFileSystem();
393 // There should be no exception thrown when setting to default storage policy, which indicates
394 // the HDFS API hasn't been called
396 FSUtils
.setStoragePolicy(testFs
, new Path("non-exist"), HConstants
.DEFAULT_WAL_STORAGE_POLICY
,
398 } catch (IOException e
) {
399 Assert
.fail("Should have bypassed the FS API when setting default storage policy");
401 // There should be exception thrown when given non-default storage policy, which indicates the
402 // HDFS API has been called
404 FSUtils
.setStoragePolicy(testFs
, new Path("non-exist"), "HOT", true);
405 Assert
.fail("Should have invoked the FS API but haven't");
406 } catch (IOException e
) {
407 // expected given an invalid path
411 class AlwaysFailSetStoragePolicyFileSystem
extends DistributedFileSystem
{
413 public void setStoragePolicy(final Path src
, final String policyName
)
415 throw new IOException("The setStoragePolicy method is invoked");
419 /* might log a warning, but still work. (always warning on Hadoop < 2.6.0) */
421 public void testSetStoragePolicyValidButMaybeNotPresent() throws Exception
{
422 verifyFileInDirWithStoragePolicy("ALL_SSD");
425 final String INVALID_STORAGE_POLICY
= "1772";
427 /* should log a warning, but still work. (different warning on Hadoop < 2.6.0) */
429 public void testSetStoragePolicyInvalid() throws Exception
{
430 verifyFileInDirWithStoragePolicy(INVALID_STORAGE_POLICY
);
433 // Here instead of TestCommonFSUtils because we need a minicluster
434 private void verifyFileInDirWithStoragePolicy(final String policy
) throws Exception
{
435 conf
.set(HConstants
.WAL_STORAGE_POLICY
, policy
);
437 MiniDFSCluster cluster
= htu
.startMiniDFSCluster(1);
439 assertTrue(FSUtils
.isHDFS(conf
));
441 FileSystem fs
= FileSystem
.get(conf
);
442 Path testDir
= htu
.getDataTestDirOnTestFS("testArchiveFile");
445 String storagePolicy
=
446 conf
.get(HConstants
.WAL_STORAGE_POLICY
, HConstants
.DEFAULT_WAL_STORAGE_POLICY
);
447 FSUtils
.setStoragePolicy(fs
, testDir
, storagePolicy
);
449 String file
=htu
.getRandomUUID().toString();
450 Path p
= new Path(testDir
, file
);
451 WriteDataToHDFS(fs
, p
, 4096);
452 HFileSystem hfs
= new HFileSystem(fs
);
453 String policySet
= hfs
.getStoragePolicyName(p
);
454 LOG
.debug("The storage policy of path " + p
+ " is " + policySet
);
455 if (policy
.equals(HConstants
.DEFER_TO_HDFS_STORAGE_POLICY
)
456 || policy
.equals(INVALID_STORAGE_POLICY
)) {
457 String hdfsDefaultPolicy
= hfs
.getStoragePolicyName(hfs
.getHomeDirectory());
458 LOG
.debug("The default hdfs storage policy (indicated by home path: "
459 + hfs
.getHomeDirectory() + ") is " + hdfsDefaultPolicy
);
460 Assert
.assertEquals(hdfsDefaultPolicy
, policySet
);
462 Assert
.assertEquals(policy
, policySet
);
464 // will assert existance before deleting.
465 cleanupFile(fs
, testDir
);
472 * Ugly test that ensures we can get at the hedged read counters in dfsclient.
473 * Does a bit of preading with hedged reads enabled using code taken from hdfs TestPread.
476 @Test public void testDFSHedgedReadMetrics() throws Exception
{
477 // Enable hedged reads and set it so the threshold is really low.
478 // Most of this test is taken from HDFS, from TestPread.
479 conf
.setInt(DFSConfigKeys
.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE
, 5);
480 conf
.setLong(DFSConfigKeys
.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS
, 0);
481 conf
.setLong(DFSConfigKeys
.DFS_BLOCK_SIZE_KEY
, 4096);
482 conf
.setLong(DFSConfigKeys
.DFS_CLIENT_READ_PREFETCH_SIZE_KEY
, 4096);
483 // Set short retry timeouts so this test runs faster
484 conf
.setInt(DFSConfigKeys
.DFS_CLIENT_RETRY_WINDOW_BASE
, 0);
485 conf
.setBoolean("dfs.datanode.transferTo.allowed", false);
486 MiniDFSCluster cluster
= new MiniDFSCluster
.Builder(conf
).numDataNodes(3).build();
487 // Get the metrics. Should be empty.
488 DFSHedgedReadMetrics metrics
= FSUtils
.getDFSHedgedReadMetrics(conf
);
489 assertEquals(0, metrics
.getHedgedReadOps());
490 FileSystem fileSys
= cluster
.getFileSystem();
492 Path p
= new Path("preadtest.dat");
493 // We need > 1 blocks to test out the hedged reads.
494 DFSTestUtil
.createFile(fileSys
, p
, 12 * blockSize
, 12 * blockSize
,
495 blockSize
, (short) 3, seed
);
496 pReadFile(fileSys
, p
);
497 cleanupFile(fileSys
, p
);
498 assertTrue(metrics
.getHedgedReadOps() > 0);
507 public void testCopyFilesParallel() throws Exception
{
508 MiniDFSCluster cluster
= htu
.startMiniDFSCluster(1);
509 cluster
.waitActive();
510 FileSystem fs
= cluster
.getFileSystem();
511 Path src
= new Path("/src");
513 for (int i
= 0; i
< 50; i
++) {
514 WriteDataToHDFS(fs
, new Path(src
, String
.valueOf(i
)), 1024);
516 Path sub
= new Path(src
, "sub");
518 for (int i
= 0; i
< 50; i
++) {
519 WriteDataToHDFS(fs
, new Path(sub
, String
.valueOf(i
)), 1024);
521 Path dst
= new Path("/dst");
522 List
<Path
> allFiles
= FSUtils
.copyFilesParallel(fs
, src
, fs
, dst
, conf
, 4);
524 assertEquals(102, allFiles
.size());
525 FileStatus
[] list
= fs
.listStatus(dst
);
526 assertEquals(51, list
.length
);
527 FileStatus
[] sublist
= fs
.listStatus(new Path(dst
, "sub"));
528 assertEquals(50, sublist
.length
);
531 // Below is taken from TestPread over in HDFS.
532 static final int blockSize
= 4096;
533 static final long seed
= 0xDEADBEEFL
;
535 private void pReadFile(FileSystem fileSys
, Path name
) throws IOException
{
536 FSDataInputStream stm
= fileSys
.open(name
);
537 byte[] expected
= new byte[12 * blockSize
];
538 Random rand
= new Random(seed
);
539 rand
.nextBytes(expected
);
540 // do a sanity check. Read first 4K bytes
541 byte[] actual
= new byte[4096];
542 stm
.readFully(actual
);
543 checkAndEraseData(actual
, 0, expected
, "Read Sanity Test");
544 // now do a pread for the first 8K bytes
545 actual
= new byte[8192];
546 doPread(stm
, 0L, actual
, 0, 8192);
547 checkAndEraseData(actual
, 0, expected
, "Pread Test 1");
548 // Now check to see if the normal read returns 4K-8K byte range
549 actual
= new byte[4096];
550 stm
.readFully(actual
);
551 checkAndEraseData(actual
, 4096, expected
, "Pread Test 2");
552 // Now see if we can cross a single block boundary successfully
553 // read 4K bytes from blockSize - 2K offset
554 stm
.readFully(blockSize
- 2048, actual
, 0, 4096);
555 checkAndEraseData(actual
, (blockSize
- 2048), expected
, "Pread Test 3");
556 // now see if we can cross two block boundaries successfully
557 // read blockSize + 4K bytes from blockSize - 2K offset
558 actual
= new byte[blockSize
+ 4096];
559 stm
.readFully(blockSize
- 2048, actual
);
560 checkAndEraseData(actual
, (blockSize
- 2048), expected
, "Pread Test 4");
561 // now see if we can cross two block boundaries that are not cached
562 // read blockSize + 4K bytes from 10*blockSize - 2K offset
563 actual
= new byte[blockSize
+ 4096];
564 stm
.readFully(10 * blockSize
- 2048, actual
);
565 checkAndEraseData(actual
, (10 * blockSize
- 2048), expected
, "Pread Test 5");
566 // now check that even after all these preads, we can still read
568 actual
= new byte[4096];
569 stm
.readFully(actual
);
570 checkAndEraseData(actual
, 8192, expected
, "Pread Test 6");
573 // check block location caching
574 stm
= fileSys
.open(name
);
575 stm
.readFully(1, actual
, 0, 4096);
576 stm
.readFully(4*blockSize
, actual
, 0, 4096);
577 stm
.readFully(7*blockSize
, actual
, 0, 4096);
578 actual
= new byte[3*4096];
579 stm
.readFully(0*blockSize
, actual
, 0, 3*4096);
580 checkAndEraseData(actual
, 0, expected
, "Pread Test 7");
581 actual
= new byte[8*4096];
582 stm
.readFully(3*blockSize
, actual
, 0, 8*4096);
583 checkAndEraseData(actual
, 3*blockSize
, expected
, "Pread Test 8");
585 stm
.readFully(11*blockSize
+blockSize
/2, actual
, 0, blockSize
/2);
586 IOException res
= null;
587 try { // read beyond the end of the file
588 stm
.readFully(11*blockSize
+blockSize
/2, actual
, 0, blockSize
);
589 } catch (IOException e
) {
590 // should throw an exception
593 assertTrue("Error reading beyond file boundary.", res
!= null);
598 private void checkAndEraseData(byte[] actual
, int from
, byte[] expected
, String message
) {
599 for (int idx
= 0; idx
< actual
.length
; idx
++) {
600 assertEquals(message
+" byte "+(from
+idx
)+" differs. expected "+
601 expected
[from
+idx
]+" actual "+actual
[idx
],
602 actual
[idx
], expected
[from
+idx
]);
607 private void doPread(FSDataInputStream stm
, long position
, byte[] buffer
,
608 int offset
, int length
) throws IOException
{
610 // long totalRead = 0;
611 // DFSInputStream dfstm = null;
613 /* Disable. This counts do not add up. Some issue in original hdfs tests?
614 if (stm.getWrappedStream() instanceof DFSInputStream) {
615 dfstm = (DFSInputStream) (stm.getWrappedStream());
616 totalRead = dfstm.getReadStatistics().getTotalBytesRead();
619 while (nread
< length
) {
621 stm
.read(position
+ nread
, buffer
, offset
+ nread
, length
- nread
);
622 assertTrue("Error in pread", nbytes
> 0);
626 /* Disable. This counts do not add up. Some issue in original hdfs tests?
629 assertTrue("Expected read statistic to be incremented",
630 length <= dfstm.getReadStatistics().getTotalBytesRead() - totalRead);
632 assertEquals("Expected read statistic to be incremented", length, dfstm
633 .getReadStatistics().getTotalBytesRead() - totalRead);
638 private void cleanupFile(FileSystem fileSys
, Path name
) throws IOException
{
639 assertTrue(fileSys
.exists(name
));
640 assertTrue(fileSys
.delete(name
, true));
641 assertTrue(!fileSys
.exists(name
));
645 private static final boolean STREAM_CAPABILITIES_IS_PRESENT
;
649 Class
.forName("org.apache.hadoop.fs.StreamCapabilities");
651 LOG
.debug("Test thought StreamCapabilities class was present.");
652 } catch (ClassNotFoundException exception
) {
653 LOG
.debug("Test didn't think StreamCapabilities class was present.");
655 STREAM_CAPABILITIES_IS_PRESENT
= tmp
;
659 // Here instead of TestCommonFSUtils because we need a minicluster
661 public void checkStreamCapabilitiesOnHdfsDataOutputStream() throws Exception
{
662 MiniDFSCluster cluster
= htu
.startMiniDFSCluster(1);
663 try (FileSystem filesystem
= cluster
.getFileSystem()) {
664 FSDataOutputStream stream
= filesystem
.create(new Path("/tmp/foobar"));
665 assertTrue(stream
.hasCapability(StreamCapabilities
.HSYNC
));
666 assertTrue(stream
.hasCapability(StreamCapabilities
.HFLUSH
));
667 assertFalse(stream
.hasCapability("a capability that hopefully HDFS doesn't add."));