2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.wal
;
20 import static org
.apache
.hadoop
.hbase
.wal
.AbstractFSWALProvider
.getWALArchiveDirectoryName
;
21 import static org
.apache
.hadoop
.hbase
.wal
.AbstractFSWALProvider
.getWALDirectoryName
;
23 import java
.io
.IOException
;
24 import java
.lang
.reflect
.Constructor
;
25 import java
.lang
.reflect
.InvocationTargetException
;
26 import java
.util
.ArrayList
;
27 import java
.util
.List
;
28 import java
.util
.Optional
;
29 import java
.util
.concurrent
.ConcurrentHashMap
;
30 import java
.util
.concurrent
.ConcurrentMap
;
31 import java
.util
.concurrent
.atomic
.AtomicBoolean
;
32 import java
.util
.concurrent
.locks
.Lock
;
33 import java
.util
.function
.BiPredicate
;
34 import java
.util
.regex
.Matcher
;
35 import java
.util
.regex
.Pattern
;
36 import java
.util
.stream
.Collectors
;
37 import java
.util
.stream
.Stream
;
38 import org
.apache
.hadoop
.conf
.Configuration
;
39 import org
.apache
.hadoop
.hbase
.TableName
;
40 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
41 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.DualAsyncFSWAL
;
42 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.WALActionsListener
;
43 import org
.apache
.hadoop
.hbase
.replication
.ReplicationUtils
;
44 import org
.apache
.hadoop
.hbase
.replication
.SyncReplicationState
;
45 import org
.apache
.hadoop
.hbase
.replication
.regionserver
.PeerActionListener
;
46 import org
.apache
.hadoop
.hbase
.replication
.regionserver
.SyncReplicationPeerInfoProvider
;
47 import org
.apache
.hadoop
.hbase
.util
.CommonFSUtils
;
48 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
49 import org
.apache
.hadoop
.hbase
.util
.KeyLocker
;
50 import org
.apache
.hadoop
.hbase
.util
.Pair
;
51 import org
.apache
.yetus
.audience
.InterfaceAudience
;
52 import org
.slf4j
.Logger
;
53 import org
.slf4j
.LoggerFactory
;
55 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.annotations
.VisibleForTesting
;
56 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.base
.Throwables
;
57 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.collect
.Streams
;
58 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.Channel
;
59 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.EventLoopGroup
;
/**
 * The special {@link WALProvider} for synchronous replication.
 * <p>
 * It works like an interceptor: when getting a WAL, it first checks whether the given region
 * should be replicated synchronously. If so, it returns a special WAL for it; otherwise it
 * delegates the request to the normal {@link WALProvider}.
 */
