3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.wal
;
21 import static org
.apache
.hadoop
.hbase
.wal
.AbstractFSWALProvider
.DEFAULT_PROVIDER_ID
;
22 import static org
.apache
.hadoop
.hbase
.wal
.AbstractFSWALProvider
.META_WAL_PROVIDER_ID
;
23 import static org
.apache
.hadoop
.hbase
.wal
.AbstractFSWALProvider
.WAL_FILE_NAME_DELIMITER
;
25 import java
.io
.IOException
;
26 import java
.util
.ArrayList
;
27 import java
.util
.Collection
;
28 import java
.util
.Collections
;
29 import java
.util
.List
;
30 import java
.util
.concurrent
.atomic
.AtomicBoolean
;
32 import org
.apache
.hadoop
.conf
.Configuration
;
33 import org
.apache
.hadoop
.fs
.FileSystem
;
34 import org
.apache
.hadoop
.fs
.Path
;
35 import org
.apache
.hadoop
.hbase
.HConstants
;
36 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
37 // imports for things that haven't moved from regionserver.wal yet.
38 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.FSHLog
;
39 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.ProtobufLogWriter
;
40 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.WALActionsListener
;
41 import org
.apache
.hadoop
.hbase
.util
.CommonFSUtils
;
42 import org
.apache
.hadoop
.hbase
.wal
.WAL
.Entry
;
43 import org
.apache
.yetus
.audience
.InterfaceAudience
;
44 import org
.slf4j
.Logger
;
45 import org
.slf4j
.LoggerFactory
;
48 * A WAL Provider that returns a single thread safe WAL that optionally can skip parts of our normal
49 * interactions with HDFS.
51 * This implementation picks a directory in HDFS based on the same mechanisms as the
52 * {@link FSHLogProvider}. Users can configure how much interaction we have with HDFS with the
53 * configuration property "hbase.wal.iotestprovider.operations". The value should be a comma
54 * separated list of allowed operations:
56 * <li><em>append</em> : edits will be written to the underlying filesystem</li>
57 * <li><em>sync</em> : wal syncs will result in hflush calls</li>
58 * <li><em>fileroll</em> : roll requests will result in creating a new file on the underlying
61 * Additionally, the special cases "all" and "none" are recognized. If omitted, the value defaults
62 * to "all." Behavior is undefined if "all" or "none" are paired with additional values. Behavior is
63 * also undefined if values not listed above are included.
65 * Only those operations listed will occur between the returned WAL and HDFS. All others will be
68 * Note that in the case of allowing "append" operations but not allowing "fileroll", the returned
69 * WAL will just keep writing to the same file. This won't avoid all costs associated with file
70 * management over time, because the data set size may result in additional HDFS block allocations.
72 @InterfaceAudience.Private
73 public class IOTestProvider
implements WALProvider
{
// Logger shared by the provider and by the nested IOTestWAL / IOTestWriter classes.
74 private static final Logger LOG
= LoggerFactory
.getLogger(IOTestProvider
.class);
// Configuration key whose value lists the operations allowed to reach the underlying
// filesystem; the nested classes read it with Configuration.getStringCollection.
76 private static final String ALLOWED_OPERATIONS
= "hbase.wal.iotestprovider.operations";
77 private enum AllowedOperations
{
// Factory handed to init(); its factoryId is used by createWAL() for the WAL directory name
// and the log file prefix.
85 private WALFactory factory
;
// Configuration used by createWAL() to look up the WAL filesystem and WAL root dir.
// NOTE(review): the assignment is not visible in this excerpt -- presumably set in init(); confirm.
87 private Configuration conf
;
// The single WAL served by this provider. Volatile; getWAL(), close() and shutdown() copy it
// into a local variable before using it.
89 private volatile FSHLog log
;
// Provider id used in the log file prefix; init() substitutes DEFAULT_PROVIDER_ID for null.
91 private String providerId
;
// Flipped from false to true by init() so that a second init() call can be rejected.
92 protected AtomicBoolean initialized
= new AtomicBoolean(false);
// Listeners that createWAL() passes to the IOTestWAL constructor.
94 private List
<WALActionsListener
> listeners
= new ArrayList
<>();
96 * @param factory factory that made us, identity used for FS layout. may not be null
97 * @param conf may not be null
98 * @param providerId differentiate between providers from one factory, used for FS layout. may be
// One-time initialization of the provider: records the factory and the provider id.
// Throws IllegalStateException when invoked more than once on the same instance.
102 public void init(WALFactory factory
, Configuration conf
, String providerId
) throws IOException
{
// Atomically claim initialization; only the first caller gets past this check.
103 if (!initialized
.compareAndSet(false, true)) {
104 throw new IllegalStateException("WALProvider.init should only be called once.");
106 this.factory
= factory
;
// Fall back to the default provider id when the caller passes null.
108 this.providerId
= providerId
!= null ? providerId
: DEFAULT_PROVIDER_ID
;
// Returns an immutable one-element list wrapping the current value of the log field.
// NOTE(review): if log has not been assigned yet the single element would be null -- confirm
// whether callers can observe that state.
112 public List
<WAL
> getWALs() {
113 return Collections
.singletonList(log
);
// Builds a new IOTestWAL for this provider from the stored factory, conf, providerId and
// listeners.
116 private FSHLog
createWAL() throws IOException
{
// Log file prefix is "<factoryId><delimiter><providerId>".
117 String logPrefix
= factory
.factoryId
+ WAL_FILE_NAME_DELIMITER
+ providerId
;
// Filesystem and root dir both come from conf; failIfWALExists is passed as true; the suffix
// is META_WAL_PROVIDER_ID only when this provider's id equals it, otherwise null.
118 return new IOTestWAL(CommonFSUtils
.getWALFileSystem(conf
), CommonFSUtils
.getWALRootDir(conf
),
119 AbstractFSWALProvider
.getWALDirectoryName(factory
.factoryId
),
120 HConstants
.HREGION_OLDLOGDIR_NAME
, conf
, listeners
, true, logPrefix
,
121 META_WAL_PROVIDER_ID
.equals(providerId
) ? META_WAL_PROVIDER_ID
: null);
125 public WAL
getWAL(RegionInfo region
) throws IOException
{
126 FSHLog log
= this.log
;
130 synchronized (this) {
141 public void close() throws IOException
{
142 FSHLog log
= this.log
;
149 public void shutdown() throws IOException
{
150 FSHLog log
= this.log
;
156 private static class IOTestWAL
extends FSHLog
{
// True when file rolling is allowed; computed in the constructor from the allowed-operations
// configuration value.
158 private final boolean doFileRolls
;
160 // Used to differentiate between roll calls before and after we finish construction.
161 private final boolean initialized
;
164 * Create an edit log at the given <code>dir</code> location.
166 * You should never have to load an existing log. If there is a log at
167 * startup, it should have already been processed and deleted by the time the
168 * WAL object is started up.
170 * @param fs filesystem handle
171 * @param rootDir path to where logs and oldlogs
172 * @param logDir dir where wals are stored
173 * @param archiveDir dir where wals are archived
174 * @param conf configuration to use
175 * @param listeners Listeners on WAL events. Listeners passed here will
176 * be registered before we do anything else; e.g. the
177 * Constructor {@link #rollWriter()}.
178 * @param failIfWALExists If true IOException will be thrown if files related to this wal
180 * @param prefix should always be hostname and port in distributed env and
181 * it will be URL encoded before being used.
182 * If prefix is null, "wal" will be used
183 * @param suffix will be url encoded. null is treated as empty. non-empty must start with
184 * {@link AbstractFSWALProvider#WAL_FILE_NAME_DELIMITER}
185 * @throws IOException
187 public IOTestWAL(final FileSystem fs
, final Path rootDir
, final String logDir
,
188 final String archiveDir
, final Configuration conf
,
189 final List
<WALActionsListener
> listeners
,
190 final boolean failIfWALExists
, final String prefix
, final String suffix
)
// All arguments are forwarded unchanged to the FSHLog constructor.
192 super(fs
, rootDir
, logDir
, archiveDir
, conf
, listeners
, failIfWALExists
, prefix
, suffix
);
// Read the allowed operations from conf.
193 Collection
<String
> operations
= conf
.getStringCollection(ALLOWED_OPERATIONS
);
// Rolling is enabled when the collection is empty, or contains "all" or "fileroll".
194 doFileRolls
= operations
.isEmpty() || operations
.contains(AllowedOperations
.all
.name()) ||
195 operations
.contains(AllowedOperations
.fileroll
.name());
197 LOG
.info("Initialized with file rolling " + (doFileRolls ?
"enabled" : "disabled"));
// Writer stored by createWriterInstance the first time it is assigned, and handed back by that
// method when file rolling is disabled.
200 private Writer noRollsWriter
;
202 // createWriterInstance is where the new pipeline is set up for doing file rolls
203 // if we are skipping it, just keep returning the same writer.
205 protected Writer
createWriterInstance(final Path path
) throws IOException
{
206 // we get called from the FSHLog constructor (!); always roll in this case since
207 // we don't know yet if we're supposed to generally roll and
208 // we need an initial file in the case of doing appends but no rolls.
209 if (!initialized
|| doFileRolls
) {
210 LOG
.info("creating new writer instance.");
211 final ProtobufLogWriter writer
= new IOTestWriter();
// overwritable is passed as false; the block size comes from this WAL instance.
213 writer
.init(fs
, path
, conf
, false, this.blocksize
);
// Surface a missing stream capability as an IOException, keeping the original as the cause.
214 } catch (CommonFSUtils
.StreamLacksCapabilityException exception
) {
215 throw new IOException("Can't create writer instance because underlying FileSystem " +
216 "doesn't support needed stream capabilities.", exception
);
219 LOG
.info("storing initial writer instance in case file rolling isn't allowed.");
220 noRollsWriter
= writer
;
// NOTE(review): the lines below presumably form the else branch of the check above (the
// branch line itself is not visible in this excerpt) -- confirm.
224 LOG
.info("WAL rolling disabled, returning the first writer.");
225 // Initial assignment happens during the constructor call, so there ought not be
226 // a race for first assignment.
227 return noRollsWriter
;
233 * Presumes init will be called by a single thread prior to any access of other methods.
235 private static class IOTestWriter
extends ProtobufLogWriter
{
// Set by init() from the allowed-operations configuration: whether "append" is permitted.
236 private boolean doAppends
;
// Set by init() from the allowed-operations configuration: whether "sync" is permitted.
237 private boolean doSyncs
;
// Derives doAppends / doSyncs from the allowed-operations configuration, logs the outcome,
// then delegates to the parent writer's init with the same arguments.
240 public void init(FileSystem fs
, Path path
, Configuration conf
, boolean overwritable
,
241 long blocksize
) throws IOException
, CommonFSUtils
.StreamLacksCapabilityException
{
242 Collection
<String
> operations
= conf
.getStringCollection(ALLOWED_OPERATIONS
);
// An empty collection or the presence of "all" enables both appends and syncs.
243 if (operations
.isEmpty() || operations
.contains(AllowedOperations
.all
.name())) {
244 doAppends
= doSyncs
= true;
// The presence of "none" disables both.
245 } else if (operations
.contains(AllowedOperations
.none
.name())) {
246 doAppends
= doSyncs
= false;
// Otherwise each flag is enabled only if its own operation name is listed.
248 doAppends
= operations
.contains(AllowedOperations
.append
.name());
249 doSyncs
= operations
.contains(AllowedOperations
.sync
.name());
251 LOG
.info("IOTestWriter initialized with appends " + (doAppends ?
"enabled" : "disabled") +
252 " and syncs " + (doSyncs ?
"enabled" : "disabled"));
253 super.init(fs
, path
, conf
, overwritable
, blocksize
);
// Reports the simple name of the parent class (ProtobufLogWriter) rather than IOTestWriter's.
257 protected String
getWriterClassName() {
258 return ProtobufLogWriter
.class.getSimpleName();
262 public void append(Entry entry
) throws IOException
{
269 public void sync(boolean forceSync
) throws IOException
{
271 super.sync(forceSync
);
// Delegates to the underlying log's getNumLogFiles().
// NOTE(review): dereferences this.log directly with no null check -- confirm log is always
// assigned before this is called.
277 public long getNumLogFiles() {
278 return this.log
.getNumLogFiles();
// Delegates to the underlying log's getLogFileSize().
// NOTE(review): dereferences this.log directly with no null check -- confirm log is always
// assigned before this is called.
282 public long getLogFileSize() {
283 return this.log
.getLogFileSize();
287 public void addWALActionsListener(WALActionsListener listener
) {
288 // TODO Implement WALProvider.addWALActionsListener