2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.wal
;
20 import java
.io
.IOException
;
21 import org
.apache
.hadoop
.conf
.Configuration
;
22 import org
.apache
.hadoop
.fs
.FileSystem
;
23 import org
.apache
.hadoop
.fs
.Path
;
24 import org
.apache
.hadoop
.hbase
.io
.asyncfs
.FanOutOneBlockAsyncDFSOutput
;
25 import org
.apache
.hadoop
.hbase
.io
.asyncfs
.FanOutOneBlockAsyncDFSOutputHelper
;
26 import org
.apache
.hadoop
.hbase
.io
.asyncfs
.FanOutOneBlockAsyncDFSOutputSaslHelper
;
27 import org
.apache
.hadoop
.hbase
.io
.asyncfs
.monitor
.StreamSlowMonitor
;
28 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.AsyncFSWAL
;
29 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.AsyncProtobufLogWriter
;
30 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.WALUtil
;
31 import org
.apache
.hadoop
.hbase
.util
.CommonFSUtils
;
32 import org
.apache
.hadoop
.hbase
.util
.CommonFSUtils
.StreamLacksCapabilityException
;
33 import org
.apache
.hadoop
.hbase
.util
.Pair
;
34 import org
.apache
.yetus
.audience
.InterfaceAudience
;
35 import org
.apache
.yetus
.audience
.InterfaceStability
;
36 import org
.slf4j
.Logger
;
37 import org
.slf4j
.LoggerFactory
;
39 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.base
.Throwables
;
40 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.Channel
;
41 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.EventLoopGroup
;
44 * A WAL provider that use {@link AsyncFSWAL}.
46 @InterfaceAudience.Private
47 @InterfaceStability.Evolving
48 public class AsyncFSWALProvider
extends AbstractFSWALProvider
<AsyncFSWAL
> {
50 private static final Logger LOG
= LoggerFactory
.getLogger(AsyncFSWALProvider
.class);
52 public static final String WRITER_IMPL
= "hbase.regionserver.hlog.async.writer.impl";
54 // Only public so classes back in regionserver.wal can access
55 public interface AsyncWriter
extends WALProvider
.AsyncWriter
{
57 * @throws IOException if something goes wrong initializing an output stream
58 * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that
59 * meet the needs of the given Writer implementation.
61 void init(FileSystem fs
, Path path
, Configuration c
, boolean overwritable
, long blocksize
,
62 StreamSlowMonitor monitor
) throws IOException
, CommonFSUtils
.StreamLacksCapabilityException
;
65 private EventLoopGroup eventLoopGroup
;
67 private Class
<?
extends Channel
> channelClass
;
70 protected AsyncFSWAL
createWAL() throws IOException
{
71 return new AsyncFSWAL(CommonFSUtils
.getWALFileSystem(conf
), this.abortable
,
72 CommonFSUtils
.getWALRootDir(conf
), getWALDirectoryName(factory
.factoryId
),
73 getWALArchiveDirectoryName(conf
, factory
.factoryId
), conf
, listeners
, true, logPrefix
,
74 META_WAL_PROVIDER_ID
.equals(providerId
) ? META_WAL_PROVIDER_ID
: null, eventLoopGroup
,
75 channelClass
, factory
.getExcludeDatanodeManager().getStreamSlowMonitor(providerId
));
79 protected void doInit(Configuration conf
) throws IOException
{
80 Pair
<EventLoopGroup
, Class
<?
extends Channel
>> eventLoopGroupAndChannelClass
=
81 NettyAsyncFSWALConfigHelper
.getEventLoopConfig(conf
);
82 eventLoopGroup
= eventLoopGroupAndChannelClass
.getFirst();
83 channelClass
= eventLoopGroupAndChannelClass
.getSecond();
87 * Public because of AsyncFSWAL. Should be package-private
89 public static AsyncWriter
createAsyncWriter(Configuration conf
, FileSystem fs
, Path path
,
90 boolean overwritable
, EventLoopGroup eventLoopGroup
,
91 Class
<?
extends Channel
> channelClass
) throws IOException
{
92 return createAsyncWriter(conf
, fs
, path
, overwritable
, WALUtil
.getWALBlockSize(conf
, fs
, path
),
93 eventLoopGroup
, channelClass
, StreamSlowMonitor
.create(conf
, path
.getName()));
97 * Public because of AsyncFSWAL. Should be package-private
99 public static AsyncWriter
createAsyncWriter(Configuration conf
, FileSystem fs
, Path path
,
100 boolean overwritable
, long blocksize
, EventLoopGroup eventLoopGroup
,
101 Class
<?
extends Channel
> channelClass
, StreamSlowMonitor monitor
) throws IOException
{
102 // Configuration already does caching for the Class lookup.
103 Class
<?
extends AsyncWriter
> logWriterClass
= conf
.getClass(
104 WRITER_IMPL
, AsyncProtobufLogWriter
.class, AsyncWriter
.class);
106 AsyncWriter writer
= logWriterClass
.getConstructor(EventLoopGroup
.class, Class
.class)
107 .newInstance(eventLoopGroup
, channelClass
);
108 writer
.init(fs
, path
, conf
, overwritable
, blocksize
, monitor
);
110 } catch (Exception e
) {
111 if (e
instanceof CommonFSUtils
.StreamLacksCapabilityException
) {
112 LOG
.error("The RegionServer async write ahead log provider " +
113 "relies on the ability to call " + e
.getMessage() + " for proper operation during " +
114 "component failures, but the current FileSystem does not support doing so. Please " +
115 "check the config value of '" + CommonFSUtils
.HBASE_WAL_DIR
+ "' and ensure " +
116 "it points to a FileSystem mount that has suitable capabilities for output streams.");
118 LOG
.debug("Error instantiating log writer.", e
);
120 Throwables
.propagateIfPossible(e
, IOException
.class);
121 throw new IOException("cannot get log writer", e
);
126 * Test whether we can load the helper classes for async dfs output.
128 public static boolean load() {
130 Class
.forName(FanOutOneBlockAsyncDFSOutput
.class.getName());
131 Class
.forName(FanOutOneBlockAsyncDFSOutputHelper
.class.getName());
132 Class
.forName(FanOutOneBlockAsyncDFSOutputSaslHelper
.class.getName());
134 } catch (Throwable e
) {