From fa37aed8f691a11dd3700d21319df444bbe2beda Mon Sep 17 00:00:00 2001
From: Duo Zhang
Date: Sat, 27 Nov 2021 12:03:18 +0800
Subject: [PATCH] HBASE-26481 Consider rolling upgrading from old region
 replication framework (#3880)

Signed-off-by: Xin Sun
Reviewed-by: GeorryHuang
---
 .../hbase/client/AsyncRpcRetryingCaller.java       |  12 +-
 .../hadoop/hbase/replication/ReplicationUtils.java |   3 +
 .../AsyncRegionReplicationRetryingCaller.java      |  41 +++++--
 .../master/replication/ReplicationPeerManager.java |  12 +-
 .../regionserver/ReplicationSourceManager.java     |   9 +-
 .../hbase/client/TestFallbackToUseReplay.java      | 129 +++++++++++++++++++++
 ...StartupWithLegacyRegionReplicationEndpoint.java |  85 ++++++++++++++
 7 files changed, 278 insertions(+), 13 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFallbackToUseReplay.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestStartupWithLegacyRegionReplicationEndpoint.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
index 586e7d52e0..8648572a04 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
@@ -147,6 +147,16 @@ public abstract class AsyncRpcRetryingCaller<T> {
     return Optional.empty();
   }
 
+  // Subclasses can override this method to change the error type, to control the retry logic.
+  // For example, during a rolling upgrade, calling a newly added method on a not-yet-upgraded
+  // server fails with an UnsupportedOperationException (wrapped in a DNRIOE). If we want to fall
+  // back to the old method, the subclass can change the exception type to something that is not
+  // a DNRIOE, so a retry will be scheduled, and on the next attempt the subclass can use the old
+  // method to make the rpc call.
+  protected Throwable preProcessError(Throwable error) {
+    return error;
+  }
+
   protected final void onError(Throwable t, Supplier<String> errMsg,
     Consumer<Throwable> updateCachedLocation) {
     if (future.isDone()) {
@@ -156,7 +166,7 @@ public abstract class AsyncRpcRetryingCaller<T> {
       LOG.debug("The future is already done, canceled={}, give up retrying", future.isCancelled());
       return;
     }
-    Throwable error = translateException(t);
+    Throwable error = preProcessError(translateException(t));
     // We use this retrying caller to open a scanner, as it is idempotent, but we may throw
     // ScannerResetException, which is a DoNotRetryIOException when opening a scanner as now we will
     // also fetch data when opening a scanner. The intention here is that if we hit a
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index a786206a64..e8ecec262b 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -56,6 +56,9 @@ public final class ReplicationUtils {
   // since some FileSystem implementation may not support atomic rename.
   public static final String RENAME_WAL_SUFFIX = ".ren";
 
+  public static final String LEGACY_REGION_REPLICATION_ENDPOINT_NAME =
+    "org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint";
+
   private ReplicationUtils() {
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java
index c854ba3833..726559fc28 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 import java.io.IOException;
 import java.util.List;
 import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
 import org.apache.hadoop.hbase.util.Pair;
@@ -43,6 +44,11 @@ public class AsyncRegionReplicationRetryingCaller extends AsyncRpcRetryingCaller
 
   private final Entry[] entries;
 
+  // Whether to use replay instead of replicateToReplica. During a rolling upgrade, if the target
+  // region server has not been upgraded yet, it will not have the replicateToReplica method, so
+  // we fall back to the replay method, though it is not a perfect replacement.
+  private boolean useReplay;
+
   public AsyncRegionReplicationRetryingCaller(HashedWheelTimer retryTimer,
     AsyncClusterConnectionImpl conn, int maxAttempts, long rpcTimeoutNs, long operationTimeoutNs,
     RegionInfo replica, List<Entry> entries) {
@@ -53,6 +59,27 @@
     this.entries = entries.toArray(new Entry[0]);
   }
 
+  @Override
+  protected Throwable preProcessError(Throwable error) {
+    if (error instanceof DoNotRetryIOException &&
+      error.getCause() instanceof UnsupportedOperationException) {
+      // fall back to replay, and return the cause so the upper layer will schedule a retry
+      useReplay = true;
+      return error.getCause();
+    }
+    return error;
+  }
+
+  private void onComplete(HRegionLocation loc) {
+    if (controller.failed()) {
+      onError(controller.getFailed(),
+        () -> "Call to " + loc.getServerName() + " for " + replica + " failed",
+        err -> conn.getLocator().updateCachedLocationOnError(loc, err));
+    } else {
+      future.complete(null);
+    }
+  }
+
   private void call(HRegionLocation loc) {
     AdminService.Interface stub;
     try {
@@ -67,15 +94,11 @@ public class AsyncRegionReplicationRetryingCaller extends AsyncRpcRetryingCaller
       .buildReplicateWALEntryRequest(entries, replica.getEncodedNameAsBytes(), null, null, null);
     resetCallTimeout();
     controller.setCellScanner(pair.getSecond());
-    stub.replicateToReplica(controller, pair.getFirst(), r -> {
-      if (controller.failed()) {
-        onError(controller.getFailed(),
-          () -> "Call to " + loc.getServerName() + " for " + replica + " failed",
-          err -> conn.getLocator().updateCachedLocationOnError(loc, err));
-      } else {
-        future.complete(null);
-      }
-    });
+    if (useReplay) {
+      stub.replay(controller, pair.getFirst(), r -> onComplete(loc));
+    } else {
+      stub.replicateToReplica(controller, pair.getFirst(), r -> onComplete(loc));
+    }
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index d9829a5f1e..b62d4b4283 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -57,6 +57,8 @@ import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
@@ -69,6 +71,8 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
 @InterfaceAudience.Private
 public class ReplicationPeerManager {
 
+  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerManager.class);
+
   private final ReplicationPeerStorage peerStorage;
 
   private final ReplicationQueueStorage queueStorage;
@@ -546,7 +550,13 @@ public class ReplicationPeerManager {
     ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
     for (String peerId : peerStorage.listPeerIds()) {
       ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
-
+      if (ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
+        .equals(peerConfig.getReplicationEndpointImpl())) {
+        // we do not use this endpoint for region replication any more, see HBASE-26233
+        LOG.warn("Legacy region replication peer found, removing: {}", peerConfig);
+        peerStorage.removePeer(peerId);
+        continue;
+      }
       peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig);
       peerStorage.updatePeerConfig(peerId, peerConfig);
       boolean enabled = peerStorage.isPeerEnabled(peerId);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 9f8d8dc219..7a16e3591b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -342,8 +342,14 @@ public class ReplicationSourceManager {
    * @param peerId the id of the replication peer
    * @return the source that was created
    */
-  ReplicationSourceInterface addSource(String peerId) throws IOException {
+  void addSource(String peerId) throws IOException {
     ReplicationPeer peer = replicationPeers.getPeer(peerId);
+    if (ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
+      .equals(peer.getPeerConfig().getReplicationEndpointImpl())) {
+      // we do not use this endpoint for region replication any more, see HBASE-26233
+      LOG.warn("Legacy region replication peer found, skip adding: {}", peer.getPeerConfig());
+      return;
+    }
     ReplicationSourceInterface src = createSource(peerId, peer);
     // synchronized on latestPaths to avoid missing the new log
     synchronized (this.latestPaths) {
@@ -370,7 +376,6 @@ public class ReplicationSourceManager {
       syncReplicationPeerMappingManager.add(peer.getId(), peerConfig);
     }
     src.startup();
-    return src;
   }
 
   /**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFallbackToUseReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFallbackToUseReplay.java
new file mode 100644
index 0000000000..35c4d958a3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFallbackToUseReplay.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.Interface;
+
+/**
+ * Make sure we can fall back to the replay method if the replicateToReplica method is not
+ * present, i.e., we are connecting to an old region server.
+ */
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestFallbackToUseReplay {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestFallbackToUseReplay.class);
+
+  private static Configuration CONF = HBaseConfiguration.create();
+
+  private static AsyncClusterConnectionImpl CONN;
+
+  private static AsyncRegionReplicationRetryingCaller CALLER;
+
+  private static RegionInfo REPLICA =
+    RegionInfoBuilder.newBuilder(TableName.valueOf("test")).setReplicaId(1).build();
+
+  private static AtomicBoolean REPLAY_CALLED = new AtomicBoolean(false);
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws IOException {
+    CONF.setInt(AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
+    AsyncRegionLocator locator = mock(AsyncRegionLocator.class);
+    when(locator.getRegionLocation(any(), any(), anyInt(), any(), anyLong()))
+      .thenReturn(CompletableFuture.completedFuture(new HRegionLocation(REPLICA,
+        ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime()))));
+    AdminService.Interface stub = mock(AdminService.Interface.class);
+    // fail the call to replicateToReplica
+    doAnswer(i -> {
+      HBaseRpcController controller = i.getArgument(0, HBaseRpcController.class);
+      controller.setFailed(new DoNotRetryIOException(new UnsupportedOperationException()));
+      RpcCallback done = i.getArgument(2, RpcCallback.class);
+      done.run(null);
+      return null;
+    }).when(stub).replicateToReplica(any(), any(), any());
+    doAnswer(i -> {
+      REPLAY_CALLED.set(true);
+      RpcCallback done = i.getArgument(2, RpcCallback.class);
+      done.run(null);
+      return null;
+    }).when(stub).replay(any(), any(), any());
+    CONN = new AsyncClusterConnectionImpl(CONF, mock(ConnectionRegistry.class), "test", null,
+      User.getCurrent()) {
+
+      @Override
+      AsyncRegionLocator getLocator() {
+        return locator;
+      }
+
+      @Override
+      Interface getAdminStub(ServerName serverName) throws IOException {
+        return stub;
+      }
+    };
+    CALLER = new AsyncRegionReplicationRetryingCaller(AsyncClusterConnectionImpl.RETRY_TIMER, CONN,
+      10, TimeUnit.SECONDS.toNanos(1), TimeUnit.SECONDS.toNanos(10), REPLICA,
+      Collections.emptyList());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws IOException {
+    Closeables.close(CONN, true);
+  }
+
+  @Test
+  public void testFallback() {
+    CALLER.call().join();
+    assertTrue(REPLAY_CALLED.get());
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestStartupWithLegacyRegionReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestStartupWithLegacyRegionReplicationEndpoint.java
new file mode 100644
index 0000000000..2c75d373c7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestStartupWithLegacyRegionReplicationEndpoint.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.regionreplication;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Make sure we can start the cluster with the legacy RegionReplicaReplicationEndpoint configured.
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestStartupWithLegacyRegionReplicationEndpoint {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestStartupWithLegacyRegionReplicationEndpoint.class);
+
+  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws Exception {
+    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+      .setClusterKey("127.0.0.1:2181:/hbase")
+      .setReplicationEndpointImpl(ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME).build();
+    // cannot use Admin.addReplicationPeer as it will fail with ClassNotFoundException
+    UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().addPeer("legacy", peerConfig,
+      true);
+    UTIL.getMiniHBaseCluster().stopRegionServer(0);
+    RegionServerThread rst = UTIL.getMiniHBaseCluster().startRegionServer();
+    // we should still have this peer
+    assertNotNull(UTIL.getAdmin().getReplicationPeerConfig("legacy"));
+    // but at the RS side, this peer should not be loaded as a replication source
+    assertTrue(rst.getRegionServer().getReplicationSourceService().getReplicationManager()
+      .getSources().isEmpty());
+    UTIL.shutdownMiniHBaseCluster();
+    UTIL.restartHBaseCluster(1);
+    // now the peer should have been removed
+    assertThrows(ReplicationPeerNotFoundException.class,
+      () -> UTIL.getAdmin().getReplicationPeerConfig("legacy"));
+    // at the RS side, this time the peer itself should be gone, not just the replication source
+    assertTrue(UTIL.getMiniHBaseCluster().getRegionServer(0).getReplicationSourceService()
+      .getReplicationManager().getReplicationPeers().getAllPeerIds().isEmpty());
+  }
+}
-- 
2.11.4.GIT
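
Note on the fallback mechanism above (illustration only, not part of the patch): the new preProcessError() hook in AsyncRpcRetryingCaller lets AsyncRegionReplicationRetryingCaller turn a DoNotRetryIOException that wraps an UnsupportedOperationException into a retryable error, flip its useReplay flag, and let the normal retry pass issue the legacy replay RPC on the next attempt. The self-contained sketch below mirrors that flow outside HBase; every name in it (FallbackRetrySketch, Stub, Caller, DoNotRetryException) is hypothetical, and the retry loop is simplified with no timer or backoff.

// Standalone sketch (not HBase code): illustrates the error-downgrade-and-retry pattern that
// preProcessError() enables in this patch. All names here are made up for illustration.
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class FallbackRetrySketch {

  /** Stands in for DoNotRetryIOException: errors of this type abort the retry loop. */
  static class DoNotRetryException extends RuntimeException {
    DoNotRetryException(Throwable cause) {
      super(cause);
    }
  }

  /** Stands in for the admin stub, with the new RPC and the legacy one. */
  interface Stub {
    void replicateToReplica(Runnable done, Consumer<Throwable> error);
    void replay(Runnable done, Consumer<Throwable> error);
  }

  static class Caller {
    private final Stub stub;
    private final int maxAttempts;
    private final CompletableFuture<Void> future = new CompletableFuture<>();
    private boolean useReplay; // set once the new RPC is reported as unsupported
    private int attempts;

    Caller(Stub stub, int maxAttempts) {
      this.stub = stub;
      this.maxAttempts = maxAttempts;
    }

    // Same idea as the patched preProcessError: unwrap the "unsupported on old server" failure
    // so the generic retry logic no longer sees a do-not-retry error, and remember to use the
    // legacy RPC on the next attempt.
    private Throwable preProcessError(Throwable error) {
      if (error instanceof DoNotRetryException
        && error.getCause() instanceof UnsupportedOperationException) {
        useReplay = true;
        return error.getCause();
      }
      return error;
    }

    private void onError(Throwable t) {
      Throwable error = preProcessError(t);
      if (error instanceof DoNotRetryException || ++attempts >= maxAttempts) {
        future.completeExceptionally(error);
      } else {
        call(); // a real caller would reschedule through a timer with backoff
      }
    }

    private void call() {
      if (useReplay) {
        stub.replay(() -> future.complete(null), this::onError);
      } else {
        stub.replicateToReplica(() -> future.complete(null), this::onError);
      }
    }

    CompletableFuture<Void> start() {
      call();
      return future;
    }
  }

  public static void main(String[] args) {
    // An "old server": the new RPC always fails as unsupported, the legacy one succeeds.
    Stub oldServer = new Stub() {
      @Override
      public void replicateToReplica(Runnable done, Consumer<Throwable> error) {
        error.accept(new DoNotRetryException(new UnsupportedOperationException()));
      }

      @Override
      public void replay(Runnable done, Consumer<Throwable> error) {
        done.run();
      }
    };
    new Caller(oldServer, 3).start().join(); // completes via the replay fallback
  }
}

Running main() completes the future through the replay path on the second attempt, which is the same behavior TestFallbackToUseReplay asserts against the real classes.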
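
On the cleanup side, the patch handles the legacy RegionReplicaReplicationEndpoint peer in two places: the master's ReplicationPeerManager removes such a peer while loading peers at startup, and a region server's ReplicationSourceManager merely skips creating a replication source for it. The rough standalone sketch below shows the master-side load-and-drop idea; the PeerStore interface is hypothetical and only stands in for the real peer storage.

// Standalone sketch (not HBase code): peers still pointing at the legacy endpoint class are
// dropped at load time instead of being started, matching the ReplicationPeerManager change.
import java.util.ArrayList;
import java.util.List;

public class LegacyPeerCleanupSketch {

  static final String LEGACY_ENDPOINT =
    "org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint";

  /** Hypothetical view of the peer storage. */
  interface PeerStore {
    List<String> listPeerIds();
    String getEndpointImpl(String peerId);
    void removePeer(String peerId);
  }

  /** Returns the peer ids that survive the load, removing legacy region replication peers. */
  static List<String> loadPeers(PeerStore store) {
    List<String> kept = new ArrayList<>();
    for (String peerId : store.listPeerIds()) {
      if (LEGACY_ENDPOINT.equals(store.getEndpointImpl(peerId))) {
        // the new region replication framework (HBASE-26233) no longer uses a replication peer,
        // so the old peer is simply dropped during the upgrade
        store.removePeer(peerId);
        continue;
      }
      kept.add(peerId);
    }
    return kept;
  }
}

This split is what TestStartupWithLegacyRegionReplicationEndpoint verifies: after only a region server restart the peer still exists but has no source, and after a full cluster restart the peer itself is gone.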