HBASE-26412 Handle sink failure in RegionReplicationSink (#3815)
[hbase.git] / hbase-server / src / main / java / org / apache / hadoop / hbase / regionserver / InputStreamBlockDistribution.java
blobaa15cda922d772d7176b287b4e47dd7cc101cf30
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.apache.hadoop.hbase.regionserver;
18 import com.google.errorprone.annotations.RestrictedApi;
19 import java.io.IOException;
20 import org.apache.hadoop.conf.Configuration;
21 import org.apache.hadoop.fs.FSDataInputStream;
22 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
23 import org.apache.hadoop.hbase.io.FileLink;
24 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
25 import org.apache.hadoop.hbase.util.FSUtils;
26 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
27 import org.apache.yetus.audience.InterfaceAudience;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
31 /**
32 * Computes the HDFSBlockDistribution for a file based on the underlying located blocks
33 * for an HdfsDataInputStream reading that file. The backing DFSInputStream.getAllBlocks involves
34 * allocating an array of numBlocks size per call. It may also involve calling the namenode, if
35 * the DFSInputStream has not fetched all the blocks yet. In order to avoid allocation pressure,
36 * we cache the computed distribution for a configurable period of time.
37 * <p>
38 * This class only gets instantiated for the <b>first</b> FSDataInputStream of each StoreFile (i.e.
39 * the one backing {@link HStoreFile#initialReader}). It's then used to dynamically update the
40 * value returned by {@link HStoreFile#getHDFSBlockDistribution()}.
41 * <p>
42 * Once the backing FSDataInputStream is closed, we should not expect the distribution result
43 * to change anymore. This is ok because the initialReader's InputStream is only closed when the
44 * StoreFile itself is closed, at which point nothing will be querying getHDFSBlockDistribution
45 * anymore. If/When the StoreFile is reopened, a new {@link InputStreamBlockDistribution} will
46 * be created for the new initialReader.
48 @InterfaceAudience.Private
49 public class InputStreamBlockDistribution {
50 private static final Logger LOG = LoggerFactory.getLogger(InputStreamBlockDistribution.class);
52 private static final String HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED =
53 "hbase.locality.inputstream.derive.enabled";
54 private static final boolean DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED = false;
56 private static final String HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD =
57 "hbase.locality.inputstream.derive.cache.period";
58 private static final int DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD = 60_000;
60 private final FSDataInputStream stream;
61 private final StoreFileInfo fileInfo;
62 private final int cachePeriodMs;
64 private HDFSBlocksDistribution hdfsBlocksDistribution;
65 private long lastCachedAt;
66 private boolean streamUnsupported;
68 /**
69 * This should only be called for the first FSDataInputStream of a StoreFile,
70 * in {@link HStoreFile#open()}.
72 * @see InputStreamBlockDistribution
73 * @param stream the input stream to derive locality from
74 * @param fileInfo the StoreFileInfo for the related store file
76 public InputStreamBlockDistribution(FSDataInputStream stream, StoreFileInfo fileInfo) {
77 this.stream = stream;
78 this.fileInfo = fileInfo;
79 this.cachePeriodMs = fileInfo.getConf().getInt(
80 HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD,
81 DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD);
82 this.lastCachedAt = EnvironmentEdgeManager.currentTime();
83 this.streamUnsupported = false;
84 this.hdfsBlocksDistribution = fileInfo.getHDFSBlockDistribution();
87 /**
88 * True if we should derive StoreFile HDFSBlockDistribution from the underlying input stream
90 public static boolean isEnabled(Configuration conf) {
91 return conf.getBoolean(HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED,
92 DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED);
95 /**
96 * Get the HDFSBlocksDistribution derived from the StoreFile input stream, re-computing if cache
97 * is expired.
99 public synchronized HDFSBlocksDistribution getHDFSBlockDistribution() {
100 if (EnvironmentEdgeManager.currentTime() - lastCachedAt > cachePeriodMs) {
101 try {
102 LOG.debug("Refreshing HDFSBlockDistribution for {}", fileInfo);
103 computeBlockDistribution();
104 } catch (IOException e) {
105 LOG.warn("Failed to recompute block distribution for {}. Falling back on cached value.",
106 fileInfo, e);
109 return hdfsBlocksDistribution;
112 private void computeBlockDistribution() throws IOException {
113 lastCachedAt = EnvironmentEdgeManager.currentTime();
115 FSDataInputStream stream;
116 if (fileInfo.isLink()) {
117 stream = FileLink.getUnderlyingFileLinkInputStream(this.stream);
118 } else {
119 stream = this.stream;
122 if (!(stream instanceof HdfsDataInputStream)) {
123 if (!streamUnsupported) {
124 LOG.warn("{} for storeFileInfo={}, isLink={}, is not an HdfsDataInputStream so cannot be "
125 + "used to derive locality. Falling back on cached value.",
126 stream, fileInfo, fileInfo.isLink());
127 streamUnsupported = true;
129 return;
132 streamUnsupported = false;
133 hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) stream);
137 * For tests only, sets lastCachedAt so we can force a refresh
139 @RestrictedApi(explanation = "Should only be called in tests", link = "",
140 allowedOnPath = ".*/src/test/.*")
141 synchronized void setLastCachedAt(long timestamp) {
142 lastCachedAt = timestamp;
146 * For tests only, returns the configured cache period
148 @RestrictedApi(explanation = "Should only be called in tests", link = "",
149 allowedOnPath = ".*/src/test/.*")
150 long getCachePeriodMs() {
151 return cachePeriodMs;
155 * For tests only, returns whether the passed stream is supported
157 @RestrictedApi(explanation = "Should only be called in tests", link = "",
158 allowedOnPath = ".*/src/test/.*")
159 boolean isStreamUnsupported() {
160 return streamUnsupported;