/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 package org
.apache
.hadoop
.hbase
.regionserver
;
18 import com
.google
.errorprone
.annotations
.RestrictedApi
;
19 import java
.io
.IOException
;
20 import org
.apache
.hadoop
.conf
.Configuration
;
21 import org
.apache
.hadoop
.fs
.FSDataInputStream
;
22 import org
.apache
.hadoop
.hbase
.HDFSBlocksDistribution
;
23 import org
.apache
.hadoop
.hbase
.io
.FileLink
;
24 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
25 import org
.apache
.hadoop
.hbase
.util
.FSUtils
;
26 import org
.apache
.hadoop
.hdfs
.client
.HdfsDataInputStream
;
27 import org
.apache
.yetus
.audience
.InterfaceAudience
;
28 import org
.slf4j
.Logger
;
29 import org
.slf4j
.LoggerFactory
;
32 * Computes the HDFSBlockDistribution for a file based on the underlying located blocks
33 * for an HdfsDataInputStream reading that file. The backing DFSInputStream.getAllBlocks involves
34 * allocating an array of numBlocks size per call. It may also involve calling the namenode, if
35 * the DFSInputStream has not fetched all the blocks yet. In order to avoid allocation pressure,
36 * we cache the computed distribution for a configurable period of time.
38 * This class only gets instantiated for the <b>first</b> FSDataInputStream of each StoreFile (i.e.
39 * the one backing {@link HStoreFile#initialReader}). It's then used to dynamically update the
40 * value returned by {@link HStoreFile#getHDFSBlockDistribution()}.
42 * Once the backing FSDataInputStream is closed, we should not expect the distribution result
43 * to change anymore. This is ok becuase the initialReader's InputStream is only closed when the
44 * StoreFile itself is closed, at which point nothing will be querying getHDFSBlockDistribution
45 * anymore. If/When the StoreFile is reopened, a new {@link InputStreamBlockDistribution} will
46 * be created for the new initialReader.
48 @InterfaceAudience.Private
49 public class InputStreamBlockDistribution
{
50 private static final Logger LOG
= LoggerFactory
.getLogger(InputStreamBlockDistribution
.class);
52 private static final String HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED
=
53 "hbase.locality.inputstream.derive.enabled";
54 private static final boolean DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED
= false;
56 private static final String HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD
=
57 "hbase.locality.inputstream.derive.cache.period";
58 private static final int DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD
= 60_000
;
60 private final FSDataInputStream stream
;
61 private final StoreFileInfo fileInfo
;
62 private final int cachePeriodMs
;
64 private HDFSBlocksDistribution hdfsBlocksDistribution
;
65 private long lastCachedAt
;
66 private boolean streamUnsupported
;
69 * This should only be called for the first FSDataInputStream of a StoreFile,
70 * in {@link HStoreFile#open()}.
72 * @see InputStreamBlockDistribution
73 * @param stream the input stream to derive locality from
74 * @param fileInfo the StoreFileInfo for the related store file
76 public InputStreamBlockDistribution(FSDataInputStream stream
, StoreFileInfo fileInfo
) {
78 this.fileInfo
= fileInfo
;
79 this.cachePeriodMs
= fileInfo
.getConf().getInt(
80 HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD
,
81 DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD
);
82 this.lastCachedAt
= EnvironmentEdgeManager
.currentTime();
83 this.streamUnsupported
= false;
84 this.hdfsBlocksDistribution
= fileInfo
.getHDFSBlockDistribution();
88 * True if we should derive StoreFile HDFSBlockDistribution from the underlying input stream
90 public static boolean isEnabled(Configuration conf
) {
91 return conf
.getBoolean(HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED
,
92 DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED
);
96 * Get the HDFSBlocksDistribution derived from the StoreFile input stream, re-computing if cache
99 public synchronized HDFSBlocksDistribution
getHDFSBlockDistribution() {
100 if (EnvironmentEdgeManager
.currentTime() - lastCachedAt
> cachePeriodMs
) {
102 LOG
.debug("Refreshing HDFSBlockDistribution for {}", fileInfo
);
103 computeBlockDistribution();
104 } catch (IOException e
) {
105 LOG
.warn("Failed to recompute block distribution for {}. Falling back on cached value.",
109 return hdfsBlocksDistribution
;
112 private void computeBlockDistribution() throws IOException
{
113 lastCachedAt
= EnvironmentEdgeManager
.currentTime();
115 FSDataInputStream stream
;
116 if (fileInfo
.isLink()) {
117 stream
= FileLink
.getUnderlyingFileLinkInputStream(this.stream
);
119 stream
= this.stream
;
122 if (!(stream
instanceof HdfsDataInputStream
)) {
123 if (!streamUnsupported
) {
124 LOG
.warn("{} for storeFileInfo={}, isLink={}, is not an HdfsDataInputStream so cannot be "
125 + "used to derive locality. Falling back on cached value.",
126 stream
, fileInfo
, fileInfo
.isLink());
127 streamUnsupported
= true;
132 streamUnsupported
= false;
133 hdfsBlocksDistribution
= FSUtils
.computeHDFSBlocksDistribution((HdfsDataInputStream
) stream
);
137 * For tests only, sets lastCachedAt so we can force a refresh
139 @RestrictedApi(explanation
= "Should only be called in tests", link
= "",
140 allowedOnPath
= ".*/src/test/.*")
141 synchronized void setLastCachedAt(long timestamp
) {
142 lastCachedAt
= timestamp
;
146 * For tests only, returns the configured cache period
148 @RestrictedApi(explanation
= "Should only be called in tests", link
= "",
149 allowedOnPath
= ".*/src/test/.*")
150 long getCachePeriodMs() {
151 return cachePeriodMs
;
155 * For tests only, returns whether the passed stream is supported
157 @RestrictedApi(explanation
= "Should only be called in tests", link
= "",
158 allowedOnPath
= ".*/src/test/.*")
159 boolean isStreamUnsupported() {
160 return streamUnsupported
;