68 @InterfaceAudience.Private
69 public class SyncReplicationWALProvider
implements WALProvider
, PeerActionListener
{
71 private static final Logger LOG
= LoggerFactory
.getLogger(SyncReplicationWALProvider
.class);
73 // only for injecting errors for testcase, do not use it for other purpose.
75 public static final String DUAL_WAL_IMPL
= "hbase.wal.sync.impl";
77 private final WALProvider provider
;
79 private SyncReplicationPeerInfoProvider peerInfoProvider
=
80 new DefaultSyncReplicationPeerInfoProvider();
82 private WALFactory factory
;
84 private Configuration conf
;
86 private List
<WALActionsListener
> listeners
= new ArrayList
<>();
88 private EventLoopGroup eventLoopGroup
;
90 private Class
<?
extends Channel
> channelClass
;
92 private AtomicBoolean initialized
= new AtomicBoolean(false);
94 // when switching from A to DA, we will put a Optional.empty into this map if there is no WAL for
95 // the peer yet. When getting WAL from this map the caller should know that it should not use
96 // DualAsyncFSWAL any more.
97 private final ConcurrentMap
<String
, Optional
<DualAsyncFSWAL
>> peerId2WAL
=
98 new ConcurrentHashMap
<>();
100 private final KeyLocker
<String
> createLock
= new KeyLocker
<>();
/**
 * Creates a provider that intercepts sync replication regions and otherwise delegates.
 *
 * @param provider the normal provider used for every region not served by a DualAsyncFSWAL
 */
SyncReplicationWALProvider(WALProvider provider) {
  this.provider = provider;
}
106 public void setPeerInfoProvider(SyncReplicationPeerInfoProvider peerInfoProvider
) {
107 this.peerInfoProvider
= peerInfoProvider
;
111 public void init(WALFactory factory
, Configuration conf
, String providerId
) throws IOException
{
112 if (!initialized
.compareAndSet(false, true)) {
113 throw new IllegalStateException("WALProvider.init should only be called once.");
115 provider
.init(factory
, conf
, providerId
);
117 this.factory
= factory
;
118 Pair
<EventLoopGroup
, Class
<?
extends Channel
>> eventLoopGroupAndChannelClass
=
119 NettyAsyncFSWALConfigHelper
.getEventLoopConfig(conf
);
120 eventLoopGroup
= eventLoopGroupAndChannelClass
.getFirst();
121 channelClass
= eventLoopGroupAndChannelClass
.getSecond();
124 // Use a timestamp to make it identical. That means, after we transit the peer to DA/S and then
125 // back to A, the log prefix will be changed. This is used to simplify the implementation for
126 // replication source, where we do not need to consider that a terminated shipper could be added
128 private String
getLogPrefix(String peerId
) {
129 return factory
.factoryId
+ "-" + EnvironmentEdgeManager
.currentTime() + "-" + peerId
;
132 private DualAsyncFSWAL
createWAL(String peerId
, String remoteWALDir
) throws IOException
{
133 Class
<?
extends DualAsyncFSWAL
> clazz
=
134 conf
.getClass(DUAL_WAL_IMPL
, DualAsyncFSWAL
.class, DualAsyncFSWAL
.class);
136 Constructor
<?
> constructor
= null;
137 for (Constructor
<?
> c
: clazz
.getDeclaredConstructors()) {
138 if (c
.getParameterCount() > 0) {
143 if (constructor
== null) {
144 throw new IllegalArgumentException("No valid constructor provided for class " + clazz
);
146 constructor
.setAccessible(true);
147 return (DualAsyncFSWAL
) constructor
.newInstance(
148 CommonFSUtils
.getWALFileSystem(conf
),
149 ReplicationUtils
.getRemoteWALFileSystem(conf
, remoteWALDir
),
150 CommonFSUtils
.getWALRootDir(conf
),
151 ReplicationUtils
.getPeerRemoteWALDir(remoteWALDir
, peerId
),
152 getWALDirectoryName(factory
.factoryId
), getWALArchiveDirectoryName(conf
, factory
.factoryId
),
153 conf
, listeners
, true, getLogPrefix(peerId
), ReplicationUtils
.SYNC_WAL_SUFFIX
,
154 eventLoopGroup
, channelClass
);
155 } catch (InstantiationException
| IllegalAccessException e
) {
156 throw new RuntimeException(e
);
157 } catch (InvocationTargetException e
) {
158 Throwable cause
= e
.getTargetException();
159 Throwables
.propagateIfPossible(cause
, IOException
.class);
160 throw new RuntimeException(cause
);
164 private DualAsyncFSWAL
getWAL(String peerId
, String remoteWALDir
) throws IOException
{
165 Optional
<DualAsyncFSWAL
> opt
= peerId2WAL
.get(peerId
);
167 return opt
.orElse(null);
169 Lock lock
= createLock
.acquireLock(peerId
);
171 opt
= peerId2WAL
.get(peerId
);
173 return opt
.orElse(null);
175 DualAsyncFSWAL wal
= createWAL(peerId
, remoteWALDir
);
176 boolean succ
= false;
185 peerId2WAL
.put(peerId
, Optional
.of(wal
));
193 public WAL
getWAL(RegionInfo region
) throws IOException
{
194 if (region
== null) {
195 return provider
.getWAL(null);
198 Optional
<Pair
<String
, String
>> peerIdAndRemoteWALDir
=
199 peerInfoProvider
.getPeerIdAndRemoteWALDir(region
.getTable());
200 if (peerIdAndRemoteWALDir
.isPresent()) {
201 Pair
<String
, String
> pair
= peerIdAndRemoteWALDir
.get();
202 wal
= getWAL(pair
.getFirst(), pair
.getSecond());
204 return wal
!= null ? wal
: provider
.getWAL(region
);
207 private Stream
<WAL
> getWALStream() {
208 return Streams
.concat(
209 peerId2WAL
.values().stream().filter(Optional
::isPresent
).map(Optional
::get
),
210 provider
.getWALs().stream());
214 public List
<WAL
> getWALs() {
215 return getWALStream().collect(Collectors
.toList());
219 public void shutdown() throws IOException
{
220 // save the last exception and rethrow
221 IOException failure
= null;
222 for (Optional
<DualAsyncFSWAL
> wal
: peerId2WAL
.values()) {
223 if (wal
.isPresent()) {
225 wal
.get().shutdown();
226 } catch (IOException e
) {
227 LOG
.error("Shutdown WAL failed", e
);
233 if (failure
!= null) {
239 public void close() throws IOException
{
240 // save the last exception and rethrow
241 IOException failure
= null;
242 for (Optional
<DualAsyncFSWAL
> wal
: peerId2WAL
.values()) {
243 if (wal
.isPresent()) {
246 } catch (IOException e
) {
247 LOG
.error("Close WAL failed", e
);
253 if (failure
!= null) {
259 public long getNumLogFiles() {
260 return peerId2WAL
.size() + provider
.getNumLogFiles();
264 public long getLogFileSize() {
265 return peerId2WAL
.values().stream().filter(Optional
::isPresent
).map(Optional
::get
)
266 .mapToLong(DualAsyncFSWAL
::getLogFileSize
).sum() + provider
.getLogFileSize();
269 private void safeClose(WAL wal
) {
273 } catch (IOException e
) {
274 LOG
.error("Close WAL failed", e
);
280 public void addWALActionsListener(WALActionsListener listener
) {
281 listeners
.add(listener
);
282 provider
.addWALActionsListener(listener
);
286 public void peerSyncReplicationStateChange(String peerId
, SyncReplicationState from
,
287 SyncReplicationState to
, int stage
) {
288 if (from
== SyncReplicationState
.ACTIVE
) {
290 Lock lock
= createLock
.acquireLock(peerId
);
292 Optional
<DualAsyncFSWAL
> opt
= peerId2WAL
.get(peerId
);
294 opt
.ifPresent(w
-> w
.skipRemoteWAL(to
== SyncReplicationState
.STANDBY
));
296 // add a place holder to tell the getWAL caller do not use DualAsyncFSWAL any more.
297 peerId2WAL
.put(peerId
, Optional
.empty());
302 } else if (stage
== 1) {
303 peerId2WAL
.remove(peerId
).ifPresent(this::safeClose
);
308 private static class DefaultSyncReplicationPeerInfoProvider
309 implements SyncReplicationPeerInfoProvider
{
312 public Optional
<Pair
<String
, String
>> getPeerIdAndRemoteWALDir(TableName table
) {
313 return Optional
.empty();
317 public boolean checkState(TableName table
,
318 BiPredicate
<SyncReplicationState
, SyncReplicationState
> checker
) {
323 private static final Pattern LOG_PREFIX_PATTERN
= Pattern
.compile(".*-\\d+-(.+)");
327 * Returns the peer id if the wal file name is in the special group for a sync replication peer.
330 * The prefix format is <factoryId>-<ts>-<peerId>.
333 public static Optional
<String
> getSyncReplicationPeerIdFromWALName(String name
) {
334 if (!name
.endsWith(ReplicationUtils
.SYNC_WAL_SUFFIX
)) {
335 // fast path to return earlier if the name is not for a sync replication peer.
336 return Optional
.empty();
338 String logPrefix
= AbstractFSWALProvider
.getWALPrefixFromWALName(name
);
339 Matcher matcher
= LOG_PREFIX_PATTERN
.matcher(logPrefix
);
340 if (matcher
.matches()) {
341 return Optional
.of(matcher
.group(1));
343 return Optional
.empty();
348 WALProvider
getWrappedProvider() {