2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.wal
;
20 import java
.io
.IOException
;
21 import org
.apache
.hadoop
.conf
.Configuration
;
22 import org
.apache
.hadoop
.fs
.FileSystem
;
23 import org
.apache
.hadoop
.fs
.Path
;
24 import org
.apache
.hadoop
.hbase
.io
.asyncfs
.FanOutOneBlockAsyncDFSOutput
;
25 import org
.apache
.hadoop
.hbase
.io
.asyncfs
.FanOutOneBlockAsyncDFSOutputHelper
;
26 import org
.apache
.hadoop
.hbase
.io
.asyncfs
.FanOutOneBlockAsyncDFSOutputSaslHelper
;
27 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.AsyncFSWAL
;
28 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.AsyncProtobufLogWriter
;
29 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.WALUtil
;
30 import org
.apache
.hadoop
.hbase
.util
.CommonFSUtils
;
31 import org
.apache
.hadoop
.hbase
.util
.CommonFSUtils
.StreamLacksCapabilityException
;
32 import org
.apache
.hadoop
.hbase
.util
.Pair
;
33 import org
.apache
.yetus
.audience
.InterfaceAudience
;
34 import org
.apache
.yetus
.audience
.InterfaceStability
;
35 import org
.slf4j
.Logger
;
36 import org
.slf4j
.LoggerFactory
;
38 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.base
.Throwables
;
39 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.Channel
;
40 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.EventLoopGroup
;
/**
 * A WAL provider that uses {@link AsyncFSWAL}.
 */
45 @InterfaceAudience.Private
46 @InterfaceStability.Evolving
47 public class AsyncFSWALProvider
extends AbstractFSWALProvider
<AsyncFSWAL
> {
49 private static final Logger LOG
= LoggerFactory
.getLogger(AsyncFSWALProvider
.class);
51 // Only public so classes back in regionserver.wal can access
52 public interface AsyncWriter
extends WALProvider
.AsyncWriter
{
54 * @throws IOException if something goes wrong initializing an output stream
55 * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that
56 * meet the needs of the given Writer implementation.
58 void init(FileSystem fs
, Path path
, Configuration c
, boolean overwritable
, long blocksize
)
59 throws IOException
, CommonFSUtils
.StreamLacksCapabilityException
;
62 private EventLoopGroup eventLoopGroup
;
64 private Class
<?
extends Channel
> channelClass
;
67 protected AsyncFSWAL
createWAL() throws IOException
{
68 return new AsyncFSWAL(CommonFSUtils
.getWALFileSystem(conf
), CommonFSUtils
.getWALRootDir(conf
),
69 getWALDirectoryName(factory
.factoryId
),
70 getWALArchiveDirectoryName(conf
, factory
.factoryId
), conf
, listeners
, true, logPrefix
,
71 META_WAL_PROVIDER_ID
.equals(providerId
) ? META_WAL_PROVIDER_ID
: null,
72 eventLoopGroup
, channelClass
);
76 protected void doInit(Configuration conf
) throws IOException
{
77 Pair
<EventLoopGroup
, Class
<?
extends Channel
>> eventLoopGroupAndChannelClass
=
78 NettyAsyncFSWALConfigHelper
.getEventLoopConfig(conf
);
79 eventLoopGroup
= eventLoopGroupAndChannelClass
.getFirst();
80 channelClass
= eventLoopGroupAndChannelClass
.getSecond();
84 * Public because of AsyncFSWAL. Should be package-private
86 public static AsyncWriter
createAsyncWriter(Configuration conf
, FileSystem fs
, Path path
,
87 boolean overwritable
, EventLoopGroup eventLoopGroup
,
88 Class
<?
extends Channel
> channelClass
) throws IOException
{
89 return createAsyncWriter(conf
, fs
, path
, overwritable
, WALUtil
.getWALBlockSize(conf
, fs
, path
),
90 eventLoopGroup
, channelClass
);
94 * Public because of AsyncFSWAL. Should be package-private
96 public static AsyncWriter
createAsyncWriter(Configuration conf
, FileSystem fs
, Path path
,
97 boolean overwritable
, long blocksize
, EventLoopGroup eventLoopGroup
,
98 Class
<?
extends Channel
> channelClass
) throws IOException
{
99 // Configuration already does caching for the Class lookup.
100 Class
<?
extends AsyncWriter
> logWriterClass
= conf
.getClass(
101 "hbase.regionserver.hlog.async.writer.impl", AsyncProtobufLogWriter
.class, AsyncWriter
.class);
103 AsyncWriter writer
= logWriterClass
.getConstructor(EventLoopGroup
.class, Class
.class)
104 .newInstance(eventLoopGroup
, channelClass
);
105 writer
.init(fs
, path
, conf
, overwritable
, blocksize
);
107 } catch (Exception e
) {
108 if (e
instanceof CommonFSUtils
.StreamLacksCapabilityException
) {
109 LOG
.error("The RegionServer async write ahead log provider " +
110 "relies on the ability to call " + e
.getMessage() + " for proper operation during " +
111 "component failures, but the current FileSystem does not support doing so. Please " +
112 "check the config value of '" + CommonFSUtils
.HBASE_WAL_DIR
+ "' and ensure " +
113 "it points to a FileSystem mount that has suitable capabilities for output streams.");
115 LOG
.debug("Error instantiating log writer.", e
);
117 Throwables
.propagateIfPossible(e
, IOException
.class);
118 throw new IOException("cannot get log writer", e
);
123 * Test whether we can load the helper classes for async dfs output.
125 public static boolean load() {
127 Class
.forName(FanOutOneBlockAsyncDFSOutput
.class.getName());
128 Class
.forName(FanOutOneBlockAsyncDFSOutputHelper
.class.getName());
129 Class
.forName(FanOutOneBlockAsyncDFSOutputSaslHelper
.class.getName());
131 } catch (Throwable e
) {