2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.regionserver
;
20 import static org
.junit
.Assert
.assertEquals
;
21 import static org
.junit
.Assert
.assertFalse
;
22 import static org
.mockito
.ArgumentMatchers
.any
;
23 import static org
.mockito
.ArgumentMatchers
.anyInt
;
24 import static org
.mockito
.ArgumentMatchers
.anyList
;
25 import static org
.mockito
.ArgumentMatchers
.anyLong
;
26 import static org
.mockito
.Mockito
.mock
;
27 import static org
.mockito
.Mockito
.times
;
28 import static org
.mockito
.Mockito
.verify
;
29 import static org
.mockito
.Mockito
.when
;
31 import java
.io
.IOException
;
32 import java
.util
.ArrayDeque
;
33 import java
.util
.ArrayList
;
34 import java
.util
.Arrays
;
35 import java
.util
.Collection
;
36 import java
.util
.List
;
37 import java
.util
.Queue
;
38 import java
.util
.UUID
;
39 import java
.util
.concurrent
.CompletableFuture
;
40 import org
.apache
.hadoop
.conf
.Configuration
;
41 import org
.apache
.hadoop
.fs
.FileSystem
;
42 import org
.apache
.hadoop
.fs
.Path
;
43 import org
.apache
.hadoop
.hbase
.CellScanner
;
44 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
45 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
46 import org
.apache
.hadoop
.hbase
.HConstants
;
47 import org
.apache
.hadoop
.hbase
.ServerName
;
48 import org
.apache
.hadoop
.hbase
.TableName
;
49 import org
.apache
.hadoop
.hbase
.TableNameTestRule
;
50 import org
.apache
.hadoop
.hbase
.client
.AsyncClusterConnection
;
51 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
52 import org
.apache
.hadoop
.hbase
.client
.Get
;
53 import org
.apache
.hadoop
.hbase
.client
.Put
;
54 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
55 import org
.apache
.hadoop
.hbase
.client
.RegionInfoBuilder
;
56 import org
.apache
.hadoop
.hbase
.client
.RegionReplicaUtil
;
57 import org
.apache
.hadoop
.hbase
.client
.TableDescriptor
;
58 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
59 import org
.apache
.hadoop
.hbase
.executor
.ExecutorService
;
60 import org
.apache
.hadoop
.hbase
.executor
.ExecutorType
;
61 import org
.apache
.hadoop
.hbase
.monitoring
.MonitoredTask
;
62 import org
.apache
.hadoop
.hbase
.protobuf
.ReplicationProtobufUtil
;
63 import org
.apache
.hadoop
.hbase
.regionserver
.HRegion
.FlushResult
;
64 import org
.apache
.hadoop
.hbase
.regionserver
.regionreplication
.RegionReplicationBufferManager
;
65 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
66 import org
.apache
.hadoop
.hbase
.testclassification
.RegionServerTests
;
67 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
68 import org
.apache
.hadoop
.hbase
.util
.Pair
;
69 import org
.apache
.hadoop
.hbase
.util
.ServerRegionReplicaUtil
;
70 import org
.apache
.hadoop
.hbase
.wal
.WAL
;
71 import org
.apache
.hadoop
.hbase
.wal
.WALFactory
;
72 import org
.junit
.After
;
73 import org
.junit
.AfterClass
;
74 import org
.junit
.Before
;
75 import org
.junit
.BeforeClass
;
76 import org
.junit
.ClassRule
;
77 import org
.junit
.Rule
;
78 import org
.junit
.Test
;
79 import org
.junit
.experimental
.categories
.Category
;
81 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.AdminProtos
.ReplicateWALEntryRequest
;
82 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.AdminProtos
.WALEntry
;
/**
 * Tests replaying edits written to a primary region onto a secondary replica region, both opened
 * in-process.
 * <p>
 * The fixture stubs {@code AsyncClusterConnection.replicate} so that, while
 * {@code queueReqAndResps} is set, each call is captured in {@code reqAndResps} together with a
 * fresh, uncompleted future. Helper methods then either replay a captured request on the secondary
 * region and complete its future, or complete the future exceptionally to simulate a failure.
 */
