HBASE-26412 Handle sink failure in RegionReplicationSink (#3815)
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;

/**
 * The class for replicating WAL edits to secondary replicas, one instance per region.
 */
@InterfaceAudience.Private
public class RegionReplicationSink {
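
  // Data flow: add() enqueues a SinkEntry and, if no batch is already in flight, calls
  // send(). send() drains the queue and replicates the whole batch to every secondary
  // replica via AsyncClusterConnection.replicate(). onComplete() records the replicas
  // that failed, asks flushRequester to trigger a flush (a flush covering all stores
  // lets the replicas catch up and clears the failed set, see add()), and re-triggers
  // send() if more edits arrived in the meantime.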

  private static final Logger LOG = LoggerFactory.getLogger(RegionReplicationSink.class);

  public static final String MAX_PENDING_SIZE = "hbase.region.read-replica.sink.max-pending-size";

  public static final long MAX_PENDING_SIZE_DEFAULT = 10L * 1024 * 1024;

  public static final String RETRIES_NUMBER = "hbase.region.read-replica.sink.retries.number";

  public static final int RETRIES_NUMBER_DEFAULT = 3;

  public static final String RPC_TIMEOUT_MS = "hbase.region.read-replica.sink.rpc.timeout.ms";

  public static final long RPC_TIMEOUT_MS_DEFAULT = 200;

  public static final String OPERATION_TIMEOUT_MS =
    "hbase.region.read-replica.sink.operation.timeout.ms";

  public static final long OPERATION_TIMEOUT_MS_DEFAULT = 1000;
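
  // Note: MAX_PENDING_SIZE is declared above but not yet enforced anywhere in this
  // class; see the TODO in add() about limiting the total cached entries. The retries
  // and timeout values below are passed straight through to
  // AsyncClusterConnection.replicate() in send().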

  private static final class SinkEntry {

    final WALKeyImpl key;

    final WALEdit edit;

    final ServerCall<?> rpcCall;

    SinkEntry(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
      this.key = key;
      this.edit = edit;
      this.rpcCall = rpcCall;
      if (rpcCall != null) {
        // Increase the reference count so the rpc framework does not free the memory
        // before we have actually sent the edit out.
        rpcCall.retainByWAL();
      }
    }

    /**
     * Should be called regardless of the result of the replication operation. Unless you still
     * want to reuse this entry, you must call this method to release the possible off-heap
     * memory.
     */
    void replicated() {
      if (rpcCall != null) {
        rpcCall.releaseByWAL();
      }
    }
  }
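
  // The retainByWAL/releaseByWAL pair above is reference counting: an edit built within
  // an rpc call may reference cells in the call's cell scanner, which can be off heap,
  // so the call must stay pinned until every queued entry has been replicated.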

  private final RegionInfo primary;

  private final TableDescriptor tableDesc;

  private final Runnable flushRequester;

  private final AsyncClusterConnection conn;

  // Used to track the replicas which we failed to replicate edits to; will be cleared
  // after we get a flush edit that covers all stores.
  private final Set<Integer> failedReplicas = new HashSet<>();

  private final Queue<SinkEntry> entries = new ArrayDeque<>();

  private final int retries;

  private final long rpcTimeoutNs;

  private final long operationTimeoutNs;

  private boolean sending;

  private boolean stopping;

  private boolean stopped;
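
  // All mutable state above is guarded by synchronizing on entries. The three flags
  // form a small state machine: sending is true while a batch is in flight; stop()
  // sets stopping; onComplete() (or stop() itself, when idle) sets stopped and
  // notifies waiters blocked in waitUntilStopped().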

  RegionReplicationSink(Configuration conf, RegionInfo primary, TableDescriptor td,
    Runnable flushRequester, AsyncClusterConnection conn) {
    Preconditions.checkArgument(RegionReplicaUtil.isDefaultReplica(primary), "%s is not primary",
      primary);
    Preconditions.checkArgument(td.getRegionReplication() > 1,
      "region replication should be greater than 1 but got %s", td.getRegionReplication());
    this.primary = primary;
    this.tableDesc = td;
    this.flushRequester = flushRequester;
    this.conn = conn;
    this.retries = conf.getInt(RETRIES_NUMBER, RETRIES_NUMBER_DEFAULT);
    this.rpcTimeoutNs =
      TimeUnit.MILLISECONDS.toNanos(conf.getLong(RPC_TIMEOUT_MS, RPC_TIMEOUT_MS_DEFAULT));
    this.operationTimeoutNs = TimeUnit.MILLISECONDS
      .toNanos(conf.getLong(OPERATION_TIMEOUT_MS, OPERATION_TIMEOUT_MS_DEFAULT));
  }
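
  // Called once all the replicate() futures for a batch have completed, from the
  // listener registered in send(). Releases the batch's entries, collects the per
  // replica errors, and decides whether to keep sending.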
  private void onComplete(List<SinkEntry> sent,
    Map<Integer, MutableObject<Throwable>> replica2Error) {
    sent.forEach(SinkEntry::replicated);
    Set<Integer> failed = new HashSet<>();
    for (Map.Entry<Integer, MutableObject<Throwable>> entry : replica2Error.entrySet()) {
      Integer replicaId = entry.getKey();
      Throwable error = entry.getValue().getValue();
      if (error != null) {
        LOG.warn("Failed to replicate to secondary replica {} for {}, stop replicating" +
          " for a while and trigger a flush", replicaId, primary, error);
        failed.add(replicaId);
      }
    }
    synchronized (entries) {
      if (!failed.isEmpty()) {
        failedReplicas.addAll(failed);
        flushRequester.run();
      }
      sending = false;
      if (stopping) {
        stopped = true;
        entries.notifyAll();
        return;
      }
      if (!entries.isEmpty()) {
        send();
      }
    }
  }
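
  // Drains every pending entry into one batch and fans it out to all secondary
  // replicas that are not currently marked failed. Must be called with the monitor on
  // entries held; the completion callbacks run on the rpc threads and re-enter this
  // class through onComplete().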
  private void send() {
    List<SinkEntry> toSend = new ArrayList<>();
    for (SinkEntry entry;;) {
      entry = entries.poll();
      if (entry == null) {
        break;
      }
      toSend.add(entry);
    }
    int toSendReplicaCount = tableDesc.getRegionReplication() - 1 - failedReplicas.size();
    if (toSendReplicaCount <= 0) {
      return;
    }
    sending = true;
    List<WAL.Entry> walEntries =
      toSend.stream().map(e -> new WAL.Entry(e.key, e.edit)).collect(Collectors.toList());
    AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
    Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
    for (int replicaId = 1; replicaId < tableDesc.getRegionReplication(); replicaId++) {
      // Skip replicas we already failed on: remaining is sized to toSendReplicaCount,
      // which excludes them, so replicating to them as well would make onComplete fire
      // before all listeners have recorded their errors.
      if (failedReplicas.contains(replicaId)) {
        continue;
      }
      MutableObject<Throwable> error = new MutableObject<>();
      replica2Error.put(replicaId, error);
      RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
      FutureUtils.addListener(
        conn.replicate(replica, walEntries, retries, rpcTimeoutNs, operationTimeoutNs), (r, e) -> {
          error.setValue(e);
          if (remaining.decrementAndGet() == 0) {
            onComplete(toSend, replica2Error);
          }
        });
    }
  }
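
  // Returns true only if the given flush covers every column family of the table. Only
  // a full flush lets us safely drop the pending edits and recover the failed replicas,
  // since replaying it brings a secondary replica fully up to date.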
  private boolean flushAllStores(FlushDescriptor flushDesc) {
    Set<byte[]> storesFlushed =
      flushDesc.getStoreFlushesList().stream().map(sfd -> sfd.getFamilyName().toByteArray())
        .collect(Collectors.toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR)));
    if (storesFlushed.size() != tableDesc.getColumnFamilyCount()) {
      return false;
    }
    return storesFlushed.containsAll(tableDesc.getColumnFamilyNames());
  }

  /**
   * Add this edit to the replication queue.
   * <p/>
   * The {@code rpcCall} is for retaining the cells if the edit is built within an rpc call and
   * the rpc call has a cell scanner, which is off heap.
   */
  public void add(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
    if (!tableDesc.hasRegionMemStoreReplication() && !edit.isMetaEdit()) {
      // if region memstore replication is not enabled, only replicate meta edits
      return;
    }
    synchronized (entries) {
      if (stopping) {
        return;
      }
      if (edit.isMetaEdit()) {
        // Check whether we flushed all stores, which means we could drop all the previous edits
        // and also recover from the previous failure of some replicas.
        for (Cell metaCell : edit.getCells()) {
          if (CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) {
            FlushDescriptor flushDesc;
            try {
              flushDesc = WALEdit.getFlushDescriptor(metaCell);
            } catch (IOException e) {
              LOG.warn("Failed to parse FlushDescriptor from {}", metaCell, e);
              continue;
            }
            if (flushDesc != null && flushAllStores(flushDesc)) {
              LOG.debug("Got a flush all request, clear failed replicas {} and {} pending" +
                " replication entries", failedReplicas, entries.size());
              // release the dropped entries, as required by SinkEntry.replicated(),
              // before discarding them
              entries.forEach(SinkEntry::replicated);
              entries.clear();
              failedReplicas.clear();
            }
          }
        }
      }
      // TODO: limit the total cached entries here, and we should have a global limitation, not
      // only for this region.
      entries.add(new SinkEntry(key, edit, rpcCall));
      if (!sending) {
        send();
      }
    }
  }
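
  // Shutdown handshake: stop() flips stopping so no new edits are accepted; if a batch
  // is still in flight, onComplete() will observe stopping, set stopped and notify.
  // waitUntilStopped() then blocks until stopped is set, so after it returns no more
  // replicate requests will be issued for this region.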

  /**
   * Stop the replication sink.
   * <p/>
   * Usually this should only be called when you want to close a region.
   */
  void stop() {
    synchronized (entries) {
      stopping = true;
      if (!sending) {
        stopped = true;
        entries.notifyAll();
      }
    }
  }

  /**
   * Make sure that we have finished all the replicating requests.
   * <p/>
   * After returning, we can make sure there will be no new replicating requests to secondary
   * replicas.
   * <p/>
   * This is used to keep the replicating order the same with the WAL edit order when writing.
   */
  void waitUntilStopped() throws InterruptedException {
    synchronized (entries) {
      while (!stopped) {
        entries.wait();
      }
    }
  }
}