/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;

/**
 * The class for replicating WAL edits to secondary replicas, one instance per region.
 */
@InterfaceAudience.Private
public class RegionReplicationSink {

  private static final Logger LOG = LoggerFactory.getLogger(RegionReplicationSink.class);

  public static final String MAX_PENDING_SIZE = "hbase.region.read-replica.sink.max-pending-size";

  public static final long MAX_PENDING_SIZE_DEFAULT = 10L * 1024 * 1024;

  public static final String RETRIES_NUMBER = "hbase.region.read-replica.sink.retries.number";

  public static final int RETRIES_NUMBER_DEFAULT = 3;

  public static final String RPC_TIMEOUT_MS = "hbase.region.read-replica.sink.rpc.timeout.ms";

  public static final long RPC_TIMEOUT_MS_DEFAULT = 200;

  public static final String OPERATION_TIMEOUT_MS =
    "hbase.region.read-replica.sink.operation.timeout.ms";

  public static final long OPERATION_TIMEOUT_MS_DEFAULT = 1000;
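
  // The keys above can be tuned in hbase-site.xml; a sketch with illustrative values (only the
  // key names come from this file, the values below are not the defaults):
  //
  //   <property>
  //     <name>hbase.region.read-replica.sink.retries.number</name>
  //     <value>5</value>
  //   </property>
  //   <property>
  //     <name>hbase.region.read-replica.sink.rpc.timeout.ms</name>
  //     <value>500</value>
  //   </property>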

  private static final class SinkEntry {

    final WALKeyImpl key;

    final WALEdit edit;

    final ServerCall<?> rpcCall;

    SinkEntry(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
      this.key = key;
      this.edit = edit;
      this.rpcCall = rpcCall;
      if (rpcCall != null) {
        // increase the reference count to avoid the rpc framework freeing the memory before we
        // actually send it out
        rpcCall.retainByWAL();
      }
    }

    /**
     * Should be called regardless of the result of the replication operation. Unless you still
     * want to reuse this entry, you must call this method to release the possible off-heap
     * memory retained for the rpc call.
     */
    void replicated() {
      if (rpcCall != null) {
        rpcCall.releaseByWAL();
      }
    }
  }

  private final RegionInfo primary;

  private final TableDescriptor tableDesc;

  private final Runnable flushRequester;

  private final AsyncClusterConnection conn;

  // used to track the replicas which we failed to replicate edits to; will be cleared after we
  // get a flush edit.
  private final Set<Integer> failedReplicas = new HashSet<>();

  private final Queue<SinkEntry> entries = new ArrayDeque<>();

  private final int retries;

  private final long rpcTimeoutNs;

  private final long operationTimeoutNs;

  private boolean sending;

  private boolean stopping;

  private boolean stopped;
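
  // Note on the three flags above: all of them are guarded by synchronizing on "entries".
  // "sending" is true while a batch of entries is in flight, "stopping" rejects new entries,
  // and "stopped" is only set once stopping has been requested and no batch remains outstanding.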

  RegionReplicationSink(Configuration conf, RegionInfo primary, TableDescriptor td,
    Runnable flushRequester, AsyncClusterConnection conn) {
    Preconditions.checkArgument(RegionReplicaUtil.isDefaultReplica(primary), "%s is not primary",
      primary);
    Preconditions.checkArgument(td.getRegionReplication() > 1,
      "region replication should be greater than 1 but got %s", td.getRegionReplication());
    this.primary = primary;
    this.tableDesc = td;
    this.flushRequester = flushRequester;
    this.conn = conn;
    this.retries = conf.getInt(RETRIES_NUMBER, RETRIES_NUMBER_DEFAULT);
    this.rpcTimeoutNs =
      TimeUnit.MILLISECONDS.toNanos(conf.getLong(RPC_TIMEOUT_MS, RPC_TIMEOUT_MS_DEFAULT));
    this.operationTimeoutNs = TimeUnit.MILLISECONDS
      .toNanos(conf.getLong(OPERATION_TIMEOUT_MS, OPERATION_TIMEOUT_MS_DEFAULT));
  }
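
  // A minimal usage sketch (hypothetical caller code; "requestFlush", "clusterConnection",
  // "walKey", "walEdit" and "serverCall" are assumed names, not taken from this file):
  //
  //   RegionReplicationSink sink = new RegionReplicationSink(conf, primaryRegionInfo,
  //     tableDescriptor, () -> requestFlush(), clusterConnection);
  //   sink.add(walKey, walEdit, serverCall); // on the WAL append path; serverCall may be null
  //   sink.stop();
  //   sink.waitUntilStopped();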

  private void onComplete(List<SinkEntry> sent,
    Map<Integer, MutableObject<Throwable>> replica2Error) {
    sent.forEach(SinkEntry::replicated);
    Set<Integer> failed = new HashSet<>();
    for (Map.Entry<Integer, MutableObject<Throwable>> entry : replica2Error.entrySet()) {
      Integer replicaId = entry.getKey();
      Throwable error = entry.getValue().getValue();
      if (error != null) {
        LOG.warn("Failed to replicate to secondary replica {} for {}, stop replicating" +
          " for a while and trigger a flush", replicaId, primary, error);
        failed.add(replicaId);
      }
    }
    synchronized (entries) {
      if (!failed.isEmpty()) {
        failedReplicas.addAll(failed);
        flushRequester.run();
      }
      sending = false;
      if (stopping) {
        stopped = true;
        entries.notifyAll();
        return;
      }
      if (!entries.isEmpty()) {
        send();
      }
    }
  }
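
  // Drains the queue into a single batch and fans it out to every secondary replica that has
  // not already failed; onComplete(...) runs once all the outstanding replicate calls finish.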
  private void send() {
    List<SinkEntry> toSend = new ArrayList<>();
    for (SinkEntry entry;;) {
      entry = entries.poll();
      if (entry == null) {
        break;
      }
      toSend.add(entry);
    }
    int toSendReplicaCount = tableDesc.getRegionReplication() - 1 - failedReplicas.size();
    if (toSendReplicaCount <= 0) {
      return;
    }
    sending = true;
    List<WAL.Entry> walEntries =
      toSend.stream().map(e -> new WAL.Entry(e.key, e.edit)).collect(Collectors.toList());
    AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
    Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
    for (int replicaId = 1; replicaId < tableDesc.getRegionReplication(); replicaId++) {
      // skip replicas we have already given up on, otherwise "remaining" would reach zero
      // before all the replication calls have finished
      if (failedReplicas.contains(replicaId)) {
        continue;
      }
      MutableObject<Throwable> error = new MutableObject<>();
      replica2Error.put(replicaId, error);
      RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
      FutureUtils.addListener(
        conn.replicate(replica, walEntries, retries, rpcTimeoutNs, operationTimeoutNs), (r, e) -> {
          error.setValue(e);
          if (remaining.decrementAndGet() == 0) {
            onComplete(toSend, replica2Error);
          }
        });
    }
  }
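
  // A flush is considered "full" only when it covers every column family; a partial flush is
  // not enough to let us drop the pending entries or re-enable previously failed replicas.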

  private boolean flushAllStores(FlushDescriptor flushDesc) {
    Set<byte[]> storesFlushed =
      flushDesc.getStoreFlushesList().stream().map(sfd -> sfd.getFamilyName().toByteArray())
        .collect(Collectors.toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR)));
    if (storesFlushed.size() != tableDesc.getColumnFamilyCount()) {
      return false;
    }
    return storesFlushed.containsAll(tableDesc.getColumnFamilyNames());
  }

  /**
   * Add this edit to the replication queue.
   * <p/>
   * The {@code rpcCall} is for retaining the cells if the edit is built within an rpc call and
   * the rpc call has a cell scanner, which is off heap.
   */
  public void add(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
    if (!tableDesc.hasRegionMemStoreReplication() && !edit.isMetaEdit()) {
      // only replicate meta edit if region memstore replication is not enabled
      return;
    }
    synchronized (entries) {
      if (stopping) {
        return;
      }
      if (edit.isMetaEdit()) {
        // check whether we flushed all stores, which means we could drop all the previous edits,
        // and also, recover from the previous failure of some replicas
        for (Cell metaCell : edit.getCells()) {
          if (CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) {
            FlushDescriptor flushDesc;
            try {
              flushDesc = WALEdit.getFlushDescriptor(metaCell);
            } catch (IOException e) {
              LOG.warn("Failed to parse FlushDescriptor from {}", metaCell, e);
              continue;
            }
            if (flushDesc != null && flushAllStores(flushDesc)) {
              LOG.debug("Got a flush all request, clear failed replicas {} and {} pending" +
                " replication entries", failedReplicas, entries.size());
              entries.clear();
              failedReplicas.clear();
            }
          }
        }
      }
      // TODO: limit the total cached entries here, and we should have a global limitation, not
      // one for only this region
      entries.add(new SinkEntry(key, edit, rpcCall));
      if (!sending) {
        send();
      }
    }
  }

  /**
   * Stop the replication sink.
   * <p/>
   * Usually this should only be called when you want to close a region.
   */
  void stop() {
    synchronized (entries) {
      stopping = true;
      if (!sending) {
        stopped = true;
        entries.notifyAll();
      }
    }
  }

  /**
   * Make sure that we have finished all the replicating requests.
   * <p/>
   * After returning, we can guarantee there will be no new replicating requests to secondary
   * replicas.
   * <p/>
   * This is used to keep the replicating order the same as the WAL edit order when writing.
   */
  void waitUntilStopped() throws InterruptedException {
    synchronized (entries) {
      while (!stopped) {
        entries.wait();
      }
    }
  }
}