84 @Category({ RegionServerTests
.class, MediumTests
.class })
85 public class TestReplicateToReplica
{
88 public static final HBaseClassTestRule CLASS_RULE
=
89 HBaseClassTestRule
.forClass(TestReplicateToReplica
.class);
// Shared testing utility; supplies the Configuration and the per-test data directory.
91 private static final HBaseTestingUtil UTIL
= new HBaseTestingUtil();
// Column family used by the table descriptor and by every put/get in the tests.
93 private static byte[] FAMILY
= Bytes
.toBytes("family");
// Column qualifier used by every put/get in the tests.
95 private static byte[] QUAL
= Bytes
.toBytes("qualifier");
// Executor created once in setUpBeforeClass and handed out by the mocked RegionServerServices.
97 private static ExecutorService EXEC
;
// Provides the table name used for the current test (read in setUp).
100 public final TableNameTestRule name
= new TableNameTestRule();
102 private TableName tableName
;
104 private Path testDir
;
106 private TableDescriptor td
;
// Mockito mock wired up in setUp to return the collaborators below.
108 private RegionServerServices rss
;
// Mockito mock whose replicate(...) call is stubbed in setUp.
110 private AsyncClusterConnection conn
;
112 private RegionReplicationBufferManager manager
;
// Mockito mock; tests verify how many times requestFlush was invoked on it.
114 private FlushRequester flushRequester
;
116 private HRegion primary
;
118 private HRegion secondary
;
120 private WALFactory walFactory
;
// While true, the stubbed replicate(...) call adds each request to reqAndResps.
122 private boolean queueReqAndResps
;
// Captured replicate requests: the WAL entries passed in, paired with the future created for them.
124 private Queue
<Pair
<List
<WAL
.Entry
>, CompletableFuture
<Void
>>> reqAndResps
;
// Puts that HRegionForTest iterates over right after the super prepare-flush call; reset in setUp.
126 private static List
<Put
> TO_ADD_AFTER_PREPARE_FLUSH
;
/**
 * HRegion subclass that setUpBeforeClass registers under HConstants.REGION_IMPL. It hooks
 * internalPrepareFlushCache so that the puts queued in TO_ADD_AFTER_PREPARE_FLUSH are processed
 * immediately after the superclass has prepared the flush.
 */
128 public static final class HRegionForTest
extends HRegion
{
// Pass-through constructor; simply forwards all arguments to HRegion.
130 public HRegionForTest(HRegionFileSystem fs
, WAL wal
, Configuration confParam
,
131 TableDescriptor htd
, RegionServerServices rsServices
) {
132 super(fs
, wal
, confParam
, htd
, rsServices
);
// Pass-through constructor; the deprecation warning is suppressed for the super call it forwards to.
135 @SuppressWarnings("deprecation")
136 public HRegionForTest(Path tableDir
, WAL wal
, FileSystem fs
, Configuration confParam
,
137 RegionInfo regionInfo
, TableDescriptor htd
, RegionServerServices rsServices
) {
138 super(tableDir
, wal
, fs
, confParam
, regionInfo
, htd
, rsServices
);
// Delegates to the superclass first, then walks TO_ADD_AFTER_PREPARE_FLUSH and clears it.
// NOTE(review): the loop body and the return statement are not visible in this view; presumably
// each queued put is applied to this region and the super result is returned - confirm.
142 protected PrepareFlushResult
internalPrepareFlushCache(WAL wal
, long myseqid
,
143 Collection
<HStore
> storesToFlush
, MonitoredTask status
, boolean writeFlushWalMarker
,
144 FlushLifeCycleTracker tracker
) throws IOException
{
145 PrepareFlushResult result
= super.internalPrepareFlushCache(wal
, myseqid
, storesToFlush
,
146 status
, writeFlushWalMarker
, tracker
);
147 for (Put put
: TO_ADD_AFTER_PREPARE_FLUSH
) {
150 TO_ADD_AFTER_PREPARE_FLUSH
.clear();
157 public static void setUpBeforeClass() {
158 Configuration conf
= UTIL
.getConfiguration();
159 conf
.setInt("hbase.region.read-replica.sink.flush.min-interval.secs", 1);
160 conf
.setBoolean(ServerRegionReplicaUtil
.REGION_REPLICA_REPLICATION_CONF_KEY
, true);
161 conf
.setBoolean(ServerRegionReplicaUtil
.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY
, true);
162 conf
.setClass(HConstants
.REGION_IMPL
, HRegionForTest
.class, HRegion
.class);
163 EXEC
= new ExecutorService("test");
164 EXEC
.startExecutorService(EXEC
.new ExecutorConfig().setCorePoolSize(1)
165 .setExecutorType(ExecutorType
.RS_COMPACTED_FILES_DISCHARGER
));
166 ChunkCreator
.initialize(MemStoreLAB
.CHUNK_SIZE_DEFAULT
, false, 0, 0, 0, null,
167 MemStoreLAB
.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT
);
// One-time fixture teardown; removes the test data directory created through UTIL.
// NOTE(review): a statement before the cleanup call is not visible in this view (presumably the
// shutdown of EXEC) - confirm.
171 public static void tearDownAfterClass() {
173 UTIL
.cleanupTestDir();
/**
 * Per-test fixture: builds a table descriptor with region replication 2, stubs the cluster
 * connection so replicate calls can be captured, mocks the region server services, and opens the
 * primary region and its replica (replica id 1).
 * NOTE(review): several short statements of this method are not visible in this view (inside the
 * replicate stub, after createHRegion, and after the trailing "process the open events" comment).
 */
177 public void setUp() throws IOException
{
178 TO_ADD_AFTER_PREPARE_FLUSH
= new ArrayList
<>();
179 tableName
= name
.getTableName();
180 testDir
= UTIL
.getDataTestDir(tableName
.getNameAsString());
181 Configuration conf
= UTIL
.getConfiguration();
// point the HBase root dir at this test's own data directory
182 conf
.set(HConstants
.HBASE_DIR
, testDir
.toString());
// one column family, two region replicas, memstore replication enabled
184 td
= TableDescriptorBuilder
.newBuilder(tableName
)
185 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(FAMILY
)).setRegionReplication(2)
186 .setRegionMemStoreReplication(true).build();
188 reqAndResps
= new ArrayDeque
<>();
189 queueReqAndResps
= true;
190 conn
= mock(AsyncClusterConnection
.class);
// Stub replicate(...): while queueReqAndResps is set, capture the entry list (argument 1) with a
// new future in reqAndResps; the visible fallback returns an already completed future.
191 when(conn
.replicate(any(), anyList(), anyInt(), anyLong(), anyLong())).thenAnswer(i
-> {
192 if (queueReqAndResps
) {
193 @SuppressWarnings("unchecked")
194 List
<WAL
.Entry
> entries
= i
.getArgument(1, List
.class);
195 CompletableFuture
<Void
> future
= new CompletableFuture
<>();
196 reqAndResps
.add(Pair
.newPair(entries
, future
));
199 return CompletableFuture
.completedFuture(null);
203 flushRequester
= mock(FlushRequester
.class);
// mocked region server services hand out the collaborators created above
205 rss
= mock(RegionServerServices
.class);
206 when(rss
.getServerName()).thenReturn(ServerName
.valueOf("foo", 1, 1));
207 when(rss
.getConfiguration()).thenReturn(conf
);
208 when(rss
.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(conf
));
209 when(rss
.getExecutorService()).thenReturn(EXEC
);
210 when(rss
.getAsyncClusterConnection()).thenReturn(conn
);
211 when(rss
.getFlushRequester()).thenReturn(flushRequester
);
// the buffer manager is constructed from rss, so it is created after the stubs above are in place
213 manager
= new RegionReplicationBufferManager(rss
);
214 when(rss
.getRegionReplicationBufferManager()).thenReturn(manager
);
216 RegionInfo primaryHri
= RegionInfoBuilder
.newBuilder(td
.getTableName()).build();
217 RegionInfo secondaryHri
= RegionReplicaUtil
.getRegionInfoForReplica(primaryHri
, 1);
219 walFactory
= new WALFactory(conf
, UUID
.randomUUID().toString());
220 WAL wal
= walFactory
.getWAL(primaryHri
);
// create the primary region on disk, then open primary (with the WAL) and secondary (without one)
221 primary
= HRegion
.createHRegion(primaryHri
, testDir
, conf
, td
, wal
);
224 primary
= HRegion
.openHRegion(testDir
, primaryHri
, td
, wal
, conf
, rss
, null);
225 secondary
= HRegion
.openHRegion(secondaryHri
, td
, null, conf
, rss
, null);
// answered lazily so the current primary/secondary references are returned on each call
227 when(rss
.getRegions()).then(i
-> {
228 return Arrays
.asList(primary
, secondary
);
231 // process the open events
/**
 * Per-test teardown: stops queueing replicate requests, then closes both regions and their WALs.
 * NOTE(review): the body of the walFactory null check is not visible in this view (presumably it
 * closes the factory) - confirm.
 */
236 public void tearDown() throws IOException
{
237 // close region will issue a flush, which will enqueue an edit into the replication sink so we
238 // need to complete it otherwise the test will hang.
239 queueReqAndResps
= false;
241 HBaseTestingUtil
.closeRegionAndWAL(primary
);
242 HBaseTestingUtil
.closeRegionAndWAL(secondary
);
243 if (walFactory
!= null) {
248 private FlushResult
flushPrimary() throws IOException
{
249 return primary
.flushcache(true, true, FlushLifeCycleTracker
.DUMMY
);
252 private void replicate(Pair
<List
<WAL
.Entry
>, CompletableFuture
<Void
>> pair
) throws IOException
{
253 Pair
<ReplicateWALEntryRequest
, CellScanner
> params
= ReplicationProtobufUtil
254 .buildReplicateWALEntryRequest(pair
.getFirst().toArray(new WAL
.Entry
[0]),
255 secondary
.getRegionInfo().getEncodedNameAsBytes(), null, null, null);
256 for (WALEntry entry
: params
.getFirst().getEntryList()) {
257 secondary
.replayWALEntry(entry
, params
.getSecond());
259 pair
.getSecond().complete(null);
262 private void replicateOne() throws IOException
{
263 replicate(reqAndResps
.remove());
// Repeatedly polls captured requests from reqAndResps.
// NOTE(review): everything after the poll() call is not visible in this view; going by the method
// name and the sibling replicate(pair) helper, presumably the loop stops once poll() returns null
// and otherwise replays the polled request - confirm.
266 private void replicateAll() throws IOException
{
267 for (Pair
<List
<WAL
.Entry
>, CompletableFuture
<Void
>> pair
;;) {
268 pair
= reqAndResps
.poll();
276 private void failOne() {
277 reqAndResps
.remove().getSecond().completeExceptionally(new IOException("Inject error"));
// Repeatedly polls captured requests from reqAndResps and completes each polled future
// exceptionally with an injected IOException.
// NOTE(review): the statements between poll() and completeExceptionally(...) are not visible in
// this view; presumably they end the loop when poll() returns null - confirm.
280 private void failAll() {
281 for (Pair
<List
<WAL
.Entry
>, CompletableFuture
<Void
>> pair
;;) {
282 pair
= reqAndResps
.poll();
286 pair
.getSecond().completeExceptionally(new IOException("Inject error"));
// Writes value 1 to a row on the primary and expects the secondary to return the same value.
// NOTE(review): the step between the put and the assertion is not visible in this view
// (presumably it replays the captured edit on the secondary) - confirm.
291 public void testNormalReplicate() throws IOException
{
292 byte[] row
= Bytes
.toBytes(0);
293 primary
.put(new Put(row
).addColumn(FAMILY
, QUAL
, Bytes
.toBytes(1)));
295 assertEquals(1, Bytes
.toInt(secondary
.get(new Get(row
)).getValue(FAMILY
, QUAL
)));
// Writes value 1 on the primary, queues a second put (value 2) for the prepare-flush hook of
// HRegionForTest, and expects the secondary to end up with value 2 and the same memstore data
// size as the primary.
// NOTE(review): the flush/replication steps between the visible statements are not visible in
// this view - confirm.
299 public void testNormalFlush() throws IOException
{
300 byte[] row
= Bytes
.toBytes(0);
301 primary
.put(new Put(row
).addColumn(FAMILY
, QUAL
, Bytes
.toBytes(1)));
302 TO_ADD_AFTER_PREPARE_FLUSH
.add(new Put(row
).addColumn(FAMILY
, QUAL
, Bytes
.toBytes(2)));
305 assertEquals(2, Bytes
.toInt(secondary
.get(new Get(row
)).getValue(FAMILY
, QUAL
)));
307 // we should have the same memstore size, i.e, the secondary should have also dropped the
309 assertEquals(primary
.getMemStoreDataSize(), secondary
.getMemStoreDataSize());
// Writes value 1 on the primary, checks that exactly one flush has been requested through the
// mocked FlushRequester, queues a second put (value 2) for the prepare-flush hook, and finally
// expects the secondary to hold value 2 with the same memstore data size as the primary.
// NOTE(review): the failure-injection, flush and replication steps between the visible statements
// are not visible in this view - confirm.
313 public void testErrorBeforeFlushStart() throws IOException
{
314 byte[] row
= Bytes
.toBytes(0);
315 primary
.put(new Put(row
).addColumn(FAMILY
, QUAL
, Bytes
.toBytes(1)));
317 verify(flushRequester
, times(1)).requestFlush(any(), anyList(), any());
318 TO_ADD_AFTER_PREPARE_FLUSH
.add(new Put(row
).addColumn(FAMILY
, QUAL
, Bytes
.toBytes(2)));
320 // this also tests start flush with empty memstore at secondary replica side
322 assertEquals(2, Bytes
.toInt(secondary
.get(new Get(row
)).getValue(FAMILY
, QUAL
)));
323 assertEquals(primary
.getMemStoreDataSize(), secondary
.getMemStoreDataSize());
// Writes row 0 (value 1) on the primary, queues row 1 (value 2) for the prepare-flush hook,
// checks that one flush has been requested, writes row 2 (value 3), then reads rows 0..2 from the
// secondary and expects the secondary memstore to be empty at the end.
// NOTE(review): the flush, replicate and fail steps announced by the inline comments, and the
// first line of the assertion inside the loop, are not visible in this view - confirm.
327 public void testErrorAfterFlushStartBeforeFlushCommit() throws IOException
{
328 primary
.put(new Put(Bytes
.toBytes(0)).addColumn(FAMILY
, QUAL
, Bytes
.toBytes(1)));
330 TO_ADD_AFTER_PREPARE_FLUSH
331 .add(new Put(Bytes
.toBytes(1)).addColumn(FAMILY
, QUAL
, Bytes
.toBytes(2)));
333 // replicate the start flush edit
335 // fail the remaining edits, the put and the commit flush edit
337 verify(flushRequester
, times(1)).requestFlush(any(), anyList(), any());
338 primary
.put(new Put(Bytes
.toBytes(2)).addColumn(FAMILY
, QUAL
, Bytes
.toBytes(3)));
341 for (int i
= 0; i
< 3; i
++) {
343 Bytes
.toInt(secondary
.get(new Get(Bytes
.toBytes(i
))).getValue(FAMILY
, QUAL
)));
345 // should have nothing in memstore
346 assertEquals(0, secondary
.getMemStoreDataSize());
// Writes value 1 on the primary and checks the flush-request count twice (1, then 2). It then
// flushes the primary, expecting CANNOT_FLUSH_MEMSTORE_EMPTY, and asserts that the secondary does
// not have the row before the final step and does have it (value 1) afterwards.
// NOTE(review): the steps between the two verify calls and the replication of the "can not flush"
// edit are not visible in this view - confirm.
350 public void testCatchUpWithCannotFlush() throws IOException
, InterruptedException
{
351 byte[] row
= Bytes
.toBytes(0);
352 primary
.put(new Put(row
).addColumn(FAMILY
, QUAL
, Bytes
.toBytes(1)));
354 verify(flushRequester
, times(1)).requestFlush(any(), anyList(), any());
358 // we will request flush the second time
359 verify(flushRequester
, times(2)).requestFlush(any(), anyList(), any());
360 // we can not flush because no content in memstore
361 FlushResult result
= flushPrimary();
362 assertEquals(FlushResult
.Result
.CANNOT_FLUSH_MEMSTORE_EMPTY
, result
.getResult());
363 // the secondary replica does not have this row yet
364 assertFalse(secondary
.get(new Get(row
).setCheckExistenceOnly(true)).getExists().booleanValue());
365 // replicate the can not flush edit
367 // we should have the row now
368 assertEquals(1, Bytes
.toInt(secondary
.get(new Get(row
)).getValue(FAMILY
, QUAL
)));
// Writes value 1 on the primary, asserts the secondary does not have the row yet, reopens the
// primary region on the same WAL, and finally expects the secondary to return value 1.
// NOTE(review): the statements before the assertFalse (the inline comment refers to a close of the
// primary) and the replication step after the reopen are not visible in this view - confirm.
372 public void testCatchUpWithReopen() throws IOException
{
373 byte[] row
= Bytes
.toBytes(0);
374 primary
.put(new Put(row
).addColumn(FAMILY
, QUAL
, Bytes
.toBytes(1)));
377 // the secondary replica does not have this row yet, although the above close has flushed the
379 assertFalse(secondary
.get(new Get(row
).setCheckExistenceOnly(true)).getExists().booleanValue());
382 primary
= HRegion
.openHRegion(testDir
, primary
.getRegionInfo(), td
, primary
.getWAL(),
383 UTIL
.getConfiguration(), rss
, null);
385 // we should have the row now
386 assertEquals(1, Bytes
.toInt(secondary
.get(new Get(row
)).getValue(FAMILY
, QUAL
)));