HBASE-26567 Remove IndexType from ChunkCreator (#3947)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / regionserver / TestReplicateToReplica.java
blobd9f846d789aba8d2fac0ff319ccdcc3dc3e69b1d
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.regionserver;
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertFalse;
22 import static org.mockito.ArgumentMatchers.any;
23 import static org.mockito.ArgumentMatchers.anyInt;
24 import static org.mockito.ArgumentMatchers.anyList;
25 import static org.mockito.ArgumentMatchers.anyLong;
26 import static org.mockito.Mockito.mock;
27 import static org.mockito.Mockito.times;
28 import static org.mockito.Mockito.verify;
29 import static org.mockito.Mockito.when;
31 import java.io.IOException;
32 import java.util.ArrayDeque;
33 import java.util.ArrayList;
34 import java.util.Arrays;
35 import java.util.Collection;
36 import java.util.List;
37 import java.util.Queue;
38 import java.util.UUID;
39 import java.util.concurrent.CompletableFuture;
40 import org.apache.hadoop.conf.Configuration;
41 import org.apache.hadoop.fs.FileSystem;
42 import org.apache.hadoop.fs.Path;
43 import org.apache.hadoop.hbase.CellScanner;
44 import org.apache.hadoop.hbase.HBaseClassTestRule;
45 import org.apache.hadoop.hbase.HBaseTestingUtil;
46 import org.apache.hadoop.hbase.HConstants;
47 import org.apache.hadoop.hbase.ServerName;
48 import org.apache.hadoop.hbase.TableName;
49 import org.apache.hadoop.hbase.TableNameTestRule;
50 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
51 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
52 import org.apache.hadoop.hbase.client.Get;
53 import org.apache.hadoop.hbase.client.Put;
54 import org.apache.hadoop.hbase.client.RegionInfo;
55 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
56 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
57 import org.apache.hadoop.hbase.client.TableDescriptor;
58 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
59 import org.apache.hadoop.hbase.executor.ExecutorService;
60 import org.apache.hadoop.hbase.executor.ExecutorType;
61 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
62 import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
63 import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
64 import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
65 import org.apache.hadoop.hbase.testclassification.MediumTests;
66 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
67 import org.apache.hadoop.hbase.util.Bytes;
68 import org.apache.hadoop.hbase.util.Pair;
69 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
70 import org.apache.hadoop.hbase.wal.WAL;
71 import org.apache.hadoop.hbase.wal.WALFactory;
72 import org.junit.After;
73 import org.junit.AfterClass;
74 import org.junit.Before;
75 import org.junit.BeforeClass;
76 import org.junit.ClassRule;
77 import org.junit.Rule;
78 import org.junit.Test;
79 import org.junit.experimental.categories.Category;
81 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
82 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
84 @Category({ RegionServerTests.class, MediumTests.class })
85 public class TestReplicateToReplica {
87 @ClassRule
88 public static final HBaseClassTestRule CLASS_RULE =
89 HBaseClassTestRule.forClass(TestReplicateToReplica.class);
91 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
93 private static byte[] FAMILY = Bytes.toBytes("family");
95 private static byte[] QUAL = Bytes.toBytes("qualifier");
97 private static ExecutorService EXEC;
99 @Rule
100 public final TableNameTestRule name = new TableNameTestRule();
102 private TableName tableName;
104 private Path testDir;
106 private TableDescriptor td;
108 private RegionServerServices rss;
110 private AsyncClusterConnection conn;
112 private RegionReplicationBufferManager manager;
114 private FlushRequester flushRequester;
116 private HRegion primary;
118 private HRegion secondary;
120 private WALFactory walFactory;
122 private boolean queueReqAndResps;
124 private Queue<Pair<List<WAL.Entry>, CompletableFuture<Void>>> reqAndResps;
126 private static List<Put> TO_ADD_AFTER_PREPARE_FLUSH;
128 public static final class HRegionForTest extends HRegion {
130 public HRegionForTest(HRegionFileSystem fs, WAL wal, Configuration confParam,
131 TableDescriptor htd, RegionServerServices rsServices) {
132 super(fs, wal, confParam, htd, rsServices);
135 @SuppressWarnings("deprecation")
136 public HRegionForTest(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
137 RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
138 super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
141 @Override
142 protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid,
143 Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker,
144 FlushLifeCycleTracker tracker) throws IOException {
145 PrepareFlushResult result = super.internalPrepareFlushCache(wal, myseqid, storesToFlush,
146 status, writeFlushWalMarker, tracker);
147 for (Put put : TO_ADD_AFTER_PREPARE_FLUSH) {
148 put(put);
150 TO_ADD_AFTER_PREPARE_FLUSH.clear();
151 return result;
156 @BeforeClass
157 public static void setUpBeforeClass() {
158 Configuration conf = UTIL.getConfiguration();
159 conf.setInt("hbase.region.read-replica.sink.flush.min-interval.secs", 1);
160 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
161 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
162 conf.setClass(HConstants.REGION_IMPL, HRegionForTest.class, HRegion.class);
163 EXEC = new ExecutorService("test");
164 EXEC.startExecutorService(EXEC.new ExecutorConfig().setCorePoolSize(1)
165 .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER));
166 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
167 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
170 @AfterClass
171 public static void tearDownAfterClass() {
172 EXEC.shutdown();
173 UTIL.cleanupTestDir();
176 @Before
177 public void setUp() throws IOException {
178 TO_ADD_AFTER_PREPARE_FLUSH = new ArrayList<>();
179 tableName = name.getTableName();
180 testDir = UTIL.getDataTestDir(tableName.getNameAsString());
181 Configuration conf = UTIL.getConfiguration();
182 conf.set(HConstants.HBASE_DIR, testDir.toString());
184 td = TableDescriptorBuilder.newBuilder(tableName)
185 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(2)
186 .setRegionMemStoreReplication(true).build();
188 reqAndResps = new ArrayDeque<>();
189 queueReqAndResps = true;
190 conn = mock(AsyncClusterConnection.class);
191 when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong())).thenAnswer(i -> {
192 if (queueReqAndResps) {
193 @SuppressWarnings("unchecked")
194 List<WAL.Entry> entries = i.getArgument(1, List.class);
195 CompletableFuture<Void> future = new CompletableFuture<>();
196 reqAndResps.add(Pair.newPair(entries, future));
197 return future;
198 } else {
199 return CompletableFuture.completedFuture(null);
203 flushRequester = mock(FlushRequester.class);
205 rss = mock(RegionServerServices.class);
206 when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
207 when(rss.getConfiguration()).thenReturn(conf);
208 when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(conf));
209 when(rss.getExecutorService()).thenReturn(EXEC);
210 when(rss.getAsyncClusterConnection()).thenReturn(conn);
211 when(rss.getFlushRequester()).thenReturn(flushRequester);
213 manager = new RegionReplicationBufferManager(rss);
214 when(rss.getRegionReplicationBufferManager()).thenReturn(manager);
216 RegionInfo primaryHri = RegionInfoBuilder.newBuilder(td.getTableName()).build();
217 RegionInfo secondaryHri = RegionReplicaUtil.getRegionInfoForReplica(primaryHri, 1);
219 walFactory = new WALFactory(conf, UUID.randomUUID().toString());
220 WAL wal = walFactory.getWAL(primaryHri);
221 primary = HRegion.createHRegion(primaryHri, testDir, conf, td, wal);
222 primary.close();
224 primary = HRegion.openHRegion(testDir, primaryHri, td, wal, conf, rss, null);
225 secondary = HRegion.openHRegion(secondaryHri, td, null, conf, rss, null);
227 when(rss.getRegions()).then(i -> {
228 return Arrays.asList(primary, secondary);
231 // process the open events
232 replicateAll();
235 @After
236 public void tearDown() throws IOException {
237 // close region will issue a flush, which will enqueue an edit into the replication sink so we
238 // need to complete it otherwise the test will hang.
239 queueReqAndResps = false;
240 failAll();
241 HBaseTestingUtil.closeRegionAndWAL(primary);
242 HBaseTestingUtil.closeRegionAndWAL(secondary);
243 if (walFactory != null) {
244 walFactory.close();
248 private FlushResult flushPrimary() throws IOException {
249 return primary.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
252 private void replicate(Pair<List<WAL.Entry>, CompletableFuture<Void>> pair) throws IOException {
253 Pair<ReplicateWALEntryRequest, CellScanner> params = ReplicationProtobufUtil
254 .buildReplicateWALEntryRequest(pair.getFirst().toArray(new WAL.Entry[0]),
255 secondary.getRegionInfo().getEncodedNameAsBytes(), null, null, null);
256 for (WALEntry entry : params.getFirst().getEntryList()) {
257 secondary.replayWALEntry(entry, params.getSecond());
259 pair.getSecond().complete(null);
262 private void replicateOne() throws IOException {
263 replicate(reqAndResps.remove());
266 private void replicateAll() throws IOException {
267 for (Pair<List<WAL.Entry>, CompletableFuture<Void>> pair;;) {
268 pair = reqAndResps.poll();
269 if (pair == null) {
270 break;
272 replicate(pair);
276 private void failOne() {
277 reqAndResps.remove().getSecond().completeExceptionally(new IOException("Inject error"));
280 private void failAll() {
281 for (Pair<List<WAL.Entry>, CompletableFuture<Void>> pair;;) {
282 pair = reqAndResps.poll();
283 if (pair == null) {
284 break;
286 pair.getSecond().completeExceptionally(new IOException("Inject error"));
290 @Test
291 public void testNormalReplicate() throws IOException {
292 byte[] row = Bytes.toBytes(0);
293 primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
294 replicateOne();
295 assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
298 @Test
299 public void testNormalFlush() throws IOException {
300 byte[] row = Bytes.toBytes(0);
301 primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
302 TO_ADD_AFTER_PREPARE_FLUSH.add(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(2)));
303 flushPrimary();
304 replicateAll();
305 assertEquals(2, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
307 // we should have the same memstore size, i.e, the secondary should have also dropped the
308 // snapshot
309 assertEquals(primary.getMemStoreDataSize(), secondary.getMemStoreDataSize());
312 @Test
313 public void testErrorBeforeFlushStart() throws IOException {
314 byte[] row = Bytes.toBytes(0);
315 primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
316 failOne();
317 verify(flushRequester, times(1)).requestFlush(any(), anyList(), any());
318 TO_ADD_AFTER_PREPARE_FLUSH.add(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(2)));
319 flushPrimary();
320 // this also tests start flush with empty memstore at secondary replica side
321 replicateAll();
322 assertEquals(2, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
323 assertEquals(primary.getMemStoreDataSize(), secondary.getMemStoreDataSize());
326 @Test
327 public void testErrorAfterFlushStartBeforeFlushCommit() throws IOException {
328 primary.put(new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
329 replicateAll();
330 TO_ADD_AFTER_PREPARE_FLUSH
331 .add(new Put(Bytes.toBytes(1)).addColumn(FAMILY, QUAL, Bytes.toBytes(2)));
332 flushPrimary();
333 // replicate the start flush edit
334 replicateOne();
335 // fail the remaining edits, the put and the commit flush edit
336 failOne();
337 verify(flushRequester, times(1)).requestFlush(any(), anyList(), any());
338 primary.put(new Put(Bytes.toBytes(2)).addColumn(FAMILY, QUAL, Bytes.toBytes(3)));
339 flushPrimary();
340 replicateAll();
341 for (int i = 0; i < 3; i++) {
342 assertEquals(i + 1,
343 Bytes.toInt(secondary.get(new Get(Bytes.toBytes(i))).getValue(FAMILY, QUAL)));
345 // should have nothing in memstore
346 assertEquals(0, secondary.getMemStoreDataSize());
349 @Test
350 public void testCatchUpWithCannotFlush() throws IOException, InterruptedException {
351 byte[] row = Bytes.toBytes(0);
352 primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
353 failOne();
354 verify(flushRequester, times(1)).requestFlush(any(), anyList(), any());
355 flushPrimary();
356 failAll();
357 Thread.sleep(2000);
358 // we will request flush the second time
359 verify(flushRequester, times(2)).requestFlush(any(), anyList(), any());
360 // we can not flush because no content in memstore
361 FlushResult result = flushPrimary();
362 assertEquals(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.getResult());
363 // the secondary replica does not have this row yet
364 assertFalse(secondary.get(new Get(row).setCheckExistenceOnly(true)).getExists().booleanValue());
365 // replicate the can not flush edit
366 replicateOne();
367 // we should have the row now
368 assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
371 @Test
372 public void testCatchUpWithReopen() throws IOException {
373 byte[] row = Bytes.toBytes(0);
374 primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
375 failOne();
376 primary.close();
377 // the secondary replica does not have this row yet, although the above close has flushed the
378 // data out
379 assertFalse(secondary.get(new Get(row).setCheckExistenceOnly(true)).getExists().booleanValue());
381 // reopen
382 primary = HRegion.openHRegion(testDir, primary.getRegionInfo(), td, primary.getWAL(),
383 UTIL.getConfiguration(), rss, null);
384 replicateAll();
385 // we should have the row now
386 assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));