/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.assertGet;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.putData;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.verifyData;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import com.google.common.collect.Lists;
/**
 * Tests of HRegion methods for replaying flush, compaction, region open, etc events for secondary
 * region replicas.
 */
@Category(MediumTests.class)
public class TestHRegionReplayEvents {
  private static final Log LOG = LogFactory.getLog(TestHRegion.class);

  @Rule public TestName name = new TestName();

  private static HBaseTestingUtility TEST_UTIL;
  public static Configuration CONF;
  private String dir;

  private byte[][] families = new byte[][] {
      Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")};

  protected byte[] tableName;
  protected String method;
  protected final byte[] row = Bytes.toBytes("rowA");
  protected final byte[] row2 = Bytes.toBytes("rowB");
  protected byte[] cq = Bytes.toBytes("cq");

  private Path rootDir;
  private HTableDescriptor htd;
  private long time;
  private RegionServerServices rss;
  private HRegionInfo primaryHri, secondaryHri;
  private HRegion primaryRegion, secondaryRegion;
  private WALFactory wals;
  private WAL walPrimary, walSecondary;
  private WAL.Reader reader;
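
  /**
   * Sets up a primary region and a secondary region replica over the same table with separate
   * WALs, backed by a mocked RegionServerServices.
   */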
  @Before
  public void setup() throws IOException {
    TEST_UTIL = HBaseTestingUtility.createLocalHTU();
    CONF = TEST_UTIL.getConfiguration();
    dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString();
    method = name.getMethodName();
    tableName = Bytes.toBytes(name.getMethodName());
    rootDir = new Path(dir + method);
    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
    method = name.getMethodName();

    htd = new HTableDescriptor(TableName.valueOf(method));
    for (byte[] family : families) {
      htd.addFamily(new HColumnDescriptor(family));
    }

    time = System.currentTimeMillis();

    primaryHri = new HRegionInfo(htd.getTableName(),
        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
        false, time, 0);
    secondaryHri = new HRegionInfo(htd.getTableName(),
        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
        false, time, 1);

    wals = TestHRegion.createWALFactory(CONF, rootDir);
    walPrimary = wals.getWAL(primaryHri.getEncodedNameAsBytes(),
        primaryHri.getTable().getNamespace());
    walSecondary = wals.getWAL(secondaryHri.getEncodedNameAsBytes(),
        secondaryHri.getTable().getNamespace());

    rss = mock(RegionServerServices.class);
    when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
    when(rss.getConfiguration()).thenReturn(CONF);
    when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF));
    String string = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER
        .toString();
    ExecutorService es = new ExecutorService(string);
    es.startExecutorService(string + "-" + string, 1);
    when(rss.getExecutorService()).thenReturn(es);
    primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
    primaryRegion.close();
    List<Region> regions = new ArrayList<>();
    regions.add(primaryRegion);
    when(rss.getOnlineRegions()).thenReturn(regions);

    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null);
  }
  @After
  public void tearDown() throws Exception {
    if (reader != null) {
      reader.close();
    }

    if (primaryRegion != null) {
      HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
    }
    if (secondaryRegion != null) {
      HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
    }

    EnvironmentEdgeManagerTestHelper.reset();
    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
    TEST_UTIL.cleanupTestDir();
  }
  String getName() {
    return name.getMethodName();
  }
  // Some of the test cases are as follows:
  // 1. replay flush start marker again
  // 2. replay flush with smaller seqId than what is there in memstore snapshot
  // 3. replay flush with larger seqId than what is there in memstore snapshot
  // 4. replay flush commit without flush prepare. non droppable memstore
  // 5. replay flush commit without flush prepare. droppable memstore
  // 6. replay open region event
  // 7. replay open region event after flush start
  // 8. replay flush from an earlier seqId (test ignoring seqIds)
  // 9. start flush does not prevent region from closing.
  @Test
  public void testRegionReplicaSecondaryCannotFlush() throws IOException {
    // load some data and flush; ensure that the secondary replica will not execute the flush

    // load some data to secondary by replaying
    putDataByReplay(secondaryRegion, 0, 1000, cq, families);

    verifyData(secondaryRegion, 0, 1000, cq, families);

    FlushResultImpl flush = (FlushResultImpl) secondaryRegion.flush(true);
    assertEquals(flush.result, FlushResultImpl.Result.CANNOT_FLUSH);

    verifyData(secondaryRegion, 0, 1000, cq, families);

    // close the region, and inspect that it has not flushed
    Map<byte[], List<StoreFile>> files = secondaryRegion.close(false);
    // assert that there are no files (since no flush happened)
    for (List<StoreFile> f : files.values()) {
      assertTrue(f.isEmpty());
    }
  }
  /**
   * Tests a case where we replay only a flush start marker, then the region is closed. This region
   * should not block indefinitely.
   */
  @Test (timeout = 60000)
  public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException {
    // load some data to primary and flush
    int start = 0;
    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
    primaryRegion.flush(true);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- NOT Replaying flush commit in secondary");
        }
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    assertTrue(rss.getRegionServerAccounting().getGlobalMemstoreDataSize() > 0);
    // now close the region which should not cause hold because of un-committed flush
    secondaryRegion.close();

    // verify that the memstore size is back to what it was
    assertEquals(0, rss.getRegionServerAccounting().getGlobalMemstoreDataSize());
  }
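
  /**
   * Replays a single WAL entry into the given region as a PUT mutation, skipping meta edits.
   * Returns the row index that was encoded in the replayed row key.
   */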
  static int replayEdit(HRegion region, WAL.Entry entry) throws IOException {
    if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) {
      return 0; // handled elsewhere
    }
    Put put = new Put(CellUtil.cloneRow(entry.getEdit().getCells().get(0)));
    for (Cell cell : entry.getEdit().getCells()) put.add(cell);
    put.setDurability(Durability.SKIP_WAL);
    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
    region.batchReplay(new MutationReplay[] {mutation},
        entry.getKey().getLogSeqNum());
    return Integer.parseInt(Bytes.toString(put.getRow()));
  }
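
  /** Creates a WAL reader over the current WAL file of the primary region. */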
  WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException {
    return WALFactory.createReader(TEST_UTIL.getTestFileSystem(),
        AbstractFSWALProvider.getCurrentFileName(walPrimary),
        TEST_UTIL.getConfiguration());
  }
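
  /**
   * Tests replaying edits, flush markers and a compaction marker from the primary WAL, verifying
   * memstore sizes and store file counts in the secondary as each event is applied.
   */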
  @Test
  public void testReplayFlushesAndCompactions() throws IOException {
    // initiate a secondary region with some data.

    // load some data to primary and flush. 3 flushes and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 300, 100);

    // compaction from primary
    LOG.info("-- Compacting primary, only 1 store");
    primaryRegion.compactStore(Bytes.toBytes("cf1"),
        NoLimitThroughputController.INSTANCE);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    int lastReplayed = 0;
    int expectedStoreFileCount = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      CompactionDescriptor compactionDesc
          = WALEdit.getCompaction(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        // first verify that everything is replayed and visible before flush event replay
        verifyData(secondaryRegion, 0, lastReplayed, cq, families);
        Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
        long storeMemstoreSize = store.getMemStoreSize();
        long regionMemstoreSize = secondaryRegion.getMemstoreSize();
        long storeFlushableSize = store.getFlushableSize();
        long storeSize = store.getSize();
        long storeSizeUncompressed = store.getStoreSizeUncompressed();
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc);
          assertNull(result.result);
          assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber());

          // assert that the store memstore is smaller now
          long newStoreMemstoreSize = store.getMemStoreSize();
          LOG.info("Memstore size reduced by:"
              + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
          assertTrue(storeMemstoreSize > newStoreMemstoreSize);

        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- Replaying flush commit in secondary");
          secondaryRegion.replayWALFlushCommitMarker(flushDesc);

          // assert that the flush files are picked
          expectedStoreFileCount++;
          for (Store s : secondaryRegion.getStores()) {
            assertEquals(expectedStoreFileCount, s.getStorefilesCount());
          }
          long newFlushableSize = store.getFlushableSize();
          assertTrue(storeFlushableSize > newFlushableSize);

          // assert that the region memstore is smaller now
          long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
          assertTrue(regionMemstoreSize > newRegionMemstoreSize);

          // assert that the store sizes are bigger
          assertTrue(store.getSize() > storeSize);
          assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed);
          assertEquals(store.getSize(), store.getStorefilesSize());
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else if (compactionDesc != null) {
        secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE);

        // assert that the compaction is applied
        for (Store store : secondaryRegion.getStores()) {
          if (store.getColumnFamilyName().equals("cf1")) {
            assertEquals(1, store.getStorefilesCount());
          } else {
            assertEquals(expectedStoreFileCount, store.getStorefilesCount());
          }
        }
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    assertEquals(400 - 1, lastReplayed);
    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, 400, cq, families);

    LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted");
    verifyData(primaryRegion, 0, lastReplayed, cq, families);
    for (Store store : primaryRegion.getStores()) {
      if (store.getColumnFamilyName().equals("cf1")) {
        assertEquals(1, store.getStorefilesCount());
      } else {
        assertEquals(expectedStoreFileCount, store.getStorefilesCount());
      }
    }
  }
  /**
   * Tests cases where we prepare a flush with some seqId and we receive other flush start markers
   * equal to, greater or less than the previous flush start marker.
   */
  @Test
  public void testReplayFlushStartMarkers() throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");

    FlushDescriptor startFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        // first verify that everything is replayed and visible before flush event replay
        Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
        long storeMemstoreSize = store.getMemStoreSize();
        long regionMemstoreSize = secondaryRegion.getMemstoreSize();
        long storeFlushableSize = store.getFlushableSize();

        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          startFlushDesc = flushDesc;
          LOG.info("-- Replaying flush start in secondary");
          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
          assertNull(result.result);
          assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber());
          assertTrue(regionMemstoreSize > 0);
          assertTrue(storeFlushableSize > 0);

          // assert that the store memstore is smaller now
          long newStoreMemstoreSize = store.getMemStoreSize();
          LOG.info("Memstore size reduced by:"
              + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
          assertTrue(storeMemstoreSize > newStoreMemstoreSize);
          verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in memstore snapshot
    // and some more data in memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 1: replay the same flush start marker again
    LOG.info("-- Replaying same flush start in secondary again");
    PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
        startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemstoreSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 2: replay a flush start marker with a smaller seqId
    FlushDescriptor startFlushDescSmallerSeqId
        = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50);
    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId);
    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
        startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemstoreSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 3: replay a flush start marker with a larger seqId
    FlushDescriptor startFlushDescLargerSeqId
        = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50);
    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId);
    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
        startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemstoreSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }
  /**
   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
   * less than the previous flush start marker.
   */
  @Test
  public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException {
    // load some data to primary and flush. 2 flushes and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 200, 100);
    int numRows = 300;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor startFlushDesc = null;
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      System.out.println(lastReplayed);
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          // don't replay the first flush start marker, hold on to it, replay the second one
          if (startFlushDesc == null) {
            startFlushDesc = flushDesc;
          } else {
            LOG.info("-- Replaying flush start in secondary");
            startFlushDesc = flushDesc;
            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
            assertNull(result.result);
          }
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          // do not replay any flush commit yet
          if (commitFlushDesc == null) {
            commitFlushDesc = flushDesc; // hold on to the first flush commit marker
          }
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-200) in memstore snapshot
    // and some more data in memstores (rows 200-300)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemstoreSize();

    // Test case 1: replay a flush commit marker smaller than what we have prepared
    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START"
        + startFlushDesc);
    assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber());

    LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    long newFlushableSize = store.getFlushableSize();
    assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped

    // assert that the region memstore is same as before
    long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
    assertEquals(regionMemstoreSize, newRegionMemstoreSize);

    assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }
  /**
   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
   * larger than the previous flush start marker.
   */
  @Test
  public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor startFlushDesc = null;
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          if (startFlushDesc == null) {
            LOG.info("-- Replaying flush start in secondary");
            startFlushDesc = flushDesc;
            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
            assertNull(result.result);
          }
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          // do not replay any flush commit yet
          // hold on to the flush commit marker but simulate a larger
          // flush commit seqId
          commitFlushDesc =
              FlushDescriptor.newBuilder(flushDesc)
                  .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50)
                  .build();
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in memstore snapshot
    // and some more data in memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemstoreSize();

    // Test case 1: replay a flush commit marker larger than what we have prepared
    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START"
        + startFlushDesc);
    assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber());

    LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    long newFlushableSize = store.getFlushableSize();
    assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped

    // assert that the region memstore is smaller than before, but not empty
    long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
    assertTrue(newRegionMemstoreSize > 0);
    assertTrue(regionMemstoreSize > newRegionMemstoreSize);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }
  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers.
   * The memstore edits should be dropped after the flush commit replay since they should be in
   * flushed files.
   */
  @Test
  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore()
      throws IOException {
    testReplayFlushCommitMarkerWithoutFlushStartMarker(true);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers.
   * The memstore edits should not be dropped after the flush commit replay since not every edit
   * will be in flushed files (based on seqId).
   */
  @Test
  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore()
      throws IOException {
    testReplayFlushCommitMarkerWithoutFlushStartMarker(false);
  }
  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers.
   */
  public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore)
      throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data.
    // write more data after flush depending on whether droppableMemstore is set
    putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100);
    int numRows = droppableMemstore ? 100 : 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          // do not replay flush start marker
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          commitFlushDesc = flushDesc; // hold on to the flush commit marker
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-200) in the memstore without snapshot
    // and some more data in memstores (rows 100-300)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemstoreSize();

    // Test case 1: replay a flush commit marker without start flush marker
    assertNull(secondaryRegion.getPrepareFlushResult());
    assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0);

    // ensure all files are visible in secondary
    for (Store store : secondaryRegion.getStores()) {
      assertTrue(store.getMaxSequenceId() <= secondaryRegion.getReadPoint(null));
    }

    LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    long newFlushableSize = store.getFlushableSize();
    if (droppableMemstore) {
      assertTrue(newFlushableSize == 0); // assert that the memstore is dropped
    } else {
      assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped
    }

    // assert that the region memstore is same as before (we could not drop)
    long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
    if (droppableMemstore) {
      assertTrue(0 == newRegionMemstoreSize);
    } else {
      assertTrue(regionMemstoreSize == newRegionMemstoreSize);
    }

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }
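
  /** Returns a copy of the given flush descriptor with its flush sequence number replaced. */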
  private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) {
    return FlushDescriptor.newBuilder(flush)
        .setFlushSequenceNumber(flushSeqId)
        .build();
  }
  /**
   * Tests replaying region open markers from primary region. Checks whether the files are
   * picked up.
   */
  @Test
  public void testReplayRegionOpenEvent() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
    int numRows = 100;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc
          = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // don't replay flush events
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        // don't replay edits
      }
    }

    // we should have 1 open, 1 close and 1 open event
    assertEquals(3, regionEvents.size());

    // replay the first region open event.
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0));

    // replay the close event as well
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1));

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemstoreSize();
    assertTrue(regionMemstoreSize == 0);

    // now replay the region open event that should contain new file locations
    LOG.info("Testing replaying region open event " + regionEvents.get(2));
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    long newFlushableSize = store.getFlushableSize();
    assertTrue(newFlushableSize == 0);

    // assert that the region memstore is empty
    long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
    assertTrue(newRegionMemstoreSize == 0);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if any

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }
  /**
   * Tests the case where we replay a region open event after a flush start but before receiving
   * a flush commit.
   */
  @Test
  public void testReplayRegionOpenEventAfterFlushStart() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc
          = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // only replay flush start
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
        }
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in the memstore snapshot
    // and some more data in memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // we should have 1 open, 1 close and 1 open event
    assertEquals(3, regionEvents.size());

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }

    // now replay the region open event that should contain new file locations
    LOG.info("Testing replaying region open event " + regionEvents.get(2));
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));

    // assert that the flush files are picked
    expectedStoreFileCount = 2; // two flushes happened
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemstoreSize newSnapshotSize = store.getSizeOfSnapshot();
    assertTrue(newSnapshotSize.getDataSize() == 0);

    // assert that the region memstore is empty
    long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
    assertTrue(newRegionMemstoreSize == 0);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if any

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }
  /**
   * Tests whether edits coming in for replay are skipped when they have a smaller seqId than the
   * seqId of the last replayed region open event.
   */
  @Test
  public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 100, 0);
    int numRows = 100;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
    List<WAL.Entry> edits = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc
          = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // don't replay flushes
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        edits.add(entry);
      }
    }

    // replay the region open of first open, but with the seqid of the second open
    // this way none of the flush files will be picked up.
    secondaryRegion.replayWALRegionEventMarker(
        RegionEventDescriptor.newBuilder(regionEvents.get(0)).setLogSequenceNumber(
            regionEvents.get(2).getLogSequenceNumber()).build());

    // replay edits from before the region close. If replay does not
    // skip these, the following verification will NOT fail.
    for (WAL.Entry entry : edits) {
      replayEdit(secondaryRegion, entry);
    }

    boolean expectedFail = false;
    try {
      verifyData(secondaryRegion, 0, numRows, cq, families);
    } catch (AssertionError e) {
      expectedFail = true; // expected
    }
    if (!expectedFail) {
      fail("Should have failed this verification");
    }
  }
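
  /**
   * Tests that the flush sequence number seen in the replayed flush start marker matches the one
   * in the flush commit marker, and that the secondary read point advances to it.
   */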
  @Test
  public void testReplayFlushSeqIds() throws IOException {
    // load some data to primary and flush
    int start = 0;
    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
    primaryRegion.flush(true);

    // now replay the flush marker
    reader = createWALReaderForPrimary();

    long flushSeqId = -1;
    LOG.info("-- Replaying flush events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
          = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
          flushSeqId = flushDesc.getFlushSequenceNumber();
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- Replaying flush commit in secondary");
          secondaryRegion.replayWALFlushCommitMarker(flushDesc);
          assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber());
        }
      }
      // else do not replay
    }

    // TODO: what to do with this?
    // assert that the newly picked up flush file is visible
    long readPoint = secondaryRegion.getMVCC().getReadPoint();
    assertEquals(flushSeqId, readPoint);

    // after replay verify that everything is still visible
    verifyData(secondaryRegion, 0, 100, cq, families);
  }
  @Test
  public void testSeqIdsFromReplay() throws IOException {
    // test the case where seqId's coming from replayed WALEdits are made persisted with their
    // original seqIds and they are made visible through mvcc read point upon replay
    String method = name.getMethodName();
    byte[] tableName = Bytes.toBytes(method);
    byte[] family = Bytes.toBytes("family");

    HRegion region = initHRegion(tableName, method, family);
    try {
      // replay an entry that is bigger than current read point
      long readPoint = region.getMVCC().getReadPoint();
      long origSeqId = readPoint + 100;

      Put put = new Put(row).addColumn(family, row, row);
      put.setDurability(Durability.SKIP_WAL); // we replay with skip wal
      replay(region, put, origSeqId);

      // read point should have advanced to this seqId
      assertGet(region, family, row);

      // region seqId should have advanced at least to this seqId
      assertEquals(origSeqId, region.getReadPoint(null));

      // replay an entry that is smaller than current read point
      // caution: adding an entry below current read point might cause partial dirty reads. Normal
      // replay does not allow reads while replay is going on.
      put = new Put(row2).addColumn(family, row2, row2);
      put.setDurability(Durability.SKIP_WAL);
      replay(region, put, origSeqId - 50);

      assertGet(region, family, row2);
    } finally {
      region.close();
    }
  }
  /**
   * Tests that a region opened in secondary mode would not write region open / close
   * events to its WAL.
   * @throws IOException
   */
  @Test
  public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException {
    secondaryRegion.close();
    walSecondary = spy(walSecondary);

    // test for region open and close
    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
    verify(walSecondary, times(0)).append((HRegionInfo)any(),
        (WALKey)any(), (WALEdit)any(), anyBoolean());

    // test for replay prepare flush
    putDataByReplay(secondaryRegion, 0, 10, cq, families);
    secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder().
        setFlushSequenceNumber(10)
        .setTableName(UnsafeByteOperations.unsafeWrap(
            primaryRegion.getTableDesc().getTableName().getName()))
        .setAction(FlushAction.START_FLUSH)
        .setEncodedRegionName(
            UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
        .setRegionName(UnsafeByteOperations.unsafeWrap(
            primaryRegion.getRegionInfo().getRegionName()))
        .build());

    verify(walSecondary, times(0)).append((HRegionInfo)any(),
        (WALKey)any(), (WALEdit)any(), anyBoolean());

    secondaryRegion.close();
    verify(walSecondary, times(0)).append((HRegionInfo)any(),
        (WALKey)any(), (WALEdit)any(), anyBoolean());
  }
  /**
   * Tests the reads enabled flag for the region. When unset, all reads should be rejected.
   */
  @Test
  public void testRegionReadsEnabledFlag() throws IOException {

    putDataByReplay(secondaryRegion, 0, 100, cq, families);

    verifyData(secondaryRegion, 0, 100, cq, families);

    // now disable reads
    secondaryRegion.setReadsEnabled(false);
    try {
      verifyData(secondaryRegion, 0, 100, cq, families);
      fail("Should have failed with IOException");
    } catch(IOException ex) {
      // expected
    }

    // verify that we can still replay data
    putDataByReplay(secondaryRegion, 100, 100, cq, families);

    // now enable reads again
    secondaryRegion.setReadsEnabled(true);
    verifyData(secondaryRegion, 0, 200, cq, families);
  }
  /**
   * Tests the case where a request for flush cache is sent to the region, but region cannot flush.
   * It should write the flush request marker instead.
   */
  @Test
  public void testWriteFlushRequestMarker() throws IOException {
    // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false
    FlushResultImpl result = (FlushResultImpl)((HRegion)primaryRegion).flushcache(true, false);
    assertNotNull(result);
    assertEquals(result.result, FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY);
    assertFalse(result.wroteFlushWalMarker);

    // request flush again, but this time with writeFlushRequestWalMarker = true
    result = (FlushResultImpl)((HRegion)primaryRegion).flushcache(true, true);
    assertNotNull(result);
    assertEquals(result.result, FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY);
    assertTrue(result.wroteFlushWalMarker);

    List<FlushDescriptor> flushes = Lists.newArrayList();
    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        flushes.add(flush);
      }
    }

    assertEquals(1, flushes.size());
    assertNotNull(flushes.get(0));
    assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction());
  }
  /**
   * Test the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from primary region. Replaying a CANNOT_FLUSH
   * flush marker entry should restore the reads enabled status in the region and allow the reads
   * to continue.
   */
  @Test
  public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException {
    disableReads(secondaryRegion);

    // Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from
    // triggered flush restores readsEnabled
    primaryRegion.flushcache(true, true);
    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum());
      }
    }

    // now reads should be enabled
    secondaryRegion.get(new Get(Bytes.toBytes(0)));
  }
  /**
   * Test the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from primary region. Replaying flush start and commit
   * entries should restore the reads enabled status in the region and allow the reads
   * to continue.
   */
  @Test
  public void testReplayingFlushRestoresReadsEnabledState() throws IOException {
    // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came
    // from triggered flush restores readsEnabled
    disableReads(secondaryRegion);

    // put some data in primary
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
    primaryRegion.flush(true);
    // I seem to need to push more edits through so the WAL flushes on local fs. This was not
    // needed before HBASE-15028. Not sure whats up. I can see that we have not flushed if I
    // look at the WAL if I pause the test here and then use WALPrettyPrinter to look at content..
    // Doing same check before HBASE-15028 I can see all edits flushed to the WAL. Somethings up
    // but can't figure it... and this is only test that seems to suffer this flush issue.
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum());
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    // now reads should be enabled
    verifyData(secondaryRegion, 0, 100, cq, families);
  }
  /**
   * Test the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from primary region. Replaying flush start and commit
   * entries should restore the reads enabled status in the region and allow the reads
   * to continue.
   */
  @Test
  public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException {
    // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came
    // from triggered flush restores readsEnabled
    disableReads(secondaryRegion);

    // put some data in primary
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
    primaryRegion.flush(true);

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum());
      }
    }

    // now reads should be enabled
    verifyData(secondaryRegion, 0, 100, cq, families);
  }
  /**
   * Test the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from primary region. Replaying region open event
   * entry from primary should restore the reads enabled status in the region and allow the reads
   * to continue.
   */
  @Test
  public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException {
    // Test case 3: Test that replaying region open event markers restores readsEnabled
    disableReads(secondaryRegion);

    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }

      RegionEventDescriptor regionEventDesc
          = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (regionEventDesc != null) {
        secondaryRegion.replayWALRegionEventMarker(regionEventDesc);
      }
    }

    // now reads should be enabled
    secondaryRegion.get(new Get(Bytes.toBytes(0)));
  }
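
  /**
   * Tests that refreshStoreFiles() on the secondary picks up new flushed and compacted files from
   * the primary, and drops replayed memstore edits once they are covered by the refreshed files.
   */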
  @Test
  public void testRefreshStoreFiles() throws IOException {
    assertEquals(0, primaryRegion.getStoreFileList(families).size());
    assertEquals(0, secondaryRegion.getStoreFileList(families).size());

    // Test case 1: refresh with an empty region
    secondaryRegion.refreshStoreFiles();
    assertEquals(0, secondaryRegion.getStoreFileList(families).size());

    putDataWithFlushes(primaryRegion, 100, 100, 0);
    int numRows = 100;

    // refresh the store file list, and ensure that the files are picked up.
    secondaryRegion.refreshStoreFiles();
    assertPathListsEqual(primaryRegion.getStoreFileList(families),
        secondaryRegion.getStoreFileList(families));
    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 2: 3 more flushes
    putDataWithFlushes(primaryRegion, 100, 300, 0);
    numRows = 300;

    // refresh the store file list, and ensure that the files are picked up.
    secondaryRegion.refreshStoreFiles();
    assertPathListsEqual(primaryRegion.getStoreFileList(families),
        secondaryRegion.getStoreFileList(families));
    assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size());

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    if (FSUtils.WINDOWS) {
      // compaction cannot move files while they are open in secondary on windows. Skip remaining.
      return;
    }

    // Test case 3: compact primary files
    primaryRegion.compactStores();
    List<Region> regions = new ArrayList<>();
    regions.add(primaryRegion);
    when(rss.getOnlineRegions()).thenReturn(regions);
    CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false);
    cleaner.chore();
    secondaryRegion.refreshStoreFiles();
    assertPathListsEqual(primaryRegion.getStoreFileList(families),
        secondaryRegion.getStoreFileList(families));
    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Replaying edits in secondary");

    // Test case 4: replay some edits, ensure that memstore is dropped.
    assertTrue(secondaryRegion.getMemstoreSize() == 0);
    putDataWithFlushes(primaryRegion, 400, 400, 0);
    numRows = 400;

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        // do not replay flush
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    assertTrue(secondaryRegion.getMemstoreSize() > 0);

    secondaryRegion.refreshStoreFiles();

    assertTrue(secondaryRegion.getMemstoreSize() == 0);

    LOG.info("-- Verifying edits from primary");
    verifyData(primaryRegion, 0, numRows, cq, families);
    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);
  }
  /**
   * Paths can be qualified or not. This does the assertion using String->Path conversion.
   */
  private void assertPathListsEqual(List<String> list1, List<String> list2) {
    List<Path> l1 = new ArrayList<>(list1.size());
    for (String path : list1) {
      l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
    }
    List<Path> l2 = new ArrayList<>(list2.size());
    for (String path : list2) {
      l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
    }
    assertEquals(l1, l2);
  }
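
  /** Disables reads on the given region and asserts that reads are indeed rejected. */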
  private void disableReads(HRegion region) {
    region.setReadsEnabled(false);
    try {
      verifyData(region, 0, 1, cq, families);
      fail("Should have failed with IOException");
    } catch(IOException ex) {
      // expected
    }
  }
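
  /** Replays a single Put into the region as a one-mutation batch with the given replay seqId. */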
  private void replay(HRegion region, Put put, long replaySeqId) throws IOException {
    put.setDurability(Durability.SKIP_WAL);
    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
    region.batchReplay(new MutationReplay[] {mutation}, replaySeqId);
  }

  /**
   * Tests replaying bulk load event markers from the primary region. Checks whether the bulk
   * loaded files are picked up by the secondary.
   */
  @Test
  public void testReplayBulkLoadEvent() throws IOException {
    LOG.info("testReplayBulkLoadEvent starts");
    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // bulk load a file into primary region
    Random random = new Random();
    byte[] randomValues = new byte[20];
    random.nextBytes(randomValues);
    Path testPath = TEST_UTIL.getDataTestDirOnTestFS();

    List<Pair<byte[], String>> familyPaths = new ArrayList<>();
    int expectedLoadFileCount = 0;
    for (byte[] family : families) {
      familyPaths.add(new Pair<>(family, createHFileForFamilies(testPath, family, randomValues)));
      expectedLoadFileCount++;
    }
    primaryRegion.bulkLoadHFiles(familyPaths, false, null);

    // now replay the edits and the bulk load marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and region events in secondary");
    BulkLoadDescriptor bulkloadEvent = null;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0));
      if (bulkloadEvent != null) {
        break;
      }
    }

    // we should have 1 bulk load event
    assertNotNull(bulkloadEvent);
    assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount());

    // replay the bulk load event
    secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent);

    List<String> storeFileName = new ArrayList<>();
    for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) {
      storeFileName.addAll(storeDesc.getStoreFileList());
    }
    // assert that the bulk loaded files are picked up
    for (Store s : secondaryRegion.getStores()) {
      for (StoreFile sf : s.getStorefiles()) {
        storeFileName.remove(sf.getPath().getName());
      }
    }
    assertTrue("Found store file(s) that were not loaded: " + storeFileName,
      storeFileName.isEmpty());

    LOG.info("-- Verifying edits from secondary");
    for (byte[] family : families) {
      assertGet(secondaryRegion, family, randomValues);
    }
  }

  @Test
  public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException {
    // tests replaying flush commit marker, but the flush file has already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder()
      .setFlushSequenceNumber(Long.MAX_VALUE)
      .setTableName(UnsafeByteOperations.unsafeWrap(
        primaryRegion.getTableDesc().getTableName().getName()))
      .setAction(FlushAction.COMMIT_FLUSH)
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(UnsafeByteOperations.unsafeWrap(
        primaryRegion.getRegionInfo().getRegionName()))
      .addStoreFlushes(StoreFlushDescriptor.newBuilder()
        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
        .setStoreHomeDir("/store_home_dir")
        .addFlushOutput("/foo/baz/bar")
        .build())
      .build());
  }

  @Test
  public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException {
    // tests replaying compaction marker, but the compaction output file has already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALCompactionMarker(CompactionDescriptor.newBuilder()
      .setTableName(UnsafeByteOperations.unsafeWrap(
        primaryRegion.getTableDesc().getTableName().getName()))
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
      .addCompactionInput("/foo")
      .addCompactionOutput("/bar")
      .setStoreHomeDir("/store_home_dir")
      .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
      .build(),
      true, true, Long.MAX_VALUE);
  }

  @Test
  public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException {
    // tests replaying region open event marker, but the region files have already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder()
      .setTableName(UnsafeByteOperations.unsafeWrap(
        primaryRegion.getTableDesc().getTableName().getName()))
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
      .setEventType(EventType.REGION_OPEN)
      .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1)))
      .setLogSequenceNumber(Long.MAX_VALUE)
      .addStores(StoreDescriptor.newBuilder()
        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
        .setStoreHomeDir("/store_home_dir")
        .addStoreFile("/foo")
        .build())
      .build());
  }

  @Test
  public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException {
    // tests replaying bulk load event marker, but the bulk load files have already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder()
      .setTableName(ProtobufUtil.toProtoTableName(primaryRegion.getTableDesc().getTableName()))
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setBulkloadSeqNum(Long.MAX_VALUE)
      .addStores(StoreDescriptor.newBuilder()
        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
        .setStoreHomeDir("/store_home_dir")
        .addStoreFile("/foo")
        .build())
      .build());
  }

  private String createHFileForFamilies(Path testPath, byte[] family,
      byte[] valueBytes) throws IOException {
    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
    // TODO We need a way to do this without creating files
    Path testFile = new Path(testPath, UUID.randomUUID().toString());
    FSDataOutputStream out = TEST_UTIL.getTestFileSystem().create(testFile);
    try {
      hFileFactory.withOutputStream(out);
      hFileFactory.withFileContext(new HFileContext());
      HFile.Writer writer = hFileFactory.create();
      try {
        // a single cell whose row, qualifier and value are all valueBytes
        writer.append(new KeyValue(CellUtil.createCell(valueBytes, family, valueBytes, 0L,
          KeyValue.Type.Put.getCode(), valueBytes)));
      } finally {
        writer.close();
      }
    } finally {
      out.close();
    }
    return testFile.toString();
  }

  /**
   * Puts a total of numRows + numRowsAfterFlush records indexed with numeric row keys. Does a
   * flush every flushInterval number of records. Then puts numRowsAfterFlush more rows without
   * flushing afterwards.
   * @throws IOException
   */
  private void putDataWithFlushes(HRegion region, int flushInterval,
      int numRows, int numRowsAfterFlush) throws IOException {
    int start = 0;
    for (; start < numRows; start += flushInterval) {
      LOG.info("-- Writing some data to primary from " + start + " to " + (start + flushInterval));
      putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families);
      LOG.info("-- Flushing primary, creating 3 files for 3 stores");
      region.flush(true);
    }
    LOG.info("-- Writing some more data to primary, not flushing");
    putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families);
  }

  private void putDataByReplay(HRegion region,
      int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
    for (int i = startRow; i < startRow + numRows; i++) {
      Put put = new Put(Bytes.toBytes("" + i));
      put.setDurability(Durability.SKIP_WAL);
      for (byte[] family : families) {
        put.addColumn(family, qf, EnvironmentEdgeManager.currentTime(), null);
      }
      replay(region, put, i + 1);
    }
  }

  private static HRegion initHRegion(byte[] tableName,
      String callingMethod, byte[]... families) throws IOException {
    return initHRegion(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
      callingMethod, TEST_UTIL.getConfiguration(), false, Durability.SYNC_WAL, null, families);
  }

  private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
      String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
      WAL wal, byte[]... families) throws IOException {
    return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf,
      isReadOnly, durability, wal, families);
  }
}