HBASE-24033 Add ut for loading the corrupt recovered hfiles (#1322)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / wal / IOTestProvider.java
blobd062c77cb336a91598a77d5ac16119bb48922b88
1 /**
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org.apache.hadoop.hbase.wal;
21 import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.DEFAULT_PROVIDER_ID;
22 import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
23 import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.Collections;
29 import java.util.List;
30 import java.util.concurrent.atomic.AtomicBoolean;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.client.RegionInfo;
37 // imports for things that haven't moved from regionserver.wal yet.
38 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
39 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
40 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
41 import org.apache.hadoop.hbase.util.CommonFSUtils;
42 import org.apache.hadoop.hbase.wal.WAL.Entry;
43 import org.apache.yetus.audience.InterfaceAudience;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
47 /**
48 * A WAL Provider that returns a single thread safe WAL that optionally can skip parts of our normal
49 * interactions with HDFS.
50 * <p>
51 * This implementation picks a directory in HDFS based on the same mechanisms as the
52 * {@link FSHLogProvider}. Users can configure how much interaction we have with HDFS with the
53 * configuration property "hbase.wal.iotestprovider.operations". The value should be a comma
54 * separated list of allowed operations:
55 * <ul>
56 * <li><em>append</em> : edits will be written to the underlying filesystem</li>
57 * <li><em>sync</em> : wal syncs will result in hflush calls</li>
58 * <li><em>fileroll</em> : roll requests will result in creating a new file on the underlying
59 * filesystem.</li>
60 * </ul>
61 * Additionally, the special cases "all" and "none" are recognized. If ommited, the value defaults
62 * to "all." Behavior is undefined if "all" or "none" are paired with additional values. Behavior is
63 * also undefined if values not listed above are included.
64 * <p>
65 * Only those operations listed will occur between the returned WAL and HDFS. All others will be
66 * no-ops.
67 * <p>
68 * Note that in the case of allowing "append" operations but not allowing "fileroll", the returned
69 * WAL will just keep writing to the same file. This won't avoid all costs associated with file
70 * management over time, becaue the data set size may result in additional HDFS block allocations.
72 @InterfaceAudience.Private
73 public class IOTestProvider implements WALProvider {
74 private static final Logger LOG = LoggerFactory.getLogger(IOTestProvider.class);
76 private static final String ALLOWED_OPERATIONS = "hbase.wal.iotestprovider.operations";
77 private enum AllowedOperations {
78 all,
79 append,
80 sync,
81 fileroll,
82 none
85 private WALFactory factory;
87 private Configuration conf;
89 private volatile FSHLog log;
91 private String providerId;
92 protected AtomicBoolean initialized = new AtomicBoolean(false);
94 private List<WALActionsListener> listeners = new ArrayList<>();
95 /**
96 * @param factory factory that made us, identity used for FS layout. may not be null
97 * @param conf may not be null
98 * @param providerId differentiate between providers from one facotry, used for FS layout. may be
99 * null
101 @Override
102 public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
103 if (!initialized.compareAndSet(false, true)) {
104 throw new IllegalStateException("WALProvider.init should only be called once.");
106 this.factory = factory;
107 this.conf = conf;
108 this.providerId = providerId != null ? providerId : DEFAULT_PROVIDER_ID;
111 @Override
112 public List<WAL> getWALs() {
113 return Collections.singletonList(log);
116 private FSHLog createWAL() throws IOException {
117 String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId;
118 return new IOTestWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
119 AbstractFSWALProvider.getWALDirectoryName(factory.factoryId),
120 HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, true, logPrefix,
121 META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
124 @Override
125 public WAL getWAL(RegionInfo region) throws IOException {
126 FSHLog log = this.log;
127 if (log != null) {
128 return log;
130 synchronized (this) {
131 log = this.log;
132 if (log == null) {
133 log = createWAL();
134 this.log = log;
137 return log;
140 @Override
141 public void close() throws IOException {
142 FSHLog log = this.log;
143 if (log != null) {
144 log.close();
148 @Override
149 public void shutdown() throws IOException {
150 FSHLog log = this.log;
151 if (log != null) {
152 log.shutdown();
156 private static class IOTestWAL extends FSHLog {
158 private final boolean doFileRolls;
160 // Used to differntiate between roll calls before and after we finish construction.
161 private final boolean initialized;
164 * Create an edit log at the given <code>dir</code> location.
166 * You should never have to load an existing log. If there is a log at
167 * startup, it should have already been processed and deleted by the time the
168 * WAL object is started up.
170 * @param fs filesystem handle
171 * @param rootDir path to where logs and oldlogs
172 * @param logDir dir where wals are stored
173 * @param archiveDir dir where wals are archived
174 * @param conf configuration to use
175 * @param listeners Listeners on WAL events. Listeners passed here will
176 * be registered before we do anything else; e.g. the
177 * Constructor {@link #rollWriter()}.
178 * @param failIfWALExists If true IOException will be thrown if files related to this wal
179 * already exist.
180 * @param prefix should always be hostname and port in distributed env and
181 * it will be URL encoded before being used.
182 * If prefix is null, "wal" will be used
183 * @param suffix will be url encoded. null is treated as empty. non-empty must start with
184 * {@link AbstractFSWALProvider#WAL_FILE_NAME_DELIMITER}
185 * @throws IOException
187 public IOTestWAL(final FileSystem fs, final Path rootDir, final String logDir,
188 final String archiveDir, final Configuration conf,
189 final List<WALActionsListener> listeners,
190 final boolean failIfWALExists, final String prefix, final String suffix)
191 throws IOException {
192 super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
193 Collection<String> operations = conf.getStringCollection(ALLOWED_OPERATIONS);
194 doFileRolls = operations.isEmpty() || operations.contains(AllowedOperations.all.name()) ||
195 operations.contains(AllowedOperations.fileroll.name());
196 initialized = true;
197 LOG.info("Initialized with file rolling " + (doFileRolls ? "enabled" : "disabled"));
200 private Writer noRollsWriter;
202 // creatWriterInstance is where the new pipeline is set up for doing file rolls
203 // if we are skipping it, just keep returning the same writer.
204 @Override
205 protected Writer createWriterInstance(final Path path) throws IOException {
206 // we get called from the FSHLog constructor (!); always roll in this case since
207 // we don't know yet if we're supposed to generally roll and
208 // we need an initial file in the case of doing appends but no rolls.
209 if (!initialized || doFileRolls) {
210 LOG.info("creating new writer instance.");
211 final ProtobufLogWriter writer = new IOTestWriter();
212 try {
213 writer.init(fs, path, conf, false, this.blocksize);
214 } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
215 throw new IOException("Can't create writer instance because underlying FileSystem " +
216 "doesn't support needed stream capabilities.", exception);
218 if (!initialized) {
219 LOG.info("storing initial writer instance in case file rolling isn't allowed.");
220 noRollsWriter = writer;
222 return writer;
223 } else {
224 LOG.info("WAL rolling disabled, returning the first writer.");
225 // Initial assignment happens during the constructor call, so there ought not be
226 // a race for first assignment.
227 return noRollsWriter;
233 * Presumes init will be called by a single thread prior to any access of other methods.
235 private static class IOTestWriter extends ProtobufLogWriter {
236 private boolean doAppends;
237 private boolean doSyncs;
239 @Override
240 public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable,
241 long blocksize) throws IOException, CommonFSUtils.StreamLacksCapabilityException {
242 Collection<String> operations = conf.getStringCollection(ALLOWED_OPERATIONS);
243 if (operations.isEmpty() || operations.contains(AllowedOperations.all.name())) {
244 doAppends = doSyncs = true;
245 } else if (operations.contains(AllowedOperations.none.name())) {
246 doAppends = doSyncs = false;
247 } else {
248 doAppends = operations.contains(AllowedOperations.append.name());
249 doSyncs = operations.contains(AllowedOperations.sync.name());
251 LOG.info("IOTestWriter initialized with appends " + (doAppends ? "enabled" : "disabled") +
252 " and syncs " + (doSyncs ? "enabled" : "disabled"));
253 super.init(fs, path, conf, overwritable, blocksize);
256 @Override
257 protected String getWriterClassName() {
258 return ProtobufLogWriter.class.getSimpleName();
261 @Override
262 public void append(Entry entry) throws IOException {
263 if (doAppends) {
264 super.append(entry);
268 @Override
269 public void sync(boolean forceSync) throws IOException {
270 if (doSyncs) {
271 super.sync(forceSync);
276 @Override
277 public long getNumLogFiles() {
278 return this.log.getNumLogFiles();
281 @Override
282 public long getLogFileSize() {
283 return this.log.getLogFileSize();
286 @Override
287 public void addWALActionsListener(WALActionsListener listener) {
288 // TODO Implement WALProvider.addWALActionLister