HBASE-26567 Remove IndexType from ChunkCreator (#3947)
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.TestHRegion.assertGet;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.putData;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.verifyData;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * Tests of HRegion methods for replaying flush, compaction, region open, etc events for secondary
 * region replicas
 */
@SuppressWarnings("deprecation")
@Category(LargeTests.class)
public class TestHRegionReplayEvents {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHRegionReplayEvents.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHRegion.class);
  @Rule public TestName name = new TestName();

  private static HBaseTestingUtil TEST_UTIL;

  public static Configuration CONF;
  private String dir;

  private byte[][] families = new byte[][] {
      Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")};

  // Test names
  protected byte[] tableName;
  protected String method;
  protected final byte[] row = Bytes.toBytes("rowA");
  protected final byte[] row2 = Bytes.toBytes("rowB");
  protected byte[] cq = Bytes.toBytes("cq");

  // per test fields
  private Path rootDir;
  private TableDescriptor htd;
  private RegionServerServices rss;
  private RegionInfo primaryHri, secondaryHri;
  private HRegion primaryRegion, secondaryRegion;
  private WAL walPrimary, walSecondary;
  private WAL.Reader reader;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL = new HBaseTestingUtil();
    TEST_UTIL.startMiniDFSCluster(1);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
    TEST_UTIL.cleanupTestDir();
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  @Before
  public void setUp() throws Exception {
    CONF = TEST_UTIL.getConfiguration();
    dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString();
    method = name.getMethodName();
    tableName = Bytes.toBytes(name.getMethodName());
    rootDir = new Path(dir + method);
    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
    method = name.getMethodName();
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(method));
    for (byte[] family : families) {
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
    }
    htd = builder.build();

    long time = EnvironmentEdgeManager.currentTime();
    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
      0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
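    // As of HBASE-26567 (the commit this blob belongs to), ChunkCreator is initialized with an
    // index chunk size percentage (MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT above) rather
    // than a separate per-chunk IndexType.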
    primaryHri =
      RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(0).build();
    secondaryHri =
      RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(1).build();

    WALFactory wals = TestHRegion.createWALFactory(CONF, rootDir);
    walPrimary = wals.getWAL(primaryHri);
    walSecondary = wals.getWAL(secondaryHri);

    rss = mock(RegionServerServices.class);
    when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
    when(rss.getConfiguration()).thenReturn(CONF);
    when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF));
    String string = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER
      .toString();
    ExecutorService es = new ExecutorService(string);
    es.startExecutorService(es.new ExecutorConfig().setCorePoolSize(1).setExecutorType(
      ExecutorType.RS_COMPACTED_FILES_DISCHARGER));
    when(rss.getExecutorService()).thenReturn(es);
    primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
    primaryRegion.close();
    List<HRegion> regions = new ArrayList<>();
    regions.add(primaryRegion);
    Mockito.doReturn(regions).when(rss).getRegions();

    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null);

    reader = null;
  }
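
  // The fixture above opens two HRegion instances over the same table and region id: a primary
  // (replica id 0) backed by walPrimary, and a secondary (replica id 1) opened here with a null
  // WAL argument, i.e. no WAL of its own. Tests drive the secondary by replaying entries read
  // back from the primary's WAL.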

  @After
  public void tearDown() throws Exception {
    if (reader != null) {
      reader.close();
    }

    if (primaryRegion != null) {
      HBaseTestingUtil.closeRegionAndWAL(primaryRegion);
    }
    if (secondaryRegion != null) {
      HBaseTestingUtil.closeRegionAndWAL(secondaryRegion);
    }

    EnvironmentEdgeManagerTestHelper.reset();
  }

  String getName() {
    return name.getMethodName();
  }

  // Some of the test cases are as follows:
  // 1. replay flush start marker again
  // 2. replay flush with smaller seqId than what is there in memstore snapshot
  // 3. replay flush with larger seqId than what is there in memstore snapshot
  // 4. replay flush commit without flush prepare. non droppable memstore
  // 5. replay flush commit without flush prepare. droppable memstore
  // 6. replay open region event
  // 7. replay open region event after flush start
  // 8. replay flush from an earlier seqId (test ignoring seqIds)
  // 9. start flush does not prevent region from closing.

  @Test
  public void testRegionReplicaSecondaryCannotFlush() throws IOException {
    // load some data and request a flush; ensure that the secondary replica will not execute
    // the flush

    // load some data to secondary by replaying
    putDataByReplay(secondaryRegion, 0, 1000, cq, families);

    verifyData(secondaryRegion, 0, 1000, cq, families);

    // flush region
    FlushResultImpl flush = (FlushResultImpl) secondaryRegion.flush(true);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH, flush.result);

    verifyData(secondaryRegion, 0, 1000, cq, families);

    // close the region, and inspect that it has not flushed
    Map<byte[], List<HStoreFile>> files = secondaryRegion.close(false);
    // assert that there are no files (since the flush did not execute)
    for (List<HStoreFile> f : files.values()) {
      assertTrue(f.isEmpty());
    }
  }

  /**
   * Tests a case where we replay only a flush start marker, then the region is closed. This region
   * should not block indefinitely.
   */
  @Test
  public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException {
    // load some data to primary and flush
    int start = 0;
    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
    primaryRegion.flush(true);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- NOT Replaying flush commit in secondary");
        }
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    assertTrue(rss.getRegionServerAccounting().getGlobalMemStoreDataSize() > 0);
    // now close the region, which should not hang because of the un-committed flush
    secondaryRegion.close();

    // verify that the memstore size is back to what it was
    assertEquals(0, rss.getRegionServerAccounting().getGlobalMemStoreDataSize());
  }

  static int replayEdit(HRegion region, WAL.Entry entry) throws IOException {
    if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) {
      return 0; // handled elsewhere
    }
    Put put = new Put(CellUtil.cloneRow(entry.getEdit().getCells().get(0)));
    for (Cell cell : entry.getEdit().getCells()) {
      put.add(cell);
    }
    put.setDurability(Durability.SKIP_WAL);
    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
    region.batchReplay(new MutationReplay[] {mutation},
      entry.getKey().getSequenceId());
    return Integer.parseInt(Bytes.toString(put.getRow()));
  }
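
  // replayEdit() converts a data WALEdit into a Put applied via batchReplay() at the entry's
  // original sequence id, so the secondary's seqId/MVCC advances to match the primary. Meta
  // edits (flush/compaction/region-event markers) are skipped here and handled by the callers.
  // The return value is the row key parsed back as an integer, which works because these tests
  // write integer row keys.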

  WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException {
    return WALFactory.createReader(TEST_UTIL.getTestFileSystem(),
      AbstractFSWALProvider.getCurrentFileName(walPrimary),
      TEST_UTIL.getConfiguration());
  }
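
  // The reader returned above reads the primary's current WAL file; each test walks it entry by
  // entry, deciding which data edits and which flush/compaction/region-event markers to feed to
  // the secondary replica.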

  @Test
  public void testBatchReplayWithMultipleNonces() throws IOException {
    try {
      MutationReplay[] mutations = new MutationReplay[100];
      for (int i = 0; i < 100; i++) {
        Put put = new Put(Bytes.toBytes(i));
        put.setDurability(Durability.SYNC_WAL);
        for (byte[] family : this.families) {
          put.addColumn(family, this.cq, null);
          long nonceNum = i / 10;
          mutations[i] = new MutationReplay(MutationType.PUT, put, nonceNum, nonceNum);
        }
      }
      primaryRegion.batchReplay(mutations, 20);
    } catch (Exception e) {
      String msg = "Error while replay of batch with multiple nonces. ";
      LOG.error(msg, e);
      fail(msg + e.getMessage());
    }
  }

  @Test
  public void testReplayFlushesAndCompactions() throws IOException {
    // initiate a secondary region with some data.

    // load some data to primary and flush. 3 flushes and some more unflushed data
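    // (putDataWithFlushes appears to write rows in flush-sized batches, flushing the primary
    // after each batch: here 300 rows across 3 flushes of 100 rows each, plus 100 more rows
    // left unflushed, i.e. rows 0-399 in total.)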
    putDataWithFlushes(primaryRegion, 100, 300, 100);

    // compaction from primary
    LOG.info("-- Compacting primary, only 1 store");
    primaryRegion.compactStore(Bytes.toBytes("cf1"),
      NoLimitThroughputController.INSTANCE);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    int lastReplayed = 0;
    int expectedStoreFileCount = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      CompactionDescriptor compactionDesc
        = WALEdit.getCompaction(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        // first verify that everything is replayed and visible before flush event replay
        verifyData(secondaryRegion, 0, lastReplayed, cq, families);
        HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
        long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
        long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
        MemStoreSize mss = store.getFlushableSize();
        long storeSize = store.getSize();
        long storeSizeUncompressed = store.getStoreSizeUncompressed();
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc);
          assertNull(result.result);
          assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber());

          // assert that the store memstore is smaller now
          long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize();
          LOG.info("Memstore size reduced by:"
            + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
          assertTrue(storeMemstoreSize > newStoreMemstoreSize);

        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- Replaying flush commit in secondary");
          secondaryRegion.replayWALFlushCommitMarker(flushDesc);

          // assert that the flush files are picked
          expectedStoreFileCount++;
          for (HStore s : secondaryRegion.getStores()) {
            assertEquals(expectedStoreFileCount, s.getStorefilesCount());
          }
          MemStoreSize newMss = store.getFlushableSize();
          assertTrue(mss.getHeapSize() > newMss.getHeapSize());

          // assert that the region memstore is smaller now
          long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
          assertTrue(regionMemstoreSize > newRegionMemstoreSize);

          // assert that the store sizes are bigger
          assertTrue(store.getSize() > storeSize);
          assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed);
          assertEquals(store.getSize(), store.getStorefilesSize());
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else if (compactionDesc != null) {
        secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE);

        // assert that the compaction is applied
        for (HStore store : secondaryRegion.getStores()) {
          if (store.getColumnFamilyName().equals("cf1")) {
            assertEquals(1, store.getStorefilesCount());
          } else {
            assertEquals(expectedStoreFileCount, store.getStorefilesCount());
          }
        }
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    assertEquals(400 - 1, lastReplayed);
    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, 400, cq, families);

    LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted");
    verifyData(primaryRegion, 0, lastReplayed, cq, families);
    for (HStore store : primaryRegion.getStores()) {
      if (store.getColumnFamilyName().equals("cf1")) {
        assertEquals(1, store.getStorefilesCount());
      } else {
        assertEquals(expectedStoreFileCount, store.getStorefilesCount());
      }
    }
  }

  /**
   * Tests cases where we prepare a flush with some seqId and we receive other flush start markers
   * equal to, greater or less than the previous flush start marker.
   */
  @Test
  public void testReplayFlushStartMarkers() throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");

    FlushDescriptor startFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        // first verify that everything is replayed and visible before flush event replay
        HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
        long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
        long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
        MemStoreSize mss = store.getFlushableSize();

        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          startFlushDesc = flushDesc;
          LOG.info("-- Replaying flush start in secondary");
          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
          assertNull(result.result);
          assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber());
          assertTrue(regionMemstoreSize > 0);
          assertTrue(mss.getHeapSize() > 0);

          // assert that the store memstore is smaller now
          long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize();
          LOG.info("Memstore size reduced by:"
            + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
          assertTrue(storeMemstoreSize > newStoreMemstoreSize);
          verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);

        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in memstore snapshot
    // and some more data in memstores (rows 100-200)

    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 1: replay the same flush start marker again
    LOG.info("-- Replaying same flush start in secondary again");
    PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
    assertNull(result); // this should return null. Ignoring the flush start marker
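    // (The secondary already holds a prepared flush snapshot for this seqId, so a repeated start
    // marker is ignored rather than re-snapshotting; the same appears to hold for the smaller and
    // larger seqIds exercised in test cases 2 and 3 below.)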
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 2: replay a flush start marker with a smaller seqId
    FlushDescriptor startFlushDescSmallerSeqId
      = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50);
    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId);
    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 3: replay a flush start marker with a larger seqId
    FlushDescriptor startFlushDescLargerSeqId
      = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50);
    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId);
    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
   * less than the previous flush start marker.
   */
  @Test
  public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException {
    // load some data to primary and flush. 2 flushes and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 200, 100);
    int numRows = 300;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor startFlushDesc = null;
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      System.out.println(lastReplayed);
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          // don't replay the first flush start marker, hold on to it, replay the second one
          if (startFlushDesc == null) {
            startFlushDesc = flushDesc;
          } else {
            LOG.info("-- Replaying flush start in secondary");
            startFlushDesc = flushDesc;
            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
            assertNull(result.result);
          }
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          // do not replay any flush commit yet
          if (commitFlushDesc == null) {
            commitFlushDesc = flushDesc; // hold on to the first flush commit marker
          }
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-200) in memstore snapshot
    // and some more data in memstores (rows 200-300)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();

    // Test case 1: replay a flush commit marker smaller than what we have prepared
    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START"
      + startFlushDesc);
    assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber());
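
    // (The commit belongs to the earlier flush that was never prepared here, so the secondary is
    // expected to pick up its files but keep the prepared snapshot for the later flush start.)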
    LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped

    // assert that the region memstore is same as before
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertEquals(regionMemstoreSize, newRegionMemstoreSize);

    assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
   * larger than the previous flush start marker.
   */
  @Test
  public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor startFlushDesc = null;
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          if (startFlushDesc == null) {
            LOG.info("-- Replaying flush start in secondary");
            startFlushDesc = flushDesc;
            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
            assertNull(result.result);
          }
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          // do not replay any flush commit yet
          // hold on to the flush commit marker but simulate a larger
          // flush commit seqId
          commitFlushDesc =
            FlushDescriptor.newBuilder(flushDesc)
              .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50)
              .build();
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in memstore snapshot
    // and some more data in memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();

    // Test case 1: replay a flush commit marker larger than what we have prepared
    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START"
      + startFlushDesc);
    assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber());
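
    // (A commit seqId larger than the prepared flush covers the whole snapshot, so the secondary
    // is expected to drop the snapshot and the prepared-flush state once the files are picked up.)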
    LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped

    // assert that the region memstore is smaller than before, but not empty
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertTrue(newRegionMemstoreSize > 0);
    assertTrue(regionMemstoreSize > newRegionMemstoreSize);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers.
   * The memstore edits should be dropped after the flush commit replay since they should be in
   * flushed files
   */
  @Test
  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore()
      throws IOException {
    testReplayFlushCommitMarkerWithoutFlushStartMarker(true);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers.
   * The memstore edits should not be dropped after the flush commit replay since not every edit
   * will be in flushed files (based on seqId)
   */
  @Test
  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore()
      throws IOException {
    testReplayFlushCommitMarkerWithoutFlushStartMarker(false);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers
   */
  public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore)
      throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data.
    // write more data after the flush depending on droppableMemstore
    putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100);
    int numRows = droppableMemstore ? 100 : 200;
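    // (With droppableMemstore == true nothing is written after the flush, so every memstore edit
    // is at or below the flush seqId and can be dropped on commit replay; with false, rows
    // 100-200 arrive after the flush and must survive the commit.)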

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          // do not replay flush start marker
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          commitFlushDesc = flushDesc; // hold on to the flush commit marker
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100 or 0-200 depending on
    // droppableMemstore) in the memstore, and no snapshot since no flush start was replayed
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();

    // Test case 1: replay a flush commit marker without start flush marker
    assertNull(secondaryRegion.getPrepareFlushResult());
    assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0);

    // ensure all files are visible in secondary
    for (HStore store : secondaryRegion.getStores()) {
      assertTrue(store.getMaxSequenceId().orElse(0L) <= secondaryRegion.getReadPoint(null));
    }

    LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    if (droppableMemstore) {
      // assert that the memstore is dropped
      assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD);
    } else {
      assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped
    }

    // assert that the region memstore is dropped to zero if droppable, otherwise unchanged
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    if (droppableMemstore) {
      assertTrue(0 == newRegionMemstoreSize);
    } else {
      assertTrue(regionMemstoreSize == newRegionMemstoreSize);
    }

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) {
    return FlushDescriptor.newBuilder(flush)
      .setFlushSequenceNumber(flushSeqId)
      .build();
  }

  /**
   * Tests replaying region open markers from primary region. Checks whether the files are picked up
   */
  @Test
  public void testReplayRegionOpenEvent() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
    int numRows = 100;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc
        = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // don't replay flush events
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        // don't replay edits
      }
    }

    // we should have 1 open, 1 close and 1 open event
    assertEquals(3, regionEvents.size());

    // replay the first region open event.
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0));

    // replay the close event as well
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1));

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertTrue(regionMemstoreSize == 0);

    // now replay the region open event that should contain new file locations
    LOG.info("Testing replaying region open event " + regionEvents.get(2));
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD);

    // assert that the region memstore is empty
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertTrue(newRegionMemstoreSize == 0);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if any

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we replay a region open event after a flush start but before receiving
   * flush commit
   */
  @Test
  public void testReplayRegionOpenEventAfterFlushStart() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc
        = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // only replay flush start
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
        }
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in the memstore snapshot
    // and some more data in memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // we should have 1 open, 1 close and 1 open event
    assertEquals(3, regionEvents.size());

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }

    // now replay the region open event that should contain new file locations
    LOG.info("Testing replaying region open event " + regionEvents.get(2));
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));

    // assert that the flush files are picked
    expectedStoreFileCount = 2; // two flushes happened
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize newSnapshotSize = store.getSnapshotSize();
    assertTrue(newSnapshotSize.getDataSize() == 0);

    // assert that the region memstore is empty
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertTrue(newRegionMemstoreSize == 0);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if any

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests whether edits coming in for replay are skipped which have smaller seq id than the seqId
   * of the last replayed region open event.
   */
  @Test
  public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 100, 0);
    int numRows = 100;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
    List<WAL.Entry> edits = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc
        = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // don't replay flushes
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        edits.add(entry);
      }
    }

    // replay the region open of first open, but with the seqid of the second open
    // this way none of the flush files will be picked up.
    secondaryRegion.replayWALRegionEventMarker(
      RegionEventDescriptor.newBuilder(regionEvents.get(0)).setLogSequenceNumber(
        regionEvents.get(2).getLogSequenceNumber()).build());

    // replay edits from before the region close. If replay does not
    // skip these, the following verification will NOT fail.
    for (WAL.Entry entry : edits) {
      replayEdit(secondaryRegion, entry);
    }

    boolean expectedFail = false;
    try {
      verifyData(secondaryRegion, 0, numRows, cq, families);
    } catch (AssertionError e) {
      expectedFail = true; // expected
    }
    if (!expectedFail) {
      fail("Should have failed this verification");
    }
  }

  @Test
  public void testReplayFlushSeqIds() throws IOException {
    // load some data to primary and flush
    int start = 0;
    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
    primaryRegion.flush(true);

    // now replay the flush marker
    reader = createWALReaderForPrimary();

    long flushSeqId = -1;
    LOG.info("-- Replaying flush events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
          flushSeqId = flushDesc.getFlushSequenceNumber();
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- Replaying flush commit in secondary");
          secondaryRegion.replayWALFlushCommitMarker(flushDesc);
          assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber());
        }
      }
      // else do not replay
    }

    // TODO: what to do with this?
    // assert that the newly picked up flush file is visible
    long readPoint = secondaryRegion.getMVCC().getReadPoint();
    assertEquals(flushSeqId, readPoint);

    // after replay verify that everything is still visible
    verifyData(secondaryRegion, 0, 100, cq, families);
  }

  @Test
  public void testSeqIdsFromReplay() throws IOException {
    // test the case where seqId's coming from replayed WALEdits are made persisted with their
    // original seqIds and they are made visible through mvcc read point upon replay
    String method = name.getMethodName();
    byte[] tableName = Bytes.toBytes(method);
    byte[] family = Bytes.toBytes("family");

    HRegion region = initHRegion(tableName, family);
    try {
      // replay an entry that is bigger than current read point
      long readPoint = region.getMVCC().getReadPoint();
      long origSeqId = readPoint + 100;

      Put put = new Put(row).addColumn(family, row, row);
      put.setDurability(Durability.SKIP_WAL); // we replay with skip wal
      replay(region, put, origSeqId);

      // read point should have advanced to this seqId
      assertGet(region, family, row);

      // region seqId should have advanced at least to this seqId
      assertEquals(origSeqId, region.getReadPoint(null));

      // replay an entry that is smaller than current read point
      // caution: adding an entry below current read point might cause partial dirty reads. Normal
      // replay does not allow reads while replay is going on.
      put = new Put(row2).addColumn(family, row2, row2);
      put.setDurability(Durability.SKIP_WAL);
      replay(region, put, origSeqId - 50);

      assertGet(region, family, row2);
    } finally {
      region.close();
    }
  }

  /**
   * Tests that a region opened in secondary mode would not write region open / close
   * events to its WAL.
   * @throws IOException
   */
  @Test
  public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException {
    secondaryRegion.close();
    walSecondary = spy(walSecondary);

    // test for region open and close
    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
      any(WALEdit.class));

    // test for replay prepare flush
    putDataByReplay(secondaryRegion, 0, 10, cq, families);
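    // (Hand-build a START_FLUSH descriptor pointing at the primary's region so the secondary
    // goes through the prepare-flush path; even this replay should not append anything to the
    // secondary's own WAL.)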
    secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder().
      setFlushSequenceNumber(10)
      .setTableName(UnsafeByteOperations.unsafeWrap(
        primaryRegion.getTableDescriptor().getTableName().getName()))
      .setAction(FlushAction.START_FLUSH)
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(UnsafeByteOperations.unsafeWrap(
        primaryRegion.getRegionInfo().getRegionName()))
      .build());

    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
      any(WALEdit.class));

    secondaryRegion.close();
    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
      any(WALEdit.class));
  }

  /**
   * Tests the reads enabled flag for the region. When unset all reads should be rejected
   */
  @Test
  public void testRegionReadsEnabledFlag() throws IOException {

    putDataByReplay(secondaryRegion, 0, 100, cq, families);

    verifyData(secondaryRegion, 0, 100, cq, families);

    // now disable reads
    secondaryRegion.setReadsEnabled(false);
    try {
      verifyData(secondaryRegion, 0, 100, cq, families);
      fail("Should have failed with IOException");
    } catch (IOException ex) {
      // expected
    }

    // verify that we can still replay data
    putDataByReplay(secondaryRegion, 100, 100, cq, families);

    // now enable reads again
    secondaryRegion.setReadsEnabled(true);
    verifyData(secondaryRegion, 0, 200, cq, families);
  }

  /**
   * Tests the case where a request for flush cache is sent to the region, but region cannot flush.
   * It should write the flush request marker instead.
   */
  @Test
  public void testWriteFlushRequestMarker() throws IOException {
    // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false
    FlushResultImpl result = primaryRegion.flushcache(true, false, FlushLifeCycleTracker.DUMMY);
    assertNotNull(result);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result);
    assertFalse(result.wroteFlushWalMarker);

    // request flush again, but this time with writeFlushRequestWalMarker = true
    result = primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
    assertNotNull(result);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result);
    assertTrue(result.wroteFlushWalMarker);

    List<FlushDescriptor> flushes = Lists.newArrayList();
    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        flushes.add(flush);
      }
    }

    assertEquals(1, flushes.size());
    assertNotNull(flushes.get(0));
    assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction());
  }

  /**
   * Test the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from primary region. Replaying CANNOT_FLUSH
   * flush marker entry should restore the reads enabled status in the region and allow the reads
   * to continue.
   */
  @Test
  public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException {
    disableReads(secondaryRegion);

    // Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from
    // triggered flush restores readsEnabled
    primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
      }
    }

    // now reads should be enabled
    secondaryRegion.get(new Get(Bytes.toBytes(0)));
  }

  /**
   * Test the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from primary region. Replaying flush start and commit
   * entries should restore the reads enabled status in the region and allow the reads
   * to continue.
   */
  @Test
  public void testReplayingFlushRestoresReadsEnabledState() throws IOException {
    // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came
    // from triggered flush restores readsEnabled
    disableReads(secondaryRegion);

    // put some data in primary
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
    primaryRegion.flush(true);
    // I seem to need to push more edits through so the WAL flushes on local fs. This was not
    // needed before HBASE-15028. Not sure whats up. I can see that we have not flushed if I
    // look at the WAL if I pause the test here and then use WALPrettyPrinter to look at content..
    // Doing same check before HBASE-15028 I can see all edits flushed to the WAL. Somethings up
    // but can't figure it... and this is only test that seems to suffer this flush issue.
    // St.Ack 20160201
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      LOG.info(Objects.toString(entry));
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    // now reads should be enabled
    verifyData(secondaryRegion, 0, 100, cq, families);
  }

  /**
   * Test the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from primary region. Replaying flush start and commit
   * entries should restore the reads enabled status in the region and allow the reads
   * to continue.
   */
  @Test
  public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException {
    // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came
    // from triggered flush restores readsEnabled
    disableReads(secondaryRegion);

    // put some data in primary
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
    primaryRegion.flush(true);

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
      }
    }

    // now reads should be enabled
    verifyData(secondaryRegion, 0, 100, cq, families);
  }

  /**
   * Test the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from primary region. Replaying region open event
   * entry from primary should restore the reads enabled status in the region and allow the reads
   * to continue.
   */
  @Test
  public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException {
    // Test case 3: Test that replaying region open event markers restores readsEnabled
    disableReads(secondaryRegion);

    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }

      RegionEventDescriptor regionEventDesc
        = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (regionEventDesc != null) {
        secondaryRegion.replayWALRegionEventMarker(regionEventDesc);
      }
    }

    // now reads should be enabled
    secondaryRegion.get(new Get(Bytes.toBytes(0)));
  }
1392 @Test
1393 public void testRefresStoreFiles() throws IOException {
1394 assertEquals(0, primaryRegion.getStoreFileList(families).size());
1395 assertEquals(0, secondaryRegion.getStoreFileList(families).size());
1397 // Test case 1: refresh with an empty region
1398 secondaryRegion.refreshStoreFiles();
1399 assertEquals(0, secondaryRegion.getStoreFileList(families).size());
1401 // do one flush
1402 putDataWithFlushes(primaryRegion, 100, 100, 0);
1403 int numRows = 100;
1405 // refresh the store file list, and ensure that the files are picked up.
1406 secondaryRegion.refreshStoreFiles();
1407 assertPathListsEqual(primaryRegion.getStoreFileList(families),
1408 secondaryRegion.getStoreFileList(families));
1409 assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());
1411 LOG.info("-- Verifying edits from secondary");
1412 verifyData(secondaryRegion, 0, numRows, cq, families);
1414 // Test case 2: 3 some more flushes
1415 putDataWithFlushes(primaryRegion, 100, 300, 0);
1416 numRows = 300;
1418 // refresh the store file list, and ensure that the files are picked up.
1419 secondaryRegion.refreshStoreFiles();
1420 assertPathListsEqual(primaryRegion.getStoreFileList(families),
1421 secondaryRegion.getStoreFileList(families));
1422 assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size());
1424 LOG.info("-- Verifying edits from secondary");
1425 verifyData(secondaryRegion, 0, numRows, cq, families);
1427 if (FSUtils.WINDOWS) {
1428 // compaction cannot move files while they are open in secondary on windows. Skip remaining.
1429 return;

    // Test case 3: compact primary files
    primaryRegion.compactStores();
    List<HRegion> regions = new ArrayList<>();
    regions.add(primaryRegion);
    Mockito.doReturn(regions).when(rss).getRegions();
    CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false);
    cleaner.chore();
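    // the discharger removes the compacted-away files, so after a refresh the secondary
    // should see only the single compacted file per store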
    secondaryRegion.refreshStoreFiles();
    assertPathListsEqual(primaryRegion.getStoreFileList(families),
      secondaryRegion.getStoreFileList(families));
    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Replaying edits in secondary");

    // Test case 4: replay some edits, ensure that memstore is dropped.
    assertEquals(0, secondaryRegion.getMemStoreDataSize());
    putDataWithFlushes(primaryRegion, 400, 400, 0);
    numRows = 400;

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        // do not replay flush
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    assertTrue(secondaryRegion.getMemStoreDataSize() > 0);

    secondaryRegion.refreshStoreFiles();
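    // refreshing the store files picks up the flushed files from primary; replayed edits
    // that are already persisted there should be dropped from the secondary's memstore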
    assertEquals(0, secondaryRegion.getMemStoreDataSize());

    LOG.info("-- Verifying edits from primary");
    verifyData(primaryRegion, 0, numRows, cq, families);
    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);
  }

  /**
   * Paths can be qualified or not. This does the assertion using String->Path conversion.
   */
  private void assertPathListsEqual(List<String> list1, List<String> list2) {
    List<Path> l1 = new ArrayList<>(list1.size());
    for (String path : list1) {
      l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
    }
    List<Path> l2 = new ArrayList<>(list2.size());
    for (String path : list2) {
      l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
    }
    assertEquals(l1, l2);
  }

  private void disableReads(HRegion region) {
    region.setReadsEnabled(false);
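    // with reads disabled, any read against the region is expected to fail with IOException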
    try {
      verifyData(region, 0, 1, cq, families);
      fail("Should have failed with IOException");
    } catch (IOException ex) {
      // expected
    }
  }

  private void replay(HRegion region, Put put, long replaySeqId) throws IOException {
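    // replayed puts skip the replica's own WAL; the replay sequence id carries over the
    // ordering established by the primary's WAL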
    put.setDurability(Durability.SKIP_WAL);
    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
    region.batchReplay(new MutationReplay[] { mutation }, replaySeqId);
  }

  /**
   * Tests replaying bulk load event markers from the primary region. Checks whether the bulk
   * loaded files are picked up by the secondary region replica.
   */
  @Test
  public void testReplayBulkLoadEvent() throws IOException {
    LOG.info("testReplayBulkLoadEvent starts");
    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
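    // with numRows = 0 the helper's flush loop never runs, so these 100 rows are written
    // to the primary WAL and memstore only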

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // bulk load a file into primary region
    Random random = new Random();
    byte[] randomValues = new byte[20];
    random.nextBytes(randomValues);
    Path testPath = TEST_UTIL.getDataTestDirOnTestFS();

    List<Pair<byte[], String>> familyPaths = new ArrayList<>();
    int expectedLoadFileCount = 0;
    for (byte[] family : families) {
      familyPaths.add(new Pair<>(family, createHFileForFamilies(testPath, family, randomValues)));
      expectedLoadFileCount++;
    }
    primaryRegion.bulkLoadHFiles(familyPaths, false, null);
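
    // the bulk load writes a BULK_LOAD event marker into the primary WAL; scanning for
    // that marker below is how the secondary learns about the new files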
    // now replay the edits and the bulk load marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and region events in secondary");
    BulkLoadDescriptor bulkloadEvent = null;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0));
      if (bulkloadEvent != null) {
        break;
      }
    }

    // we should have 1 bulk load event
    assertNotNull(bulkloadEvent);
    assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount());

    // replay the bulk load event
    secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent);

    List<String> storeFileName = new ArrayList<>();
    for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) {
      storeFileName.addAll(storeDesc.getStoreFileList());
    }

    // assert that the bulk loaded files are picked up
    for (HStore s : secondaryRegion.getStores()) {
      for (HStoreFile sf : s.getStorefiles()) {
        storeFileName.remove(sf.getPath().getName());
      }
    }
    assertTrue("Found store files that were not loaded: " + storeFileName,
      storeFileName.isEmpty());

    LOG.info("-- Verifying edits from secondary");
    for (byte[] family : families) {
      assertGet(secondaryRegion, family, randomValues);
    }
  }

  @Test
  public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException {
    // tests replaying flush commit marker, but the flush file has already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder()
      .setFlushSequenceNumber(Long.MAX_VALUE)
      .setTableName(UnsafeByteOperations.unsafeWrap(
        primaryRegion.getTableDescriptor().getTableName().getName()))
      .setAction(FlushAction.COMMIT_FLUSH)
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(UnsafeByteOperations.unsafeWrap(
        primaryRegion.getRegionInfo().getRegionName()))
      .addStoreFlushes(StoreFlushDescriptor.newBuilder()
        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
        .setStoreHomeDir("/store_home_dir")
        .addFlushOutput("/foo/baz/123")
        .build())
      .build());
  }
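
  // the *WithFileAlreadyDeleted tests (this one and the three that follow) make no
  // assertions; they pass as long as replaying a marker that references missing files
  // does not throw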

  @Test
  public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException {
    // tests replaying compaction marker, but the compaction output file has already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALCompactionMarker(CompactionDescriptor.newBuilder()
      .setTableName(UnsafeByteOperations.unsafeWrap(
        primaryRegion.getTableDescriptor().getTableName().getName()))
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
      .addCompactionInput("/123")
      .addCompactionOutput("/456")
      .setStoreHomeDir("/store_home_dir")
      .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
      .build(), true, true, Long.MAX_VALUE);
  }

  @Test
  public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException {
    // tests replaying region open event marker, but the region files have already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder()
      .setTableName(UnsafeByteOperations.unsafeWrap(
        primaryRegion.getTableDescriptor().getTableName().getName()))
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
      .setEventType(EventType.REGION_OPEN)
      .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1)))
      .setLogSequenceNumber(Long.MAX_VALUE)
      .addStores(StoreDescriptor.newBuilder()
        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
        .setStoreHomeDir("/store_home_dir")
        .addStoreFile("/123")
        .build())
      .build());
  }

  @Test
  public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException {
    // tests replaying bulk load event marker, but the bulk load files have already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder()
      .setTableName(ProtobufUtil.toProtoTableName(
        primaryRegion.getTableDescriptor().getTableName()))
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setBulkloadSeqNum(Long.MAX_VALUE)
      .addStores(StoreDescriptor.newBuilder()
        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
        .setStoreHomeDir("/store_home_dir")
        .addStoreFile("/123")
        .build())
      .build());
  }

  private String createHFileForFamilies(Path testPath, byte[] family,
      byte[] valueBytes) throws IOException {
    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
    // TODO We need a way to do this without creating files
    Path testFile = new Path(testPath, TEST_UTIL.getRandomUUID().toString());
    FSDataOutputStream out = TEST_UTIL.getTestFileSystem().create(testFile);
    try {
      hFileFactory.withOutputStream(out);
      hFileFactory.withFileContext(new HFileContextBuilder().build());
      HFile.Writer writer = hFileFactory.create();
      try {
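        // a single cell whose row, qualifier and value are all the same random bytes is
        // enough for the bulk load test to verify the file is visible from the secondary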
        writer.append(new KeyValue(ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
          .setRow(valueBytes)
          .setFamily(family)
          .setQualifier(valueBytes)
          .setTimestamp(0L)
          .setType(KeyValue.Type.Put.getCode())
          .setValue(valueBytes)
          .build()));
      } finally {
        writer.close();
      }
    } finally {
      out.close();
    }
    return testFile.toString();
  }

  /**
   * Puts a total of numRows + numRowsAfterFlush records indexed with numeric row keys. Does a
   * flush every flushInterval number of records. Then it puts numRowsAfterFlush more rows but
   * does not flush afterwards.
   * @throws IOException
   */
  private void putDataWithFlushes(HRegion region, int flushInterval,
      int numRows, int numRowsAfterFlush) throws IOException {
    int start = 0;
    for (; start < numRows; start += flushInterval) {
      LOG.info("-- Writing some data to primary from " + start + " to " + (start + flushInterval));
      putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families);
      LOG.info("-- Flushing primary, creating 3 files for 3 stores");
      region.flush(true);
    }
    LOG.info("-- Writing some more data to primary, not flushing");
    putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families);
  }

  private void putDataByReplay(HRegion region,
      int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
    for (int i = startRow; i < startRow + numRows; i++) {
      Put put = new Put(Bytes.toBytes("" + i));
      put.setDurability(Durability.SKIP_WAL);
      for (byte[] family : families) {
        put.addColumn(family, qf, EnvironmentEdgeManager.currentTime(), null);
      }
      replay(region, put, i + 1);
    }
  }

  private static HRegion initHRegion(byte[] tableName, byte[]... families) throws IOException {
    return TEST_UTIL.createLocalHRegion(TableName.valueOf(tableName), HConstants.EMPTY_START_ROW,
      HConstants.EMPTY_END_ROW, CONF, false, Durability.SYNC_WAL, null, families);