HBASE-21843 RegionGroupingProvider breaks the meta wal file name pattern which may...
[hbase.git] / hbase-server / src / main / java / org / apache / hadoop / hbase / wal / SyncReplicationWALProvider.java
blob9859c204649fa34b2bbc569cc9ce8f2c180ff565
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org.apache.hadoop.hbase.wal;
20 import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALArchiveDirectoryName;
21 import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALDirectoryName;
23 import java.io.IOException;
24 import java.lang.reflect.Constructor;
25 import java.lang.reflect.InvocationTargetException;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.Optional;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentMap;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.locks.Lock;
33 import java.util.function.BiPredicate;
34 import java.util.regex.Matcher;
35 import java.util.regex.Pattern;
36 import java.util.stream.Collectors;
37 import java.util.stream.Stream;
38 import org.apache.hadoop.conf.Configuration;
39 import org.apache.hadoop.hbase.TableName;
40 import org.apache.hadoop.hbase.client.RegionInfo;
41 import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
42 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
43 import org.apache.hadoop.hbase.replication.ReplicationUtils;
44 import org.apache.hadoop.hbase.replication.SyncReplicationState;
45 import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener;
46 import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
47 import org.apache.hadoop.hbase.util.CommonFSUtils;
48 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
49 import org.apache.hadoop.hbase.util.KeyLocker;
50 import org.apache.hadoop.hbase.util.Pair;
51 import org.apache.yetus.audience.InterfaceAudience;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
55 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
56 import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
57 import org.apache.hbase.thirdparty.com.google.common.collect.Streams;
58 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
59 import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
61 /**
62 * The special {@link WALProvider} for synchronous replication.
63 * <p>
64 * It works like an interceptor, when getting WAL, first it will check if the given region should be
65 * replicated synchronously, if so it will return a special WAL for it, otherwise it will delegate
66 * the request to the normal {@link WALProvider}.
68 @InterfaceAudience.Private
69 public class SyncReplicationWALProvider implements WALProvider, PeerActionListener {
71 private static final Logger LOG = LoggerFactory.getLogger(SyncReplicationWALProvider.class);
73 // only for injecting errors for testcase, do not use it for other purpose.
74 @VisibleForTesting
75 public static final String DUAL_WAL_IMPL = "hbase.wal.sync.impl";
77 private final WALProvider provider;
79 private SyncReplicationPeerInfoProvider peerInfoProvider =
80 new DefaultSyncReplicationPeerInfoProvider();
82 private WALFactory factory;
84 private Configuration conf;
86 private List<WALActionsListener> listeners = new ArrayList<>();
88 private EventLoopGroup eventLoopGroup;
90 private Class<? extends Channel> channelClass;
92 private AtomicBoolean initialized = new AtomicBoolean(false);
94 // when switching from A to DA, we will put a Optional.empty into this map if there is no WAL for
95 // the peer yet. When getting WAL from this map the caller should know that it should not use
96 // DualAsyncFSWAL any more.
97 private final ConcurrentMap<String, Optional<DualAsyncFSWAL>> peerId2WAL =
98 new ConcurrentHashMap<>();
100 private final KeyLocker<String> createLock = new KeyLocker<>();
102 SyncReplicationWALProvider(WALProvider provider) {
103 this.provider = provider;
106 public void setPeerInfoProvider(SyncReplicationPeerInfoProvider peerInfoProvider) {
107 this.peerInfoProvider = peerInfoProvider;
110 @Override
111 public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
112 if (!initialized.compareAndSet(false, true)) {
113 throw new IllegalStateException("WALProvider.init should only be called once.");
115 provider.init(factory, conf, providerId);
116 this.conf = conf;
117 this.factory = factory;
118 Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass =
119 NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
120 eventLoopGroup = eventLoopGroupAndChannelClass.getFirst();
121 channelClass = eventLoopGroupAndChannelClass.getSecond();
124 // Use a timestamp to make it identical. That means, after we transit the peer to DA/S and then
125 // back to A, the log prefix will be changed. This is used to simplify the implementation for
126 // replication source, where we do not need to consider that a terminated shipper could be added
127 // back.
128 private String getLogPrefix(String peerId) {
129 return factory.factoryId + "-" + EnvironmentEdgeManager.currentTime() + "-" + peerId;
132 private DualAsyncFSWAL createWAL(String peerId, String remoteWALDir) throws IOException {
133 Class<? extends DualAsyncFSWAL> clazz =
134 conf.getClass(DUAL_WAL_IMPL, DualAsyncFSWAL.class, DualAsyncFSWAL.class);
135 try {
136 Constructor<?> constructor = null;
137 for (Constructor<?> c : clazz.getDeclaredConstructors()) {
138 if (c.getParameterCount() > 0) {
139 constructor = c;
140 break;
143 if (constructor == null) {
144 throw new IllegalArgumentException("No valid constructor provided for class " + clazz);
146 constructor.setAccessible(true);
147 return (DualAsyncFSWAL) constructor.newInstance(
148 CommonFSUtils.getWALFileSystem(conf),
149 ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir),
150 CommonFSUtils.getWALRootDir(conf),
151 ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId),
152 getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
153 conf, listeners, true, getLogPrefix(peerId), ReplicationUtils.SYNC_WAL_SUFFIX,
154 eventLoopGroup, channelClass);
155 } catch (InstantiationException | IllegalAccessException e) {
156 throw new RuntimeException(e);
157 } catch (InvocationTargetException e) {
158 Throwable cause = e.getTargetException();
159 Throwables.propagateIfPossible(cause, IOException.class);
160 throw new RuntimeException(cause);
164 private DualAsyncFSWAL getWAL(String peerId, String remoteWALDir) throws IOException {
165 Optional<DualAsyncFSWAL> opt = peerId2WAL.get(peerId);
166 if (opt != null) {
167 return opt.orElse(null);
169 Lock lock = createLock.acquireLock(peerId);
170 try {
171 opt = peerId2WAL.get(peerId);
172 if (opt != null) {
173 return opt.orElse(null);
175 DualAsyncFSWAL wal = createWAL(peerId, remoteWALDir);
176 boolean succ = false;
177 try {
178 wal.init();
179 succ = true;
180 } finally {
181 if (!succ) {
182 wal.close();
185 peerId2WAL.put(peerId, Optional.of(wal));
186 return wal;
187 } finally {
188 lock.unlock();
192 @Override
193 public WAL getWAL(RegionInfo region) throws IOException {
194 if (region == null) {
195 return provider.getWAL(null);
197 WAL wal = null;
198 Optional<Pair<String, String>> peerIdAndRemoteWALDir =
199 peerInfoProvider.getPeerIdAndRemoteWALDir(region.getTable());
200 if (peerIdAndRemoteWALDir.isPresent()) {
201 Pair<String, String> pair = peerIdAndRemoteWALDir.get();
202 wal = getWAL(pair.getFirst(), pair.getSecond());
204 return wal != null ? wal : provider.getWAL(region);
207 private Stream<WAL> getWALStream() {
208 return Streams.concat(
209 peerId2WAL.values().stream().filter(Optional::isPresent).map(Optional::get),
210 provider.getWALs().stream());
213 @Override
214 public List<WAL> getWALs() {
215 return getWALStream().collect(Collectors.toList());
218 @Override
219 public void shutdown() throws IOException {
220 // save the last exception and rethrow
221 IOException failure = null;
222 for (Optional<DualAsyncFSWAL> wal : peerId2WAL.values()) {
223 if (wal.isPresent()) {
224 try {
225 wal.get().shutdown();
226 } catch (IOException e) {
227 LOG.error("Shutdown WAL failed", e);
228 failure = e;
232 provider.shutdown();
233 if (failure != null) {
234 throw failure;
238 @Override
239 public void close() throws IOException {
240 // save the last exception and rethrow
241 IOException failure = null;
242 for (Optional<DualAsyncFSWAL> wal : peerId2WAL.values()) {
243 if (wal.isPresent()) {
244 try {
245 wal.get().close();
246 } catch (IOException e) {
247 LOG.error("Close WAL failed", e);
248 failure = e;
252 provider.close();
253 if (failure != null) {
254 throw failure;
258 @Override
259 public long getNumLogFiles() {
260 return peerId2WAL.size() + provider.getNumLogFiles();
263 @Override
264 public long getLogFileSize() {
265 return peerId2WAL.values().stream().filter(Optional::isPresent).map(Optional::get)
266 .mapToLong(DualAsyncFSWAL::getLogFileSize).sum() + provider.getLogFileSize();
269 private void safeClose(WAL wal) {
270 if (wal != null) {
271 try {
272 wal.close();
273 } catch (IOException e) {
274 LOG.error("Close WAL failed", e);
279 @Override
280 public void addWALActionsListener(WALActionsListener listener) {
281 listeners.add(listener);
282 provider.addWALActionsListener(listener);
285 @Override
286 public void peerSyncReplicationStateChange(String peerId, SyncReplicationState from,
287 SyncReplicationState to, int stage) {
288 if (from == SyncReplicationState.ACTIVE) {
289 if (stage == 0) {
290 Lock lock = createLock.acquireLock(peerId);
291 try {
292 Optional<DualAsyncFSWAL> opt = peerId2WAL.get(peerId);
293 if (opt != null) {
294 opt.ifPresent(w -> w.skipRemoteWAL(to == SyncReplicationState.STANDBY));
295 } else {
296 // add a place holder to tell the getWAL caller do not use DualAsyncFSWAL any more.
297 peerId2WAL.put(peerId, Optional.empty());
299 } finally {
300 lock.unlock();
302 } else if (stage == 1) {
303 peerId2WAL.remove(peerId).ifPresent(this::safeClose);
308 private static class DefaultSyncReplicationPeerInfoProvider
309 implements SyncReplicationPeerInfoProvider {
311 @Override
312 public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(TableName table) {
313 return Optional.empty();
316 @Override
317 public boolean checkState(TableName table,
318 BiPredicate<SyncReplicationState, SyncReplicationState> checker) {
319 return false;
323 private static final Pattern LOG_PREFIX_PATTERN = Pattern.compile(".*-\\d+-(.+)");
326 * <p>
327 * Returns the peer id if the wal file name is in the special group for a sync replication peer.
328 * </p>
329 * <p>
330 * The prefix format is &lt;factoryId&gt;-&lt;ts&gt;-&lt;peerId&gt;.
331 * </p>
333 public static Optional<String> getSyncReplicationPeerIdFromWALName(String name) {
334 if (!name.endsWith(ReplicationUtils.SYNC_WAL_SUFFIX)) {
335 // fast path to return earlier if the name is not for a sync replication peer.
336 return Optional.empty();
338 String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
339 Matcher matcher = LOG_PREFIX_PATTERN.matcher(logPrefix);
340 if (matcher.matches()) {
341 return Optional.of(matcher.group(1));
342 } else {
343 return Optional.empty();
347 @VisibleForTesting
348 WALProvider getWrappedProvider() {
349 return provider;