/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.TestHRegion.assertGet;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.putData;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.verifyData;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * Tests of HRegion methods for replaying flush, compaction, region open, etc events for secondary
 * region replicas
 */
@Category(LargeTests.class)
public class TestHRegionReplayEvents {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHRegionReplayEvents.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionReplayEvents.class);
  @Rule public TestName name = new TestName();

  private static HBaseTestingUtility TEST_UTIL;

  public static Configuration CONF;
  private String dir;

  private byte[][] families = new byte[][] {
      Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")};

  // Test names
  protected byte[] tableName;
  protected String method;
  protected final byte[] row = Bytes.toBytes("rowA");
  protected final byte[] row2 = Bytes.toBytes("rowB");
  protected byte[] cq = Bytes.toBytes("cq");

  // per test fields
  private Path rootDir;
  private TableDescriptor htd;
  private RegionServerServices rss;
  private RegionInfo primaryHri, secondaryHri;
  private HRegion primaryRegion, secondaryRegion;
  private WAL walPrimary, walSecondary;
  private WAL.Reader reader;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL = new HBaseTestingUtility();
    TEST_UTIL.startMiniDFSCluster(1);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
    TEST_UTIL.cleanupTestDir();
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  @Before
  public void setUp() throws Exception {
    CONF = TEST_UTIL.getConfiguration();
    dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString();
    method = name.getMethodName();
    tableName = Bytes.toBytes(name.getMethodName());
    rootDir = new Path(dir + method);
    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(method));
    for (byte[] family : families) {
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
    }
    htd = builder.build();

    long time = System.currentTimeMillis();
    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
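    // replica id 0 denotes the primary region; replica id 1 marks a read-only secondary
    // replica of the same region (same table, same region id, same boundaries)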
    primaryHri =
        RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(0).build();
    secondaryHri =
        RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(1).build();

    WALFactory wals = TestHRegion.createWALFactory(CONF, rootDir);
    walPrimary = wals.getWAL(primaryHri);
    walSecondary = wals.getWAL(secondaryHri);

    rss = mock(RegionServerServices.class);
    when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
    when(rss.getConfiguration()).thenReturn(CONF);
    when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF));
    String string = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER
        .toString();
    ExecutorService es = new ExecutorService(string);
    es.startExecutorService(string + "-" + string, 1);
    when(rss.getExecutorService()).thenReturn(es);
    primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
    primaryRegion.close();
    List<HRegion> regions = new ArrayList<>();
    regions.add(primaryRegion);
    Mockito.doReturn(regions).when(rss).getRegions();

    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null);

    reader = null;
  }

  @After
  public void tearDown() throws Exception {
    if (reader != null) {
      reader.close();
    }

    if (primaryRegion != null) {
      HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
    }
    if (secondaryRegion != null) {
      HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
    }

    EnvironmentEdgeManagerTestHelper.reset();
  }

  String getName() {
    return name.getMethodName();
  }

  // Some of the test cases are as follows:
  // 1. replay flush start marker again
  // 2. replay flush with smaller seqId than what is there in memstore snapshot
  // 3. replay flush with larger seqId than what is there in memstore snapshot
  // 4. replay flush commit without flush prepare. non droppable memstore
  // 5. replay flush commit without flush prepare. droppable memstore
  // 6. replay open region event
  // 7. replay open region event after flush start
  // 8. replay flush from an earlier seqId (test ignoring seqIds)
  // 9. start flush does not prevent region from closing.

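  // Most tests below follow the same pattern: write data and flush/compaction/region-open events
  // on the primary region, then tail the primary's WAL with createWALReaderForPrimary() and
  // selectively replay the edits and markers into the secondary replica.
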
  @Test
  public void testRegionReplicaSecondaryCannotFlush() throws IOException {
    // load some data and flush; ensure that the secondary replica will not execute the flush

    // load some data to secondary by replaying
    putDataByReplay(secondaryRegion, 0, 1000, cq, families);
    verifyData(secondaryRegion, 0, 1000, cq, families);

    // flush region
    FlushResultImpl flush = (FlushResultImpl) secondaryRegion.flush(true);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH, flush.result);
    verifyData(secondaryRegion, 0, 1000, cq, families);

    // close the region, and inspect that it has not flushed
    Map<byte[], List<HStoreFile>> files = secondaryRegion.close(false);
    // assert that there are no store files (no flush happened)
    for (List<HStoreFile> f : files.values()) {
      assertTrue(f.isEmpty());
    }
  }

  /**
   * Tests a case where we replay only a flush start marker, then the region is closed. This region
   * should not block indefinitely.
   */
  @Test
  public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException {
    // load some data to primary and flush
    int start = 0;
    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
    primaryRegion.flush(true);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- NOT Replaying flush commit in secondary");
        }
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    assertTrue(rss.getRegionServerAccounting().getGlobalMemStoreDataSize() > 0);
    // now close the region; this should not hang because of the un-committed flush
    secondaryRegion.close();

    // verify that the memstore size is back to what it was
    assertEquals(0, rss.getRegionServerAccounting().getGlobalMemStoreDataSize());
  }

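  /**
   * Replays a single data edit into the given region through the batchReplay path, preserving the
   * sequence id of the original WAL entry. Returns the integer encoded in the row key (the tests
   * write consecutive integers as row keys), or 0 for meta edits which are handled elsewhere.
   */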
  static int replayEdit(HRegion region, WAL.Entry entry) throws IOException {
    if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) {
      return 0; // handled elsewhere
    }
    Put put = new Put(CellUtil.cloneRow(entry.getEdit().getCells().get(0)));
    for (Cell cell : entry.getEdit().getCells()) put.add(cell);
    put.setDurability(Durability.SKIP_WAL);
    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
    region.batchReplay(new MutationReplay[] {mutation},
        entry.getKey().getSequenceId());
    return Integer.parseInt(Bytes.toString(put.getRow()));
  }

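  /**
   * Opens a reader on the current WAL file of the primary region, so that edits and markers
   * written by the primary can be re-fed to the secondary region under test.
   */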
  WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException {
    return WALFactory.createReader(TEST_UTIL.getTestFileSystem(),
        AbstractFSWALProvider.getCurrentFileName(walPrimary),
        TEST_UTIL.getConfiguration());
  }

  @Test
  public void testBatchReplayWithMultipleNonces() throws IOException {
    try {
      MutationReplay[] mutations = new MutationReplay[100];
      for (int i = 0; i < 100; i++) {
        Put put = new Put(Bytes.toBytes(i));
        put.setDurability(Durability.SYNC_WAL);
        for (byte[] family : this.families) {
          put.addColumn(family, this.cq, null);
          long nonceNum = i / 10;
          mutations[i] = new MutationReplay(MutationType.PUT, put, nonceNum, nonceNum);
        }
      }
      primaryRegion.batchReplay(mutations, 20);
    } catch (Exception e) {
      String msg = "Error while replay of batch with multiple nonces. ";
      LOG.error(msg, e);
      fail(msg + e.getMessage());
    }
  }

  @Test
  public void testReplayFlushesAndCompactions() throws IOException {
    // initiate a secondary region with some data.

    // load some data to primary and flush. 3 flushes and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 300, 100);

    // compaction from primary
    LOG.info("-- Compacting primary, only 1 store");
    primaryRegion.compactStore(Bytes.toBytes("cf1"), NoLimitThroughputController.INSTANCE);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    int lastReplayed = 0;
    int expectedStoreFileCount = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      CompactionDescriptor compactionDesc
          = WALEdit.getCompaction(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        // first verify that everything is replayed and visible before flush event replay
        verifyData(secondaryRegion, 0, lastReplayed, cq, families);
        HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
        long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
        long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
        MemStoreSize mss = store.getFlushableSize();
        long storeSize = store.getSize();
        long storeSizeUncompressed = store.getStoreSizeUncompressed();
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc);
          assertNull(result.result);
          assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber());

          // assert that the store memstore is smaller now
          long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize();
          LOG.info("Memstore size reduced by:"
              + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
          assertTrue(storeMemstoreSize > newStoreMemstoreSize);

        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- Replaying flush commit in secondary");
          secondaryRegion.replayWALFlushCommitMarker(flushDesc);

          // assert that the flush files are picked
          expectedStoreFileCount++;
          for (HStore s : secondaryRegion.getStores()) {
            assertEquals(expectedStoreFileCount, s.getStorefilesCount());
          }
          MemStoreSize newMss = store.getFlushableSize();
          assertTrue(mss.getHeapSize() > newMss.getHeapSize());

          // assert that the region memstore is smaller now
          long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
          assertTrue(regionMemstoreSize > newRegionMemstoreSize);

          // assert that the store sizes are bigger
          assertTrue(store.getSize() > storeSize);
          assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed);
          assertEquals(store.getSize(), store.getStorefilesSize());
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else if (compactionDesc != null) {
        secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE);

        // assert that the compaction is applied
        for (HStore store : secondaryRegion.getStores()) {
          if (store.getColumnFamilyName().equals("cf1")) {
            assertEquals(1, store.getStorefilesCount());
          } else {
            assertEquals(expectedStoreFileCount, store.getStorefilesCount());
          }
        }
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    assertEquals(400 - 1, lastReplayed);
    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, 400, cq, families);

    LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted");
    verifyData(primaryRegion, 0, lastReplayed, cq, families);
    for (HStore store : primaryRegion.getStores()) {
      if (store.getColumnFamilyName().equals("cf1")) {
        assertEquals(1, store.getStorefilesCount());
      } else {
        assertEquals(expectedStoreFileCount, store.getStorefilesCount());
      }
    }
  }

  /**
   * Tests cases where we prepare a flush with some seqId and we receive other flush start markers
   * equal to, greater or less than the previous flush start marker.
   */
  @Test
  public void testReplayFlushStartMarkers() throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");

    FlushDescriptor startFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        // first verify that everything is replayed and visible before flush event replay
        HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
        long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
        long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
        MemStoreSize mss = store.getFlushableSize();

        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          startFlushDesc = flushDesc;
          LOG.info("-- Replaying flush start in secondary");
          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
          assertNull(result.result);
          assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber());
          assertTrue(regionMemstoreSize > 0);
          assertTrue(mss.getHeapSize() > 0);

          // assert that the store memstore is smaller now
          long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize();
          LOG.info("Memstore size reduced by:"
              + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
          assertTrue(storeMemstoreSize > newStoreMemstoreSize);
          verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
        }

        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in memstore snapshot
    // and some more data in memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 1: replay the same flush start marker again
    LOG.info("-- Replaying same flush start in secondary again");
    PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
        startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 2: replay a flush start marker with a smaller seqId
    FlushDescriptor startFlushDescSmallerSeqId
        = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50);
    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId);
    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
        startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 3: replay a flush start marker with a larger seqId
    FlushDescriptor startFlushDescLargerSeqId
        = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50);
    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId);
    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
        startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
   * less than the previous flush start marker.
   */
  @Test
  public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException {
    // load some data to primary and flush. 2 flushes and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 200, 100);
    int numRows = 300;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor startFlushDesc = null;
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      System.out.println(lastReplayed);
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          // don't replay the first flush start marker, hold on to it, replay the second one
          if (startFlushDesc == null) {
            startFlushDesc = flushDesc;
          } else {
            LOG.info("-- Replaying flush start in secondary");
            startFlushDesc = flushDesc;
            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
            assertNull(result.result);
          }
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          // do not replay any flush commit yet
          if (commitFlushDesc == null) {
            commitFlushDesc = flushDesc; // hold on to the first flush commit marker
          }
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-200) in memstore snapshot
    // and some more data in memstores (rows 200-300)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();

    // Test case 1: replay a flush commit marker smaller than what we have prepared
    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START"
        + startFlushDesc);
    assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber());

    LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped

    // assert that the region memstore is same as before
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertEquals(regionMemstoreSize, newRegionMemstoreSize);

    assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
   * larger than the previous flush start marker.
   */
  @Test
  public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor startFlushDesc = null;
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          if (startFlushDesc == null) {
            LOG.info("-- Replaying flush start in secondary");
            startFlushDesc = flushDesc;
            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
            assertNull(result.result);
          }
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          // do not replay any flush commit yet
          // hold on to the flush commit marker but simulate a larger
          // flush commit seqId
          commitFlushDesc =
              FlushDescriptor.newBuilder(flushDesc)
                  .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50)
                  .build();
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in memstore snapshot
    // and some more data in memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();

    // Test case 1: replay a flush commit marker larger than what we have prepared
    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START"
        + startFlushDesc);
    assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber());

    LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped

    // assert that the region memstore is smaller than before, but not empty
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertTrue(newRegionMemstoreSize > 0);
    assertTrue(regionMemstoreSize > newRegionMemstoreSize);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers.
   * The memstore edits should be dropped after the flush commit replay since they should be in
   * flushed files.
   */
  @Test
  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore()
      throws IOException {
    testReplayFlushCommitMarkerWithoutFlushStartMarker(true);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers.
   * The memstore edits should not be dropped after the flush commit replay since not every edit
   * will be in flushed files (based on seqId).
   */
  @Test
  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore()
      throws IOException {
    testReplayFlushCommitMarkerWithoutFlushStartMarker(false);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers.
   */
  public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore)
      throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data.
    // write more data after flush depending on droppableMemstore
    putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100);
    int numRows = droppableMemstore ? 100 : 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          // do not replay flush start marker
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          commitFlushDesc = flushDesc; // hold on to the flush commit marker
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, all the replayed edits (rows 0-100, plus rows 100-200 in the non-droppable
    // case) are in the active memstore; there is no snapshot since no flush start was replayed
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();

    // Test case 1: replay a flush commit marker without start flush marker
    assertNull(secondaryRegion.getPrepareFlushResult());
    assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0);

    // ensure all files are visible in secondary
    for (HStore store : secondaryRegion.getStores()) {
      assertTrue(store.getMaxSequenceId().orElse(0L) <= secondaryRegion.getReadPoint(null));
    }

    LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    if (droppableMemstore) {
      // assert that the memstore is dropped
      assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD);
    } else {
      assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped
    }

    // assert that the region memstore is empty if dropped, otherwise unchanged
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    if (droppableMemstore) {
      assertTrue(0 == newRegionMemstoreSize);
    } else {
      assertTrue(regionMemstoreSize == newRegionMemstoreSize);
    }

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

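  /**
   * Returns a copy of the given flush descriptor with its flush sequence number replaced by
   * {@code flushSeqId}.
   */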
  private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) {
    return FlushDescriptor.newBuilder(flush)
        .setFlushSequenceNumber(flushSeqId)
        .build();
  }

  /**
   * Tests replaying region open markers from primary region. Checks whether the files are picked
   * up.
   */
  @Test
  public void testReplayRegionOpenEvent() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
    int numRows = 100;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc
          = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // don't replay flush events
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        // don't replay edits
      }
    }

    // we should have 1 open, 1 close and 1 open event
    assertEquals(3, regionEvents.size());

    // replay the first region open event.
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0));

    // replay the close event as well
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1));

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertTrue(regionMemstoreSize == 0);

    // now replay the region open event that should contain new file locations
    LOG.info("Testing replaying region open event " + regionEvents.get(2));
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD);

    // assert that the region memstore is empty
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertTrue(newRegionMemstoreSize == 0);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if any

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we replay a region open event after a flush start but before receiving
   * flush commit.
   */
  @Test
  public void testReplayRegionOpenEventAfterFlushStart() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc
          = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // only replay flush start
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
        }
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in the memstore snapshot
    // and some more data in memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // we should have 1 open, 1 close and 1 open event
    assertEquals(3, regionEvents.size());

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }

    // now replay the region open event that should contain new file locations
    LOG.info("Testing replaying region open event " + regionEvents.get(2));
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));

    // assert that the flush files are picked
    expectedStoreFileCount = 2; // two flushes happened
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize newSnapshotSize = store.getSnapshotSize();
    assertTrue(newSnapshotSize.getDataSize() == 0);

    // assert that the region memstore is empty
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertTrue(newRegionMemstoreSize == 0);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if any

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests whether edits coming in for replay are skipped which have smaller seq id than the seqId
   * of the last replayed region open event.
   */
  @Test
  public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 100, 0);
    int numRows = 100;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
    List<WAL.Entry> edits = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc
          = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // don't replay flushes
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        edits.add(entry);
      }
    }

    // replay the region open of the first open, but with the seqid of the second open
    // this way none of the flush files will be picked up.
    secondaryRegion.replayWALRegionEventMarker(
        RegionEventDescriptor.newBuilder(regionEvents.get(0)).setLogSequenceNumber(
            regionEvents.get(2).getLogSequenceNumber()).build());

    // replay edits from before the region close. If replay does not
    // skip these, the following verification will NOT fail.
    for (WAL.Entry entry : edits) {
      replayEdit(secondaryRegion, entry);
    }

    boolean expectedFail = false;
    try {
      verifyData(secondaryRegion, 0, numRows, cq, families);
    } catch (AssertionError e) {
      expectedFail = true; // expected
    }
    if (!expectedFail) {
      fail("Should have failed this verification");
    }
  }

  @Test
  public void testReplayFlushSeqIds() throws IOException {
    // load some data to primary and flush
    int start = 0;
    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
    primaryRegion.flush(true);

    // now replay the flush marker
    reader = createWALReaderForPrimary();

    long flushSeqId = -1;
    LOG.info("-- Replaying flush events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
          flushSeqId = flushDesc.getFlushSequenceNumber();
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- Replaying flush commit in secondary");
          secondaryRegion.replayWALFlushCommitMarker(flushDesc);
          assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber());
        }
      }
      // else do not replay
    }

    // TODO: what to do with this?
    // assert that the newly picked up flush file is visible
    long readPoint = secondaryRegion.getMVCC().getReadPoint();
    assertEquals(flushSeqId, readPoint);

    // after replay verify that everything is still visible
    verifyData(secondaryRegion, 0, 100, cq, families);
  }

  @Test
  public void testSeqIdsFromReplay() throws IOException {
    // test the case where seqId's coming from replayed WALEdits are made persisted with their
    // original seqIds and they are made visible through mvcc read point upon replay
    String method = name.getMethodName();
    byte[] tableName = Bytes.toBytes(method);
    byte[] family = Bytes.toBytes("family");

    HRegion region = initHRegion(tableName, family);
    try {
      // replay an entry that is bigger than current read point
      long readPoint = region.getMVCC().getReadPoint();
      long origSeqId = readPoint + 100;

      Put put = new Put(row).addColumn(family, row, row);
      put.setDurability(Durability.SKIP_WAL); // we replay with skip wal
      replay(region, put, origSeqId);

      // read point should have advanced to this seqId
      assertGet(region, family, row);

      // region seqId should have advanced at least to this seqId
      assertEquals(origSeqId, region.getReadPoint(null));

      // replay an entry that is smaller than current read point
      // caution: adding an entry below current read point might cause partial dirty reads. Normal
      // replay does not allow reads while replay is going on.
      put = new Put(row2).addColumn(family, row2, row2);
      put.setDurability(Durability.SKIP_WAL);
      replay(region, put, origSeqId - 50);

      assertGet(region, family, row2);
    } finally {
      region.close();
    }
  }

  /**
   * Tests that a region opened in secondary mode would not write region open / close
   * events to its WAL.
   * @throws IOException
   */
  @Test
  public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException {
    secondaryRegion.close();
    walSecondary = spy(walSecondary);

    // test for region open and close
    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
        any(WALEdit.class));

    // test for replay prepare flush
    putDataByReplay(secondaryRegion, 0, 10, cq, families);
    secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder()
        .setFlushSequenceNumber(10)
        .setTableName(UnsafeByteOperations.unsafeWrap(
            primaryRegion.getTableDescriptor().getTableName().getName()))
        .setAction(FlushAction.START_FLUSH)
        .setEncodedRegionName(
            UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
        .setRegionName(UnsafeByteOperations.unsafeWrap(
            primaryRegion.getRegionInfo().getRegionName()))
        .build());

    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
        any(WALEdit.class));

    secondaryRegion.close();
    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
        any(WALEdit.class));
  }

  /**
   * Tests the reads enabled flag for the region. When unset all reads should be rejected.
   */
  @Test
  public void testRegionReadsEnabledFlag() throws IOException {
    putDataByReplay(secondaryRegion, 0, 100, cq, families);

    verifyData(secondaryRegion, 0, 100, cq, families);

    // now disable reads
    secondaryRegion.setReadsEnabled(false);
    try {
      verifyData(secondaryRegion, 0, 100, cq, families);
      fail("Should have failed with IOException");
    } catch (IOException ex) {
      // expected
    }

    // verify that we can still replay data
    putDataByReplay(secondaryRegion, 100, 100, cq, families);

    // now enable reads again
    secondaryRegion.setReadsEnabled(true);
    verifyData(secondaryRegion, 0, 200, cq, families);
  }

  /**
   * Tests the case where a request for flush cache is sent to the region, but region cannot flush.
   * It should write the flush request marker instead.
   */
  @Test
  public void testWriteFlushRequestMarker() throws IOException {
    // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false
    FlushResultImpl result = primaryRegion.flushcache(true, false, FlushLifeCycleTracker.DUMMY);
    assertNotNull(result);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result);
    assertFalse(result.wroteFlushWalMarker);

    // request flush again, but this time with writeFlushRequestWalMarker = true
    result = primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
    assertNotNull(result);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result);
    assertTrue(result.wroteFlushWalMarker);

    List<FlushDescriptor> flushes = Lists.newArrayList();
    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        flushes.add(flush);
      }
    }

    assertEquals(1, flushes.size());
    assertNotNull(flushes.get(0));
    assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction());
  }

  /**
   * Test the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from primary region. Replaying CANNOT_FLUSH
   * flush marker entry should restore the reads enabled status in the region and allow the reads
   * to continue.
   */
  @Test
  public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException {
    disableReads(secondaryRegion);

    // Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from
    // triggered flush restores readsEnabled
    primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
      }
    }

    // now reads should be enabled
    secondaryRegion.get(new Get(Bytes.toBytes(0)));
  }

  /**
   * Test the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from primary region. Replaying flush start and commit
   * entries should restore the reads enabled status in the region and allow the reads
   * to continue.
   */
  @Test
  public void testReplayingFlushRestoresReadsEnabledState() throws IOException {
    // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came
    // from triggered flush restores readsEnabled
    disableReads(secondaryRegion);

    // put some data in primary
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
    primaryRegion.flush(true);
    // I seem to need to push more edits through so the WAL flushes on local fs. This was not
    // needed before HBASE-15028. Not sure what's up. I can see that we have not flushed if I
    // look at the WAL if I pause the test here and then use WALPrettyPrinter to look at content..
    // Doing same check before HBASE-15028 I can see all edits flushed to the WAL. Something's up
    // but can't figure it... and this is only test that seems to suffer this flush issue.
    // St.Ack 20160201
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      LOG.info(Objects.toString(entry));
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    // now reads should be enabled
    verifyData(secondaryRegion, 0, 100, cq, families);
  }

  /**
   * Test the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from primary region. Replaying flush start and commit
   * entries should restore the reads enabled status in the region and allow the reads
   * to continue.
   */
  @Test
  public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException {
    // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came
    // from triggered flush restores readsEnabled
    disableReads(secondaryRegion);

    // put some data in primary
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
    primaryRegion.flush(true);

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
      }
    }

    // now reads should be enabled
    verifyData(secondaryRegion, 0, 100, cq, families);
  }

  /**
   * Test the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from primary region. Replaying region open event
   * entry from primary should restore the reads enabled status in the region and allow the reads
   * to continue.
   */
  @Test
  public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException {
    // Test case 3: Test that replaying region open event markers restores readsEnabled
    disableReads(secondaryRegion);

    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }

      RegionEventDescriptor regionEventDesc
          = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (regionEventDesc != null) {
        secondaryRegion.replayWALRegionEventMarker(regionEventDesc);
      }
    }

    // now reads should be enabled
    secondaryRegion.get(new Get(Bytes.toBytes(0)));
  }

1389 @Test
1390 public void testRefresStoreFiles() throws IOException {
1391 assertEquals(0, primaryRegion.getStoreFileList(families).size());
1392 assertEquals(0, secondaryRegion.getStoreFileList(families).size());
1394 // Test case 1: refresh with an empty region
1395 secondaryRegion.refreshStoreFiles();
1396 assertEquals(0, secondaryRegion.getStoreFileList(families).size());
1398 // do one flush
1399 putDataWithFlushes(primaryRegion, 100, 100, 0);
1400 int numRows = 100;
1402 // refresh the store file list, and ensure that the files are picked up.
1403 secondaryRegion.refreshStoreFiles();
1404 assertPathListsEqual(primaryRegion.getStoreFileList(families),
1405 secondaryRegion.getStoreFileList(families));
1406 assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());
1408 LOG.info("-- Verifying edits from secondary");
1409 verifyData(secondaryRegion, 0, numRows, cq, families);
1411 // Test case 2: 3 some more flushes
1412 putDataWithFlushes(primaryRegion, 100, 300, 0);
1413 numRows = 300;
1415 // refresh the store file list, and ensure that the files are picked up.
1416 secondaryRegion.refreshStoreFiles();
1417 assertPathListsEqual(primaryRegion.getStoreFileList(families),
1418 secondaryRegion.getStoreFileList(families));
1419 assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size());
1421 LOG.info("-- Verifying edits from secondary");
1422 verifyData(secondaryRegion, 0, numRows, cq, families);
    if (FSUtils.WINDOWS) {
      // compaction cannot move files while they are open in secondary on Windows. Skip remaining.
      return;
    }

    // Test case 3: compact primary files
    primaryRegion.compactStores();
    List<HRegion> regions = new ArrayList<>();
    regions.add(primaryRegion);
    Mockito.doReturn(regions).when(rss).getRegions();
    CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false);
    cleaner.chore();
    secondaryRegion.refreshStoreFiles();
    assertPathListsEqual(primaryRegion.getStoreFileList(families),
      secondaryRegion.getStoreFileList(families));
    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);
    LOG.info("-- Replaying edits in secondary");

    // Test case 4: replay some edits, ensure that memstore is dropped.
    assertEquals(0, secondaryRegion.getMemStoreDataSize());
    putDataWithFlushes(primaryRegion, 400, 400, 0);
    numRows = 400;

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        // do not replay flush
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    assertTrue(secondaryRegion.getMemStoreDataSize() > 0);

    secondaryRegion.refreshStoreFiles();

    assertEquals(0, secondaryRegion.getMemStoreDataSize());

    LOG.info("-- Verifying edits from primary");
    verifyData(primaryRegion, 0, numRows, cq, families);
    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);
  }
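
  // Editorial note (hedged): test case 4 above relies on refreshStoreFiles() dropping memstore
  // contents once the newly picked-up files cover them: memstore size goes from > 0 to 0 across
  // the refresh call without any explicit flush on the secondary.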
  /**
   * Paths can be qualified or not. This does the assertion using String->Path conversion.
   */
  private void assertPathListsEqual(List<String> list1, List<String> list2) {
    List<Path> l1 = new ArrayList<>(list1.size());
    for (String path : list1) {
      l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
    }
    List<Path> l2 = new ArrayList<>(list2.size());
    for (String path : list2) {
      l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
    }
    assertEquals(l1, l2);
  }
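
  // For example, "hdfs://localhost:8020/hbase/data/ns/t/r/cf/f1" and "/hbase/data/ns/t/r/cf/f1"
  // compare equal here, since Path.getPathWithoutSchemeAndAuthority strips the scheme and
  // authority from the qualified form.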
  private void disableReads(HRegion region) {
    region.setReadsEnabled(false);
    try {
      verifyData(region, 0, 1, cq, families);
      fail("Should have failed with IOException");
    } catch (IOException ex) {
      // expected
    }
  }
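
  // A secondary replica rejects reads while readsEnabled is false (until a flush or region open
  // marker from the primary is replayed), so the verifyData() call above must fail.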
  private void replay(HRegion region, Put put, long replaySeqId) throws IOException {
    put.setDurability(Durability.SKIP_WAL);
    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
    region.batchReplay(new MutationReplay[] { mutation }, replaySeqId);
  }
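
  // Usage sketch with hypothetical values: replays a single put into a region at a chosen
  // replay sequence id, e.g.
  //   Put p = new Put(Bytes.toBytes("0")).addColumn(families[0], cq, null);
  //   replay(secondaryRegion, p, 10);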
  /**
   * Tests replaying bulk load event markers from the primary region. Checks whether the bulk
   * loaded files are picked up by the secondary.
   */
  @Test
  public void testReplayBulkLoadEvent() throws IOException {
    LOG.info("testReplayBulkLoadEvent starts");
    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // bulk load a file into primary region
    Random random = new Random();
    byte[] randomValues = new byte[20];
    random.nextBytes(randomValues);
    Path testPath = TEST_UTIL.getDataTestDirOnTestFS();

    List<Pair<byte[], String>> familyPaths = new ArrayList<>();
    int expectedLoadFileCount = 0;
    for (byte[] family : families) {
      familyPaths.add(new Pair<>(family, createHFileForFamilies(testPath, family, randomValues)));
      expectedLoadFileCount++;
    }
    primaryRegion.bulkLoadHFiles(familyPaths, false, null);
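
    // Editorial note (hedged): bulkLoadHFiles also writes a BULK_LOAD event marker into the
    // primary's WAL; the loop below scans for that marker rather than replaying the edits.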
    // now replay the edits and the bulk load marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and region events in secondary");
    BulkLoadDescriptor bulkloadEvent = null;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0));
      if (bulkloadEvent != null) {
        break;
      }
    }

    // we should have 1 bulk load event
    assertNotNull(bulkloadEvent);
    assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount());

    // replay the bulk load event
    secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent);
    List<String> storeFileName = new ArrayList<>();
    for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) {
      storeFileName.addAll(storeDesc.getStoreFileList());
    }
    // assert that the bulk loaded files are picked up
    for (HStore s : secondaryRegion.getStores()) {
      for (HStoreFile sf : s.getStorefiles()) {
        storeFileName.remove(sf.getPath().getName());
      }
    }
    assertTrue("Found store files that were not loaded: " + storeFileName,
      storeFileName.isEmpty());

    LOG.info("-- Verifying edits from secondary");
    for (byte[] family : families) {
      assertGet(secondaryRegion, family, randomValues);
    }
  }
  @Test
  public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException {
    // tests replaying flush commit marker, but the flush file has already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder()
      .setFlushSequenceNumber(Long.MAX_VALUE)
      .setTableName(UnsafeByteOperations.unsafeWrap(
        primaryRegion.getTableDescriptor().getTableName().getName()))
      .setAction(FlushAction.COMMIT_FLUSH)
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(UnsafeByteOperations.unsafeWrap(
        primaryRegion.getRegionInfo().getRegionName()))
      .addStoreFlushes(StoreFlushDescriptor.newBuilder()
        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
        .setStoreHomeDir("/store_home_dir")
        .addFlushOutput("/foo/baz/123")
        .build())
      .build());
  }
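
  // Editorial note (hedged): this test and the three below make no explicit assertions; they pass
  // as long as replaying a marker that names files which no longer exist in the store or archive
  // directories completes without throwing.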
  @Test
  public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException {
    // tests replaying compaction marker, but the compaction output file has already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALCompactionMarker(CompactionDescriptor.newBuilder()
      .setTableName(UnsafeByteOperations.unsafeWrap(
        primaryRegion.getTableDescriptor().getTableName().getName()))
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
      .addCompactionInput("/123")
      .addCompactionOutput("/456")
      .setStoreHomeDir("/store_home_dir")
      .setRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
      .build(), true, true, Long.MAX_VALUE);
  }
  @Test
  public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException {
    // tests replaying region open event marker, but the region files have already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder()
      .setTableName(UnsafeByteOperations.unsafeWrap(
        primaryRegion.getTableDescriptor().getTableName().getName()))
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
      .setEventType(EventType.REGION_OPEN)
      .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1)))
      .setLogSequenceNumber(Long.MAX_VALUE)
      .addStores(StoreDescriptor.newBuilder()
        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
        .setStoreHomeDir("/store_home_dir")
        .addStoreFile("/123")
        .build())
      .build());
  }
  @Test
  public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException {
    // tests replaying bulk load event marker, but the bulk load files have already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder()
      .setTableName(
        ProtobufUtil.toProtoTableName(primaryRegion.getTableDescriptor().getTableName()))
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setBulkloadSeqNum(Long.MAX_VALUE)
      .addStores(StoreDescriptor.newBuilder()
        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
        .setStoreHomeDir("/store_home_dir")
        .addStoreFile("/123")
        .build())
      .build());
  }
  private String createHFileForFamilies(Path testPath, byte[] family,
      byte[] valueBytes) throws IOException {
    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
    // TODO We need a way to do this without creating files
    Path testFile = new Path(testPath, TEST_UTIL.getRandomUUID().toString());
    FSDataOutputStream out = TEST_UTIL.getTestFileSystem().create(testFile);
    try {
      hFileFactory.withOutputStream(out);
      hFileFactory.withFileContext(new HFileContextBuilder().build());
      HFile.Writer writer = hFileFactory.create();
      try {
        writer.append(new KeyValue(ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
          .setRow(valueBytes)
          .setFamily(family)
          .setQualifier(valueBytes)
          .setTimestamp(0L)
          .setType(KeyValue.Type.Put.getCode())
          .setValue(valueBytes)
          .build()));
      } finally {
        writer.close();
      }
    } finally {
      out.close();
    }
    return testFile.toString();
  }
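
  // The single cell written above uses valueBytes as row, qualifier and value alike, which is
  // what lets testReplayBulkLoadEvent verify the load via assertGet(secondaryRegion, family,
  // randomValues).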
  /**
   * Puts a total of numRows + numRowsAfterFlush records indexed with numeric row keys. Does a
   * flush every flushInterval number of records. Then it puts numRowsAfterFlush more rows but
   * does not execute a flush after them.
   * @throws IOException
   */
  private void putDataWithFlushes(HRegion region, int flushInterval,
      int numRows, int numRowsAfterFlush) throws IOException {
    int start = 0;
    for (; start < numRows; start += flushInterval) {
      LOG.info("-- Writing some data to primary from " + start + " to " + (start + flushInterval));
      putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families);
      LOG.info("-- Flushing primary, creating 3 files for 3 stores");
      region.flush(true);
    }
    LOG.info("-- Writing some more data to primary, not flushing");
    putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families);
  }
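
  // For example, putDataWithFlushes(primaryRegion, 100, 300, 0) writes rows 0..299 with a flush
  // after every 100 rows (3 flushes), while putDataWithFlushes(primaryRegion, 100, 0, 100)
  // writes 100 rows and never flushes.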
  private void putDataByReplay(HRegion region,
      int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
    for (int i = startRow; i < startRow + numRows; i++) {
      Put put = new Put(Bytes.toBytes("" + i));
      put.setDurability(Durability.SKIP_WAL);
      for (byte[] family : families) {
        put.addColumn(family, qf, EnvironmentEdgeManager.currentTime(), null);
      }
      replay(region, put, i + 1);
    }
  }
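
  // Each replayed edit is given sequence id i + 1, so edits arrive at the region in increasing
  // replay-seqId order; the puts carry null values, only the column and timestamp matter here.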
  private static HRegion initHRegion(byte[] tableName, byte[]... families) throws IOException {
    return TEST_UTIL.createLocalHRegion(TableName.valueOf(tableName), HConstants.EMPTY_START_ROW,
      HConstants.EMPTY_END_ROW, false, Durability.SYNC_WAL, null, families);
  }