/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 package org
.apache
.hadoop
.hbase
.regionserver
;
18 import static org
.junit
.Assert
.assertFalse
;
19 import static org
.junit
.Assert
.assertNotSame
;
20 import static org
.junit
.Assert
.assertSame
;
21 import static org
.junit
.Assert
.assertTrue
;
22 import static org
.mockito
.Mockito
.mock
;
23 import static org
.mockito
.Mockito
.when
;
25 import java
.io
.IOException
;
26 import java
.util
.ArrayList
;
27 import java
.util
.List
;
28 import org
.apache
.hadoop
.conf
.Configuration
;
29 import org
.apache
.hadoop
.fs
.FSDataInputStream
;
30 import org
.apache
.hadoop
.fs
.FSDataOutputStream
;
31 import org
.apache
.hadoop
.fs
.FileSystem
;
32 import org
.apache
.hadoop
.fs
.Path
;
33 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
34 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
35 import org
.apache
.hadoop
.hbase
.HDFSBlocksDistribution
;
36 import org
.apache
.hadoop
.hbase
.io
.FileLink
;
37 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
38 import org
.apache
.hadoop
.hbase
.testclassification
.RegionServerTests
;
39 import org
.apache
.hadoop
.hdfs
.MiniDFSCluster
;
40 import org
.apache
.hadoop
.hdfs
.client
.HdfsDataInputStream
;
41 import org
.junit
.After
;
42 import org
.junit
.Before
;
43 import org
.junit
.ClassRule
;
44 import org
.junit
.Test
;
45 import org
.junit
.experimental
.categories
.Category
;
47 @Category({ RegionServerTests
.class, MediumTests
.class})
48 public class TestInputStreamBlockDistribution
{
51 public static final HBaseClassTestRule CLASS_RULE
=
52 HBaseClassTestRule
.forClass(TestInputStreamBlockDistribution
.class);
54 private Configuration conf
;
55 private FileSystem fs
;
56 private Path testPath
;
59 public void setUp() throws Exception
{
60 HBaseTestingUtil testUtil
= new HBaseTestingUtil();
61 conf
= testUtil
.getConfiguration();
62 conf
.setInt("dfs.blocksize", 1024 * 1024);
63 conf
.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);
65 testUtil
.startMiniDFSCluster(1);
66 MiniDFSCluster cluster
= testUtil
.getDFSCluster();
67 fs
= cluster
.getFileSystem();
69 testPath
= new Path(testUtil
.getDefaultRootDirPath(), "test.file");
71 writeSomeData(fs
, testPath
, 256 << 20, (byte)2);
75 public void tearDown() throws Exception
{
76 fs
.delete(testPath
, false);
81 public void itDerivesLocalityFromHFileInputStream() throws Exception
{
82 try (FSDataInputStream stream
= fs
.open(testPath
)) {
83 HDFSBlocksDistribution initial
= new HDFSBlocksDistribution();
84 InputStreamBlockDistribution test
=
85 new InputStreamBlockDistribution(stream
, getMockedStoreFileInfo(initial
, false));
87 assertSame(initial
, test
.getHDFSBlockDistribution());
89 test
.setLastCachedAt(test
.getCachePeriodMs() + 1);
91 assertNotSame(initial
, test
.getHDFSBlockDistribution());
97 public void itDerivesLocalityFromFileLinkInputStream() throws Exception
{
98 List
<Path
> files
= new ArrayList
<Path
>();
101 FileLink link
= new FileLink(files
);
102 try (FSDataInputStream stream
= link
.open(fs
)) {
104 HDFSBlocksDistribution initial
= new HDFSBlocksDistribution();
106 InputStreamBlockDistribution test
= new InputStreamBlockDistribution(stream
,
107 getMockedStoreFileInfo(initial
, true));
109 assertSame(initial
, test
.getHDFSBlockDistribution());
111 test
.setLastCachedAt(test
.getCachePeriodMs() + 1);
113 assertNotSame(initial
, test
.getHDFSBlockDistribution());
118 public void itFallsBackOnLastKnownValueWhenUnsupported() {
119 FSDataInputStream fakeStream
= mock(FSDataInputStream
.class);
120 HDFSBlocksDistribution initial
= new HDFSBlocksDistribution();
122 InputStreamBlockDistribution test
= new InputStreamBlockDistribution(fakeStream
,
123 getMockedStoreFileInfo(initial
, false));
125 assertSame(initial
, test
.getHDFSBlockDistribution());
126 test
.setLastCachedAt(test
.getCachePeriodMs() + 1);
128 // fakeStream is not an HdfsDataInputStream or FileLink, so will fail to resolve
129 assertSame(initial
, test
.getHDFSBlockDistribution());
130 assertTrue(test
.isStreamUnsupported());
134 public void itFallsBackOnLastKnownValueOnException() throws IOException
{
135 HdfsDataInputStream fakeStream
= mock(HdfsDataInputStream
.class);
136 when(fakeStream
.getAllBlocks()).thenThrow(new IOException("test"));
138 HDFSBlocksDistribution initial
= new HDFSBlocksDistribution();
140 InputStreamBlockDistribution test
= new InputStreamBlockDistribution(fakeStream
,
141 getMockedStoreFileInfo(initial
, false));
143 assertSame(initial
, test
.getHDFSBlockDistribution());
144 test
.setLastCachedAt(test
.getCachePeriodMs() + 1);
146 // fakeStream throws an exception, so falls back on original
147 assertSame(initial
, test
.getHDFSBlockDistribution());
149 assertFalse(test
.isStreamUnsupported());
153 * Write up to 'size' bytes with value 'v' into a new file called 'path'.
155 private void writeSomeData(FileSystem fs
, Path path
, long size
, byte v
) throws IOException
{
156 byte[] data
= new byte[4096];
157 for (int i
= 0; i
< data
.length
; i
++) {
161 FSDataOutputStream stream
= fs
.create(path
);
164 while (written
< size
) {
165 stream
.write(data
, 0, data
.length
);
166 written
+= data
.length
;
173 private StoreFileInfo
getMockedStoreFileInfo(HDFSBlocksDistribution distribution
,
174 boolean isFileLink
) {
175 StoreFileInfo mock
= mock(StoreFileInfo
.class);
176 when(mock
.getHDFSBlockDistribution())
177 .thenReturn(distribution
);
178 when(mock
.getConf()).thenReturn(conf
);
179 when(mock
.isLink()).thenReturn(isFileLink
);