From 7593e86c5fa161ce39007183dab4cb077a37e43c Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Wed, 12 Dec 2018 09:33:33 +0800 Subject: [PATCH] HBASE-21538 Rewrite RegionReplicaFlushHandler to use AsyncClusterConnection --- .../hbase/client/AsyncClusterConnection.java | 8 ++ .../hadoop/hbase/client/AsyncConnectionImpl.java | 8 ++ .../hbase/client/ClusterConnectionFactory.java | 16 +-- .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 36 ++++--- .../hbase/protobuf/ReplicationProtbufUtil.java | 15 +-- .../hadoop/hbase/regionserver/HRegionServer.java | 3 +- .../handler/RegionReplicaFlushHandler.java | 110 ++++++++++++--------- 7 files changed, 106 insertions(+), 90 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java index 1327fd779b..f1f64ca164 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java @@ -17,10 +17,13 @@ */ package org.apache.hadoop.hbase.client; +import java.util.concurrent.CompletableFuture; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; + /** * The asynchronous connection for internal usage. */ @@ -41,4 +44,9 @@ public interface AsyncClusterConnection extends AsyncConnection { * Get the rpc client we used to communicate with other servers. */ RpcClient getRpcClient(); + + /** + * Flush a region and get the response. + */ + CompletableFuture flush(byte[] regionName, boolean writeFlushWALMarker); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 62b9d8ba7a..c17cca9fca 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -60,6 +60,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; @@ -384,4 +385,11 @@ class AsyncConnectionImpl implements AsyncClusterConnection { public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) { return new AsyncRegionServerAdmin(serverName, this); } + + @Override + public CompletableFuture flush(byte[] regionName, + boolean writeFlushWALMarker) { + RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin(); + return admin.flushRegionInternal(regionName, writeFlushWALMarker); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java index 68c0630bee..79484dbb6a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java @@ -18,15 +18,12 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; -import java.io.InterruptedIOException; import java.net.SocketAddress; -import java.util.concurrent.ExecutionException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hbase.thirdparty.com.google.common.base.Throwables; - /** * The factory for creating {@link AsyncClusterConnection}. */ @@ -48,16 +45,7 @@ public final class ClusterConnectionFactory { public static AsyncClusterConnection createAsyncClusterConnection(Configuration conf, SocketAddress localAddress, User user) throws IOException { AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf); - String clusterId; - try { - clusterId = registry.getClusterId().get(); - } catch (InterruptedException e) { - throw (IOException) new InterruptedIOException().initCause(e); - } catch (ExecutionException e) { - Throwable cause = e.getCause(); - Throwables.propagateIfPossible(cause, IOException.class); - throw new IOException(cause); - } + String clusterId = FutureUtils.get(registry.getClusterId()); return new AsyncConnectionImpl(conf, registry, clusterId, localAddress, user); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 3303fd39e4..b3d3468791 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -910,7 +910,19 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture flushRegion(byte[] regionName) { - CompletableFuture future = new CompletableFuture<>(); + return flushRegionInternal(regionName, false).thenAccept(r -> { + }); + } + + /** + * This method is for internal use only, where we need the response of the flush. + *

+ * As it exposes the protobuf message, please do NOT try to expose it as a public + * API. + */ + CompletableFuture flushRegionInternal(byte[] regionName, + boolean writeFlushWALMarker) { + CompletableFuture future = new CompletableFuture<>(); addListener(getRegionLocation(regionName), (location, err) -> { if (err != null) { future.completeExceptionally(err); @@ -922,7 +934,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); return; } - addListener(flush(serverName, location.getRegion()), (ret, err2) -> { + addListener(flush(serverName, location.getRegion(), writeFlushWALMarker), (ret, err2) -> { if (err2 != null) { future.completeExceptionally(err2); } else { @@ -933,15 +945,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { return future; } - private CompletableFuture flush(final ServerName serverName, final RegionInfo regionInfo) { - return this. newAdminCaller() - .serverName(serverName) - .action( - (controller, stub) -> this. adminCall( - controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo - .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done), - resp -> null)) - .call(); + private CompletableFuture flush(ServerName serverName, RegionInfo regionInfo, + boolean writeFlushWALMarker) { + return this. newAdminCaller().serverName(serverName) + .action((controller, stub) -> this + . adminCall(controller, stub, + RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), writeFlushWALMarker), + (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp)) + .call(); } @Override @@ -954,7 +965,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } List> compactFutures = new ArrayList<>(); if (hRegionInfos != null) { - hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region))); + hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region, false).thenAccept(r -> { + }))); } addListener(CompletableFuture.allOf( compactFutures.toArray(new CompletableFuture[compactFutures.size()])), (ret, err2) -> { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java index 74fad26589..9f41a7695c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java @@ -18,13 +18,10 @@ */ package org.apache.hadoop.hbase.protobuf; - import java.io.IOException; -import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.concurrent.ExecutionException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; @@ -32,12 +29,12 @@ import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; import org.apache.hadoop.hbase.io.SizedCellScanner; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hbase.thirdparty.com.google.common.base.Throwables; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; @@ -60,15 +57,7 @@ public class ReplicationProtbufUtil { throws IOException { Pair p = buildReplicateWALEntryRequest( entries, null, replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir); - try { - admin.replicateWALEntry(p.getFirst(), p.getSecond()).get(); - } catch (InterruptedException e) { - throw (IOException) new InterruptedIOException().initCause(e); - } catch (ExecutionException e) { - Throwable cause = e.getCause(); - Throwables.propagateIfPossible(cause, IOException.class); - throw new IOException(e); - } + FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond())); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 992a39ad32..a6214ba228 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2409,8 +2409,7 @@ public class HRegionServer extends HasThread implements // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler if (this.executorService != null) { - this.executorService.submit(new RegionReplicaFlushHandler(this, clusterConnection, - rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region)); + this.executorService.submit(new RegionReplicaFlushHandler(this, region)); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java index 81b6d7e0a9..0729203246 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java @@ -20,26 +20,23 @@ package org.apache.hadoop.hbase.regionserver.handler; import java.io.IOException; import java.io.InterruptedIOException; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.client.ClusterConnection; -import org.apache.hadoop.hbase.client.FlushRegionCallable; -import org.apache.hadoop.hbase.client.RegionReplicaUtil; -import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; +import org.apache.hadoop.hbase.client.AsyncClusterConnection; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.EventType; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.RetryCounterFactory; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; /** * HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to wal in @@ -56,20 +53,13 @@ public class RegionReplicaFlushHandler extends EventHandler { private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaFlushHandler.class); - private final ClusterConnection connection; - private final RpcRetryingCallerFactory rpcRetryingCallerFactory; - private final RpcControllerFactory rpcControllerFactory; - private final int operationTimeout; + private final AsyncClusterConnection connection; + private final HRegion region; - public RegionReplicaFlushHandler(Server server, ClusterConnection connection, - RpcRetryingCallerFactory rpcRetryingCallerFactory, RpcControllerFactory rpcControllerFactory, - int operationTimeout, HRegion region) { + public RegionReplicaFlushHandler(Server server, HRegion region) { super(server, EventType.RS_REGION_REPLICA_FLUSH); - this.connection = connection; - this.rpcRetryingCallerFactory = rpcRetryingCallerFactory; - this.rpcControllerFactory = rpcControllerFactory; - this.operationTimeout = operationTimeout; + this.connection = server.getAsyncClusterConnection(); this.region = region; } @@ -103,7 +93,7 @@ public class RegionReplicaFlushHandler extends EventHandler { return numRetries; } - void triggerFlushInPrimaryRegion(final HRegion region) throws IOException, RuntimeException { + void triggerFlushInPrimaryRegion(final HRegion region) throws IOException { long pause = connection.getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); @@ -117,45 +107,59 @@ public class RegionReplicaFlushHandler extends EventHandler { } while (!region.isClosing() && !region.isClosed() && !server.isAborted() && !server.isStopped()) { - FlushRegionCallable flushCallable = new FlushRegionCallable( - connection, rpcControllerFactory, - RegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()), true); - // TODO: flushRegion() is a blocking call waiting for the flush to complete. Ideally we // do not have to wait for the whole flush here, just initiate it. - FlushRegionResponse response = null; + FlushRegionResponse response; try { - response = rpcRetryingCallerFactory.newCaller() - .callWithRetries(flushCallable, this.operationTimeout); - } catch (IOException ex) { - if (ex instanceof TableNotFoundException - || connection.isTableDisabled(region.getRegionInfo().getTable())) { + response = FutureUtils.get(connection.flush(ServerRegionReplicaUtil + .getRegionInfoForDefaultReplica(region.getRegionInfo()).getRegionName(), true)); + } catch (IOException e) { + if (e instanceof TableNotFoundException || FutureUtils + .get(connection.getAdmin().isTableDisabled(region.getRegionInfo().getTable()))) { return; } - throw ex; + if (!counter.shouldRetry()) { + throw e; + } + // The reason that why we need to retry here is that, the retry for asynchronous admin + // request is much simpler than the normal operation, if we failed to locate the region once + // then we will throw the exception out and will not try to relocate again. So here we need + // to add some retries by ourselves to prevent shutting down the region server too + // frequent... + LOG.debug("Failed to trigger a flush of primary region replica {} of region {}, retry={}", + ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()) + .getRegionNameAsString(), + region.getRegionInfo().getRegionNameAsString(), counter.getAttemptTimes(), e); + try { + counter.sleepUntilNextRetry(); + } catch (InterruptedException e1) { + throw new InterruptedIOException(e1.getMessage()); + } + continue; } if (response.getFlushed()) { // then we have to wait for seeing the flush entry. All reads will be rejected until we see // a complete flush cycle or replay a region open event if (LOG.isDebugEnabled()) { - LOG.debug("Successfully triggered a flush of primary region replica " - + ServerRegionReplicaUtil - .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName() - + " of region " + region.getRegionInfo().getEncodedName() - + " Now waiting and blocking reads until observing a full flush cycle"); + LOG.debug("Successfully triggered a flush of primary region replica " + + ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()) + .getRegionNameAsString() + + " of region " + region.getRegionInfo().getRegionNameAsString() + + " Now waiting and blocking reads until observing a full flush cycle"); } region.setReadsEnabled(true); break; } else { if (response.hasWroteFlushWalMarker()) { - if(response.getWroteFlushWalMarker()) { + if (response.getWroteFlushWalMarker()) { if (LOG.isDebugEnabled()) { - LOG.debug("Successfully triggered an empty flush marker(memstore empty) of primary " - + "region replica " + ServerRegionReplicaUtil - .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName() - + " of region " + region.getRegionInfo().getEncodedName() + " Now waiting and " - + "blocking reads until observing a flush marker"); + LOG.debug("Successfully triggered an empty flush marker(memstore empty) of primary " + + "region replica " + + ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()) + .getRegionNameAsString() + + " of region " + region.getRegionInfo().getRegionNameAsString() + + " Now waiting and " + "blocking reads until observing a flush marker"); } region.setReadsEnabled(true); break; @@ -164,15 +168,23 @@ public class RegionReplicaFlushHandler extends EventHandler { // closing or already flushing. Retry flush again after some sleep. if (!counter.shouldRetry()) { throw new IOException("Cannot cause primary to flush or drop a wal marker after " + - "retries. Failing opening of this region replica " - + region.getRegionInfo().getEncodedName()); + counter.getAttemptTimes() + " retries. Failing opening of this region replica " + + region.getRegionInfo().getRegionNameAsString()); + } else { + LOG.warn( + "Cannot cause primary replica {} to flush or drop a wal marker " + + "for region replica {}, retry={}", + ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()) + .getRegionNameAsString(), + region.getRegionInfo().getRegionNameAsString(), counter.getAttemptTimes()); } } } else { // nothing to do. Are we dealing with an old server? - LOG.warn("Was not able to trigger a flush from primary region due to old server version? " - + "Continuing to open the secondary region replica: " - + region.getRegionInfo().getEncodedName()); + LOG.warn( + "Was not able to trigger a flush from primary region due to old server version? " + + "Continuing to open the secondary region replica: " + + region.getRegionInfo().getRegionNameAsString()); region.setReadsEnabled(true); break; } -- 2.11.4.GIT