/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.assertGet;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.putData;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.verifyData;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import com.google.common.collect.Lists;
/**
 * Tests of HRegion methods for replaying flush, compaction, region open, etc events for secondary
 * region replicas
 */
@Category(MediumTests.class)
public class TestHRegionReplayEvents {
  private static final Log LOG = LogFactory.getLog(TestHRegionReplayEvents.class);
  @Rule public TestName name = new TestName();

  private static HBaseTestingUtility TEST_UTIL;

  public static Configuration CONF;
  private String dir;

  private byte[][] families = new byte[][] {
      Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")};

  // Test names
  protected byte[] tableName;
  protected String method;
  protected final byte[] row = Bytes.toBytes("rowA");
  protected final byte[] row2 = Bytes.toBytes("rowB");
  protected byte[] cq = Bytes.toBytes("cq");

  // per test fields
  private Path rootDir;
  private HTableDescriptor htd;
  private long time;
  private RegionServerServices rss;
  private HRegionInfo primaryHri, secondaryHri;
  private HRegion primaryRegion, secondaryRegion;
  private WALFactory wals;
  private WAL walPrimary, walSecondary;
  private WAL.Reader reader;
  @Before
  public void setup() throws IOException {
    TEST_UTIL = HBaseTestingUtility.createLocalHTU();
    CONF = TEST_UTIL.getConfiguration();
    dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString();
    method = name.getMethodName();
    tableName = Bytes.toBytes(name.getMethodName());
    rootDir = new Path(dir + method);
    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());

    htd = new HTableDescriptor(TableName.valueOf(method));
    for (byte[] family : families) {
      htd.addFamily(new HColumnDescriptor(family));
    }

    time = System.currentTimeMillis();

    primaryHri = new HRegionInfo(htd.getTableName(),
      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
      false, time, 0);
    secondaryHri = new HRegionInfo(htd.getTableName(),
      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
      false, time, 1);

    wals = TestHRegion.createWALFactory(CONF, rootDir);
    walPrimary = wals.getWAL(primaryHri.getEncodedNameAsBytes(),
        primaryHri.getTable().getNamespace());
    walSecondary = wals.getWAL(secondaryHri.getEncodedNameAsBytes(),
        secondaryHri.getTable().getNamespace());

    rss = mock(RegionServerServices.class);
    when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
    when(rss.getConfiguration()).thenReturn(CONF);
    when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF));
    String string = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER
        .toString();
    ExecutorService es = new ExecutorService(string);
    es.startExecutorService(string + "-" + string, 1);
    when(rss.getExecutorService()).thenReturn(es);
    primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
    primaryRegion.close();
    List<Region> regions = new ArrayList<>();
    regions.add(primaryRegion);
    when(rss.getOnlineRegions()).thenReturn(regions);

    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null);

    reader = null;
  }
  @After
  public void tearDown() throws Exception {
    if (reader != null) {
      reader.close();
    }

    if (primaryRegion != null) {
      HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
    }
    if (secondaryRegion != null) {
      HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
    }

    EnvironmentEdgeManagerTestHelper.reset();
    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
    TEST_UTIL.cleanupTestDir();
  }

  String getName() {
    return name.getMethodName();
  }
  // Some of the test cases are as follows:
  // 1. replay flush start marker again
  // 2. replay flush with smaller seqId than what is there in memstore snapshot
  // 3. replay flush with larger seqId than what is there in memstore snapshot
  // 4. replay flush commit without flush prepare. non droppable memstore
  // 5. replay flush commit without flush prepare. droppable memstore
  // 6. replay open region event
  // 7. replay open region event after flush start
  // 8. replay flush from an earlier seqId (test ignoring seqIds)
  // 9. start flush does not prevent region from closing.
  @Test
  public void testRegionReplicaSecondaryCannotFlush() throws IOException {
    // load some data and flush; ensure that the secondary replica will not execute the flush

    // load some data to secondary by replaying
    putDataByReplay(secondaryRegion, 0, 1000, cq, families);

    verifyData(secondaryRegion, 0, 1000, cq, families);

    // flush region
    FlushResultImpl flush = (FlushResultImpl)secondaryRegion.flush(true);
    assertEquals(flush.result, FlushResultImpl.Result.CANNOT_FLUSH);

    verifyData(secondaryRegion, 0, 1000, cq, families);

    // close the region, and inspect that it has not flushed
    Map<byte[], List<StoreFile>> files = secondaryRegion.close(false);
    // assert that there are no files (since the flush did not happen)
    for (List<StoreFile> f : files.values()) {
      assertTrue(f.isEmpty());
    }
  }
  /**
   * Tests a case where we replay only a flush start marker, then the region is closed. This region
   * should not block indefinitely
   */
  @Test (timeout = 60000)
  public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException {
    // load some data to primary and flush
    int start = 0;
    LOG.info("-- Writing some data to primary from " + start + " to " + (start+100));
    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
    primaryRegion.flush(true);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- NOT Replaying flush commit in secondary");
        }
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    assertTrue(rss.getRegionServerAccounting().getGlobalMemstoreDataSize() > 0);
    // now close the region which should not cause hold because of un-committed flush
    secondaryRegion.close();

    // verify that the memstore size is back to what it was
    assertEquals(0, rss.getRegionServerAccounting().getGlobalMemstoreDataSize());
  }
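
  /**
   * Replays a single WAL entry into the given region as a Put, skipping WAL meta edits.
   * Rows in these tests are integer-named, so the row key is parsed back as an int and
   * returned, letting callers track the highest row replayed so far.
   */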
  static int replayEdit(HRegion region, WAL.Entry entry) throws IOException {
    if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) {
      return 0; // handled elsewhere
    }
    Put put = new Put(CellUtil.cloneRow(entry.getEdit().getCells().get(0)));
    for (Cell cell : entry.getEdit().getCells()) put.add(cell);
    put.setDurability(Durability.SKIP_WAL);
    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
    region.batchReplay(new MutationReplay[] {mutation},
      entry.getKey().getLogSeqNum());
    return Integer.parseInt(Bytes.toString(put.getRow()));
  }
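
  /**
   * Opens a WAL reader over the primary region's current WAL file, so tests can re-read the
   * edits and markers written by the primary and selectively replay them into the secondary.
   */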
  WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException {
    return WALFactory.createReader(TEST_UTIL.getTestFileSystem(),
      AbstractFSWALProvider.getCurrentFileName(walPrimary),
      TEST_UTIL.getConfiguration());
  }
  @Test
  public void testReplayFlushesAndCompactions() throws IOException {
    // initiate a secondary region with some data.

    // load some data to primary and flush. 3 flushes and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 300, 100);

    // compaction from primary
    LOG.info("-- Compacting primary, only 1 store");
    primaryRegion.compactStore(Bytes.toBytes("cf1"),
      NoLimitThroughputController.INSTANCE);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    int lastReplayed = 0;
    int expectedStoreFileCount = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      CompactionDescriptor compactionDesc
          = WALEdit.getCompaction(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        // first verify that everything is replayed and visible before flush event replay
        verifyData(secondaryRegion, 0, lastReplayed, cq, families);
        Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
        long storeMemstoreSize = store.getMemStoreSize();
        long regionMemstoreSize = secondaryRegion.getMemstoreSize();
        long storeFlushableSize = store.getFlushableSize();
        long storeSize = store.getSize();
        long storeSizeUncompressed = store.getStoreSizeUncompressed();
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc);
          assertNull(result.result);
          assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber());

          // assert that the store memstore is smaller now
          long newStoreMemstoreSize = store.getMemStoreSize();
          LOG.info("Memstore size reduced by:"
              + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
          assertTrue(storeMemstoreSize > newStoreMemstoreSize);

        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- Replaying flush commit in secondary");
          secondaryRegion.replayWALFlushCommitMarker(flushDesc);

          // assert that the flush files are picked
          expectedStoreFileCount++;
          for (Store s : secondaryRegion.getStores()) {
            assertEquals(expectedStoreFileCount, s.getStorefilesCount());
          }
          long newFlushableSize = store.getFlushableSize();
          assertTrue(storeFlushableSize > newFlushableSize);

          // assert that the region memstore is smaller now
          long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
          assertTrue(regionMemstoreSize > newRegionMemstoreSize);

          // assert that the store sizes are bigger
          assertTrue(store.getSize() > storeSize);
          assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed);
          assertEquals(store.getSize(), store.getStorefilesSize());
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
      } else if (compactionDesc != null) {
        secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE);

        // assert that the compaction is applied
        for (Store store : secondaryRegion.getStores()) {
          if (store.getColumnFamilyName().equals("cf1")) {
            assertEquals(1, store.getStorefilesCount());
          } else {
            assertEquals(expectedStoreFileCount, store.getStorefilesCount());
          }
        }
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    assertEquals(400-1, lastReplayed);
    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, 400, cq, families);

    LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted");
    verifyData(primaryRegion, 0, lastReplayed, cq, families);
    for (Store store : primaryRegion.getStores()) {
      if (store.getColumnFamilyName().equals("cf1")) {
        assertEquals(1, store.getStorefilesCount());
      } else {
        assertEquals(expectedStoreFileCount, store.getStorefilesCount());
      }
    }
  }
  /**
   * Tests cases where we prepare a flush with some seqId and we receive other flush start markers
   * equal to, greater or less than the previous flush start marker.
   */
  @Test
  public void testReplayFlushStartMarkers() throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");

    FlushDescriptor startFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        // first verify that everything is replayed and visible before flush event replay
        Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
        long storeMemstoreSize = store.getMemStoreSize();
        long regionMemstoreSize = secondaryRegion.getMemstoreSize();
        long storeFlushableSize = store.getFlushableSize();

        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          startFlushDesc = flushDesc;
          LOG.info("-- Replaying flush start in secondary");
          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
          assertNull(result.result);
          assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber());
          assertTrue(regionMemstoreSize > 0);
          assertTrue(storeFlushableSize > 0);

          // assert that the store memstore is smaller now
          long newStoreMemstoreSize = store.getMemStoreSize();
          LOG.info("Memstore size reduced by:"
              + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
          assertTrue(storeMemstoreSize > newStoreMemstoreSize);
          verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);

        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in memstore snapshot
    // and some more data in memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 1: replay the same flush start marker again
    LOG.info("-- Replaying same flush start in secondary again");
    PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemstoreSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 2: replay a flush start marker with a smaller seqId
    FlushDescriptor startFlushDescSmallerSeqId
        = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50);
    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId);
    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemstoreSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 3: replay a flush start marker with a larger seqId
    FlushDescriptor startFlushDescLargerSeqId
        = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50);
    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId);
    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemstoreSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }
  /**
   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
   * less than the previous flush start marker.
   */
  @Test
  public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException {
    // load some data to primary and flush. 2 flushes and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 200, 100);
    int numRows = 300;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor startFlushDesc = null;
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      System.out.println(lastReplayed);
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          // don't replay the first flush start marker, hold on to it, replay the second one
          if (startFlushDesc == null) {
            startFlushDesc = flushDesc;
          } else {
            LOG.info("-- Replaying flush start in secondary");
            startFlushDesc = flushDesc;
            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
            assertNull(result.result);
          }
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          // do not replay any flush commit yet
          if (commitFlushDesc == null) {
            commitFlushDesc = flushDesc; // hold on to the first flush commit marker
          }
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-200) in memstore snapshot
    // and some more data in memstores (rows 200-300)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemstoreSize();

    // Test case 1: replay a flush commit marker smaller than what we have prepared
    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START"
        + startFlushDesc);
    assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber());

    LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    long newFlushableSize = store.getFlushableSize();
    assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped

    // assert that the region memstore is same as before
    long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
    assertEquals(regionMemstoreSize, newRegionMemstoreSize);

    assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }
  /**
   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
   * larger than the previous flush start marker.
   */
  @Test
  public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor startFlushDesc = null;
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          if (startFlushDesc == null) {
            LOG.info("-- Replaying flush start in secondary");
            startFlushDesc = flushDesc;
            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
            assertNull(result.result);
          }
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          // do not replay any flush commit yet
          // hold on to the flush commit marker but simulate a larger
          // flush commit seqId
          commitFlushDesc =
              FlushDescriptor.newBuilder(flushDesc)
              .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50)
              .build();
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in memstore snapshot
    // and some more data in memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemstoreSize();

    // Test case 1: replay a flush commit marker larger than what we have prepared
    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START"
        + startFlushDesc);
    assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber());

    LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    long newFlushableSize = store.getFlushableSize();
    assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped

    // assert that the region memstore is smaller than before, but not empty
    long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
    assertTrue(newRegionMemstoreSize > 0);
    assertTrue(regionMemstoreSize > newRegionMemstoreSize);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }
  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers.
   * The memstore edits should be dropped after the flush commit replay since they should be in
   * flushed files
   */
  @Test
  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore()
      throws IOException {
    testReplayFlushCommitMarkerWithoutFlushStartMarker(true);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers.
   * The memstore edits should not be dropped after the flush commit replay since not every edit
   * will be in flushed files (based on seqId)
   */
  @Test
  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore()
      throws IOException {
    testReplayFlushCommitMarkerWithoutFlushStartMarker(false);
  }
  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers
   */
  public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore)
      throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data.
    // write more data after flush depending on whether droppableSnapshot
    putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100);
    int numRows = droppableMemstore ? 100 : 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          // do not replay flush start marker
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          commitFlushDesc = flushDesc; // hold on to the flush commit marker
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, all the data (rows 0-100 or 0-200 depending on droppableMemstore)
    // should be in the memstore, with no snapshot prepared
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemstoreSize();

    // Test case 1: replay a flush commit marker without start flush marker
    assertNull(secondaryRegion.getPrepareFlushResult());
    assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0);

    // ensure all files are visible in secondary
    for (Store store : secondaryRegion.getStores()) {
      assertTrue(store.getMaxSequenceId() <= secondaryRegion.getReadPoint(null));
    }

    LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    long newFlushableSize = store.getFlushableSize();
    if (droppableMemstore) {
      assertTrue(newFlushableSize == 0); // assert that the memstore is dropped
    } else {
      assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped
    }

    // assert that the region memstore is dropped only when it was droppable
    long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
    if (droppableMemstore) {
      assertTrue(0 == newRegionMemstoreSize);
    } else {
      assertTrue(regionMemstoreSize == newRegionMemstoreSize);
    }

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }
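
  /**
   * Returns a copy of the given flush descriptor with its flush sequence number replaced,
   * used to simulate flush markers arriving with different seqIds.
   */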
  private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) {
    return FlushDescriptor.newBuilder(flush)
        .setFlushSequenceNumber(flushSeqId)
        .build();
  }
  /**
   * Tests replaying region open markers from primary region. Checks whether the files are picked up
   */
  @Test
  public void testReplayRegionOpenEvent() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
    int numRows = 100;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc
          = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // don't replay flush events
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        // don't replay edits
      }
    }

    // we should have 1 open, 1 close and 1 open event
    assertEquals(3, regionEvents.size());

    // replay the first region open event.
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0));

    // replay the close event as well
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1));

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemstoreSize();
    assertTrue(regionMemstoreSize == 0);

    // now replay the region open event that should contain new file locations
    LOG.info("Testing replaying region open event " + regionEvents.get(2));
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    long newFlushableSize = store.getFlushableSize();
    assertTrue(newFlushableSize == 0);

    // assert that the region memstore is empty
    long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
    assertTrue(newRegionMemstoreSize == 0);

    assertNull(secondaryRegion.getPrepareFlushResult()); //prepare snapshot should be dropped if any

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }
  /**
   * Tests the case where we replay a region open event after a flush start but before receiving
   * flush commit
   */
  @Test
  public void testReplayRegionOpenEventAfterFlushStart() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc
          = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // only replay flush start
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
        }
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in the memstore snapshot
    // and some more data in memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // we should have 1 open, 1 close and 1 open event
    assertEquals(3, regionEvents.size());

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }

    // now replay the region open event that should contain new file locations
    LOG.info("Testing replaying region open event " + regionEvents.get(2));
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));

    // assert that the flush files are picked
    expectedStoreFileCount = 2; // two flushes happened
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemstoreSize newSnapshotSize = store.getSizeOfSnapshot();
    assertTrue(newSnapshotSize.getDataSize() == 0);

    // assert that the region memstore is empty
    long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
    assertTrue(newRegionMemstoreSize == 0);

    assertNull(secondaryRegion.getPrepareFlushResult()); //prepare snapshot should be dropped if any

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }
  /**
   * Tests that edits coming in for replay are skipped when they have a smaller seqId than the
   * seqId of the last replayed region open event.
   */
  @Test
  public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 100, 0);
    int numRows = 100;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
    List<WAL.Entry> edits = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc
          = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // don't replay flushes
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        edits.add(entry);
      }
    }

    // replay the region open of the first open, but with the seqid of the second open
    // this way none of the flush files will be picked up.
    secondaryRegion.replayWALRegionEventMarker(
      RegionEventDescriptor.newBuilder(regionEvents.get(0)).setLogSequenceNumber(
        regionEvents.get(2).getLogSequenceNumber()).build());

    // replay edits from before the region close. If replay does not
    // skip these, the following verification will NOT fail.
    for (WAL.Entry entry: edits) {
      replayEdit(secondaryRegion, entry);
    }

    boolean expectedFail = false;
    try {
      verifyData(secondaryRegion, 0, numRows, cq, families);
    } catch (AssertionError e) {
      expectedFail = true; // expected
    }
    if (!expectedFail) {
      fail("Should have failed this verification");
    }
  }
  @Test
  public void testReplayFlushSeqIds() throws IOException {
    // load some data to primary and flush
    int start = 0;
    LOG.info("-- Writing some data to primary from " + start + " to " + (start+100));
    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
    primaryRegion.flush(true);

    // now replay the flush marker
    reader = createWALReaderForPrimary();

    long flushSeqId = -1;
    LOG.info("-- Replaying flush events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
          flushSeqId = flushDesc.getFlushSequenceNumber();
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- Replaying flush commit in secondary");
          secondaryRegion.replayWALFlushCommitMarker(flushDesc);
          assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber());
        }
      }
      // else do not replay
    }

    // TODO: what to do with this?
    // assert that the newly picked up flush file is visible
    long readPoint = secondaryRegion.getMVCC().getReadPoint();
    assertEquals(flushSeqId, readPoint);

    // after replay verify that everything is still visible
    verifyData(secondaryRegion, 0, 100, cq, families);
  }
  @Test
  public void testSeqIdsFromReplay() throws IOException {
    // test the case where seqIds coming from replayed WALEdits are persisted with their
    // original seqIds and are made visible through the mvcc read point upon replay
    String method = name.getMethodName();
    byte[] tableName = Bytes.toBytes(method);
    byte[] family = Bytes.toBytes("family");

    HRegion region = initHRegion(tableName, method, family);
    try {
      // replay an entry that is bigger than current read point
      long readPoint = region.getMVCC().getReadPoint();
      long origSeqId = readPoint + 100;

      Put put = new Put(row).addColumn(family, row, row);
      put.setDurability(Durability.SKIP_WAL); // we replay with skip wal
      replay(region, put, origSeqId);

      // read point should have advanced to this seqId
      assertGet(region, family, row);

      // region seqId should have advanced at least to this seqId
      assertEquals(origSeqId, region.getReadPoint(null));

      // replay an entry that is smaller than current read point
      // caution: adding an entry below current read point might cause partial dirty reads. Normal
      // replay does not allow reads while replay is going on.
      put = new Put(row2).addColumn(family, row2, row2);
      put.setDurability(Durability.SKIP_WAL);
      replay(region, put, origSeqId - 50);

      assertGet(region, family, row2);
    } finally {
      region.close();
    }
  }
  /**
   * Tests that a region opened in secondary mode would not write region open / close
   * events to its WAL.
   * @throws IOException
   */
  @Test
  public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException {
    secondaryRegion.close();
    walSecondary = spy(walSecondary);

    // test for region open and close
    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
    verify(walSecondary, times(0)).append((HRegionInfo)any(),
      (WALKey)any(), (WALEdit)any(), anyBoolean());

    // test for replay prepare flush
    putDataByReplay(secondaryRegion, 0, 10, cq, families);
    secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder().
      setFlushSequenceNumber(10)
      .setTableName(UnsafeByteOperations.unsafeWrap(
          primaryRegion.getTableDesc().getTableName().getName()))
      .setAction(FlushAction.START_FLUSH)
      .setEncodedRegionName(
          UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(UnsafeByteOperations.unsafeWrap(
          primaryRegion.getRegionInfo().getRegionName()))
      .build());

    verify(walSecondary, times(0)).append((HRegionInfo)any(),
      (WALKey)any(), (WALEdit)any(), anyBoolean());

    secondaryRegion.close();
    verify(walSecondary, times(0)).append((HRegionInfo)any(),
      (WALKey)any(), (WALEdit)any(), anyBoolean());
  }
  /**
   * Tests the reads enabled flag for the region. When unset, all reads should be rejected.
   */
  @Test
  public void testRegionReadsEnabledFlag() throws IOException {

    putDataByReplay(secondaryRegion, 0, 100, cq, families);

    verifyData(secondaryRegion, 0, 100, cq, families);

    // now disable reads
    secondaryRegion.setReadsEnabled(false);
    try {
      verifyData(secondaryRegion, 0, 100, cq, families);
      fail("Should have failed with IOException");
    } catch(IOException ex) {
      // expected
    }

    // verify that we can still replay data
    putDataByReplay(secondaryRegion, 100, 100, cq, families);

    // now enable reads again
    secondaryRegion.setReadsEnabled(true);
    verifyData(secondaryRegion, 0, 200, cq, families);
  }
  /**
   * Tests the case where a request for flush cache is sent to the region, but region cannot flush.
   * It should write the flush request marker instead.
   */
  @Test
  public void testWriteFlushRequestMarker() throws IOException {
    // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false
    FlushResultImpl result = (FlushResultImpl)((HRegion)primaryRegion).flushcache(true, false);
    assertNotNull(result);
    assertEquals(result.result, FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY);
    assertFalse(result.wroteFlushWalMarker);

    // request flush again, but this time with writeFlushRequestWalMarker = true
    result = (FlushResultImpl)((HRegion)primaryRegion).flushcache(true, true);
    assertNotNull(result);
    assertEquals(result.result, FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY);
    assertTrue(result.wroteFlushWalMarker);

    List<FlushDescriptor> flushes = Lists.newArrayList();
    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        flushes.add(flush);
      }
    }

    assertEquals(1, flushes.size());
    assertNotNull(flushes.get(0));
    assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction());
  }
  /**
   * Test the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from primary region. Replaying CANNOT_FLUSH
   * flush marker entry should restore the reads enabled status in the region and allow the reads
   * to continue.
   */
  @Test
  public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException {
    disableReads(secondaryRegion);

    // Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from
    // triggered flush restores readsEnabled
    primaryRegion.flushcache(true, true);
    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum());
      }
    }

    // now reads should be enabled
    secondaryRegion.get(new Get(Bytes.toBytes(0)));
  }
  /**
   * Test the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from primary region. Replaying flush start and commit
   * entries should restore the reads enabled status in the region and allow the reads
   * to continue.
   */
  @Test
  public void testReplayingFlushRestoresReadsEnabledState() throws IOException {
    // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came
    // from triggered flush restores readsEnabled
    disableReads(secondaryRegion);

    // put some data in primary
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
    primaryRegion.flush(true);
    // I seem to need to push more edits through so the WAL flushes on local fs. This was not
    // needed before HBASE-15028. Not sure whats up. I can see that we have not flushed if I
    // look at the WAL if I pause the test here and then use WALPrettyPrinter to look at content..
    // Doing same check before HBASE-15028 I can see all edits flushed to the WAL. Somethings up
    // but can't figure it... and this is only test that seems to suffer this flush issue.
    // St.Ack 20160201
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      LOG.info(entry);
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum());
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    // now reads should be enabled
    verifyData(secondaryRegion, 0, 100, cq, families);
  }
  /**
   * Test the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from primary region. Replaying flush start and commit
   * entries should restore the reads enabled status in the region and allow the reads
   * to continue.
   */
  @Test
  public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException {
    // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came
    // from triggered flush restores readsEnabled
    disableReads(secondaryRegion);

    // put some data in primary
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
    primaryRegion.flush(true);

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum());
      }
    }

    // now reads should be enabled
    verifyData(secondaryRegion, 0, 100, cq, families);
  }
  /**
   * Test the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from primary region. Replaying region open event
   * entry from primary should restore the reads enabled status in the region and allow the reads
   * to continue.
   */
  @Test
  public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException {
    // Test case 3: Test that replaying region open event markers restores readsEnabled
    disableReads(secondaryRegion);

    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }

      RegionEventDescriptor regionEventDesc
          = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (regionEventDesc != null) {
        secondaryRegion.replayWALRegionEventMarker(regionEventDesc);
      }
    }

    // now reads should be enabled
    secondaryRegion.get(new Get(Bytes.toBytes(0)));
  }
  @Test
  public void testRefreshStoreFiles() throws IOException {
    assertEquals(0, primaryRegion.getStoreFileList(families).size());
    assertEquals(0, secondaryRegion.getStoreFileList(families).size());

    // Test case 1: refresh with an empty region
    secondaryRegion.refreshStoreFiles();
    assertEquals(0, secondaryRegion.getStoreFileList(families).size());

    // do one flush
    putDataWithFlushes(primaryRegion, 100, 100, 0);
    int numRows = 100;

    // refresh the store file list, and ensure that the files are picked up.
    secondaryRegion.refreshStoreFiles();
    assertPathListsEqual(primaryRegion.getStoreFileList(families),
      secondaryRegion.getStoreFileList(families));
    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 2: 3 more flushes
    putDataWithFlushes(primaryRegion, 100, 300, 0);
    numRows = 300;

    // refresh the store file list, and ensure that the files are picked up.
    secondaryRegion.refreshStoreFiles();
    assertPathListsEqual(primaryRegion.getStoreFileList(families),
      secondaryRegion.getStoreFileList(families));
    assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size());

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    if (FSUtils.WINDOWS) {
      // compaction cannot move files while they are open in secondary on windows. Skip remaining.
      return;
    }

    // Test case 3: compact primary files
    primaryRegion.compactStores();
    List<Region> regions = new ArrayList<>();
    regions.add(primaryRegion);
    when(rss.getOnlineRegions()).thenReturn(regions);
    CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false);
    cleaner.chore();
    secondaryRegion.refreshStoreFiles();
    assertPathListsEqual(primaryRegion.getStoreFileList(families),
      secondaryRegion.getStoreFileList(families));
    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Replaying edits in secondary");

    // Test case 4: replay some edits, ensure that memstore is dropped.
    assertTrue(secondaryRegion.getMemstoreSize() == 0);
    putDataWithFlushes(primaryRegion, 400, 400, 0);
    numRows = 400;

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        // do not replay flush
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    assertTrue(secondaryRegion.getMemstoreSize() > 0);

    secondaryRegion.refreshStoreFiles();

    assertTrue(secondaryRegion.getMemstoreSize() == 0);

    LOG.info("-- Verifying edits from primary");
    verifyData(primaryRegion, 0, numRows, cq, families);
    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);
  }
1441 * Paths can be qualified or not. This does the assertion using String->Path conversion.
1443 private void assertPathListsEqual(List<String> list1, List<String> list2) {
1444 List<Path> l1 = new ArrayList<>(list1.size());
1445 for (String path : list1) {
1446 l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
1448 List<Path> l2 = new ArrayList<>(list2.size());
1449 for (String path : list2) {
1450 l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
1451 }
1452 assertEquals(l1, l2);
1453 }
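1454 // helper that disables reads on the region and verifies that gets then fail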
1455 private void disableReads(HRegion region) {
1456 region.setReadsEnabled(false);
1457 try {
1458 verifyData(region, 0, 1, cq, families);
1459 fail("Should have failed with IOException");
1460 } catch(IOException ex) {
1461 // expected
1462 }
1463 }
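1464 // helper that replays a single Put through the replay path with the given sequence id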
1465 private void replay(HRegion region, Put put, long replaySeqId) throws IOException {
1466 put.setDurability(Durability.SKIP_WAL);
1467 MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
1468 region.batchReplay(new MutationReplay[] {mutation}, replaySeqId);
1469 }
1471 /**
1472 * Tests replaying a bulk load event marker from the primary region; checks that the bulk loaded files are picked up by the secondary region.
1473 */
1474 @Test
1475 public void testReplayBulkLoadEvent() throws IOException {
1476 LOG.info("testReplayBulkLoadEvent starts");
1477 putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
1479 // close the region and open again.
1480 primaryRegion.close();
1481 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
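1482 // the close above flushed the pending edits into store files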
1483 // bulk load a file into primary region
1484 Random random = new Random();
1485 byte[] randomValues = new byte[20];
1486 random.nextBytes(randomValues);
1487 Path testPath = TEST_UTIL.getDataTestDirOnTestFS();
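1488 // create one HFile per family and remember the expected number of loaded files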
1489 List<Pair<byte[], String>> familyPaths = new ArrayList<>();
1490 int expectedLoadFileCount = 0;
1491 for (byte[] family : families) {
1492 familyPaths.add(new Pair<>(family, createHFileForFamilies(testPath, family, randomValues)));
1493 expectedLoadFileCount++;
1494 }
1495 primaryRegion.bulkLoadHFiles(familyPaths, false, null);
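1496 // bulkLoadHFiles also appends a bulk load event marker to the primary WAL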
1497 // now replay the edits and the bulk load marker
1498 reader = createWALReaderForPrimary();
1500 LOG.info("-- Replaying edits and region events in secondary");
1501 BulkLoadDescriptor bulkloadEvent = null;
1502 while (true) {
1503 WAL.Entry entry = reader.next();
1504 if (entry == null) {
1505 break;
1506 }
1507 bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0));
1508 if (bulkloadEvent != null) {
1509 break;
1510 }
1511 }
1513 // we should have 1 bulk load event
1514 assertNotNull(bulkloadEvent);
1515 assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount());
1517 // replay the bulk load event
1518 secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent);
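1520 // collect the store file names recorded in the bulk load descriptor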
1521 List<String> storeFileName = new ArrayList<>();
1522 for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) {
1523 storeFileName.addAll(storeDesc.getStoreFileList());
1524 }
1525 // assert that the bulk loaded files are picked up
1526 for (Store s : secondaryRegion.getStores()) {
1527 for (StoreFile sf : s.getStorefiles()) {
1528 storeFileName.remove(sf.getPath().getName());
1529 }
1530 }
1531 assertTrue("Found some store file isn't loaded:" + storeFileName, storeFileName.isEmpty());
1533 LOG.info("-- Verifying edits from secondary");
1534 for (byte[] family : families) {
1535 assertGet(secondaryRegion, family, randomValues);
1536 }
1537 }
1539 @Test
1540 public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException {
1541 // tests replaying a flush commit marker whose flush file has already been compacted
1542 // away on the primary and deleted from the archive directory
1543 secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder()
1544 .setFlushSequenceNumber(Long.MAX_VALUE)
1545 .setTableName(UnsafeByteOperations.unsafeWrap(primaryRegion.getTableDesc().getTableName().getName()))
1546 .setAction(FlushAction.COMMIT_FLUSH)
1547 .setEncodedRegionName(
1548 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1549 .setRegionName(UnsafeByteOperations.unsafeWrap(
1550 primaryRegion.getRegionInfo().getRegionName()))
1551 .addStoreFlushes(StoreFlushDescriptor.newBuilder()
1552 .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
1553 .setStoreHomeDir("/store_home_dir")
1554 .addFlushOutput("/foo/baz/bar")
1555 .build())
1556 .build());
1557 }
1559 @Test
1560 public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException {
1561 // tests replaying a compaction marker whose compaction output file has already been
1562 // compacted away on the primary and deleted from the archive directory
1563 secondaryRegion.replayWALCompactionMarker(CompactionDescriptor.newBuilder()
1564 .setTableName(UnsafeByteOperations.unsafeWrap(
1565 primaryRegion.getTableDesc().getTableName().getName()))
1566 .setEncodedRegionName(
1567 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1568 .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
1569 .addCompactionInput("/foo")
1570 .addCompactionOutput("/bar")
1571 .setStoreHomeDir("/store_home_dir")
1572 .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
1573 .build(),
1574 true, true, Long.MAX_VALUE);
1575 }
1577 @Test
1578 public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException {
1579 // tests replaying a region open event marker whose region files have already been
1580 // compacted away on the primary and deleted from the archive directory
1581 secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder()
1582 .setTableName(UnsafeByteOperations.unsafeWrap(
1583 primaryRegion.getTableDesc().getTableName().getName()))
1584 .setEncodedRegionName(
1585 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1586 .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
1587 .setEventType(EventType.REGION_OPEN)
1588 .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1)))
1589 .setLogSequenceNumber(Long.MAX_VALUE)
1590 .addStores(StoreDescriptor.newBuilder()
1591 .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
1592 .setStoreHomeDir("/store_home_dir")
1593 .addStoreFile("/foo")
1594 .build())
1595 .build());
1596 }
1598 @Test
1599 public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException {
1600 // tests replaying a bulk load event marker whose bulk loaded files have already been
1601 // compacted away on the primary and deleted from the archive directory
1602 secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder()
1603 .setTableName(ProtobufUtil.toProtoTableName(primaryRegion.getTableDesc().getTableName()))
1604 .setEncodedRegionName(
1605 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1606 .setBulkloadSeqNum(Long.MAX_VALUE)
1607 .addStores(StoreDescriptor.newBuilder()
1608 .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
1609 .setStoreHomeDir("/store_home_dir")
1610 .addStoreFile("/foo")
1611 .build())
1612 .build());
1613 }
1615 private String createHFileForFamilies(Path testPath, byte[] family,
1616 byte[] valueBytes) throws IOException {
1617 HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
1618 // TODO We need a way to do this without creating files
1619 Path testFile = new Path(testPath, UUID.randomUUID().toString());
1620 FSDataOutputStream out = TEST_UTIL.getTestFileSystem().create(testFile);
1621 try {
1622 hFileFactory.withOutputStream(out);
1623 hFileFactory.withFileContext(new HFileContext());
1624 HFile.Writer writer = hFileFactory.create();
1625 try {
1626 writer.append(new KeyValue(CellUtil.createCell(valueBytes, family, valueBytes, 0L,
1627 KeyValue.Type.Put.getCode(), valueBytes)));
1628 } finally {
1629 writer.close();
1630 }
1631 } finally {
1632 out.close();
1633 }
1634 return testFile.toString();
1635 }
1637 /** Puts a total of numRows + numRowsAfterFlush records indexed with numeric row keys. Does
1638 * a flush every flushInterval records. Then puts numRowsAfterFlush more rows, but does not
1639 * execute a flush afterwards.
1640 * @throws IOException */
1641 private void putDataWithFlushes(HRegion region, int flushInterval,
1642 int numRows, int numRowsAfterFlush) throws IOException {
1643 int start = 0;
1644 for (; start < numRows; start += flushInterval) {
1645 LOG.info("-- Writing some data to primary from " + start + " to " + (start+flushInterval));
1646 putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families);
1647 LOG.info("-- Flushing primary, creating 3 files for 3 stores");
1648 region.flush(true);
1649 }
1650 LOG.info("-- Writing some more data to primary, not flushing");
1651 putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families);
1652 }
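1653 // helper that writes rows through the replay path instead of the normal write path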
1654 private void putDataByReplay(HRegion region,
1655 int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
1656 for (int i = startRow; i < startRow + numRows; i++) {
1657 Put put = new Put(Bytes.toBytes("" + i));
1658 put.setDurability(Durability.SKIP_WAL);
1659 for (byte[] family : families) {
1660 put.addColumn(family, qf, EnvironmentEdgeManager.currentTime(), null);
1661 }
1662 replay(region, put, i + 1);
1663 }
1664 }
1666 private static HRegion initHRegion(byte[] tableName,
1667 String callingMethod, byte[]... families) throws IOException {
1668 return initHRegion(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
1669 callingMethod, TEST_UTIL.getConfiguration(), false, Durability.SYNC_WAL, null, families);
1670 }
1672 private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
1673 String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
1674 WAL wal, byte[]... families) throws IOException {
1675 return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf,
1676 isReadOnly, durability, wal, families);
1677 }