/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.TestHRegion.assertGet;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.putData;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.verifyData;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * Tests of HRegion methods for replaying flush, compaction, region open, etc events for secondary
 * region replicas.
 */
@SuppressWarnings("deprecation")
@Category(LargeTests.class)
public class TestHRegionReplayEvents {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHRegionReplayEvents.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionReplayEvents.class);

  @Rule public TestName name = new TestName();

  private static HBaseTestingUtil TEST_UTIL;

  public static Configuration CONF;

  private byte[][] families =
    new byte[][] { Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") };

  protected byte[] tableName;
  protected String method;
  protected final byte[] row = Bytes.toBytes("rowA");
  protected final byte[] row2 = Bytes.toBytes("rowB");
  protected byte[] cq = Bytes.toBytes("cq");

  private String dir;
  private Path rootDir;
  private TableDescriptor htd;
  private RegionServerServices rss;
  private RegionInfo primaryHri, secondaryHri;
  private HRegion primaryRegion, secondaryRegion;
  private WAL walPrimary, walSecondary;
  private WAL.Reader reader;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL = new HBaseTestingUtil();
    TEST_UTIL.startMiniDFSCluster(1);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
    TEST_UTIL.cleanupTestDir();
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  @Before
  public void setUp() throws Exception {
    CONF = TEST_UTIL.getConfiguration();
    dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString();
    method = name.getMethodName();
    tableName = Bytes.toBytes(name.getMethodName());
    rootDir = new Path(dir + method);
    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(method));
    for (byte[] family : families) {
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
    }
    htd = builder.build();

    long time = EnvironmentEdgeManager.currentTime();
    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
      0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
    primaryHri =
      RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(0).build();
    secondaryHri =
      RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(1).build();

    WALFactory wals = TestHRegion.createWALFactory(CONF, rootDir);
    walPrimary = wals.getWAL(primaryHri);
    walSecondary = wals.getWAL(secondaryHri);
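
    // mock up a minimal RegionServerServices: server name, configuration, memstore accounting,
    // and an executor for the compacted-files discharger are all that HRegion needs here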
    rss = mock(RegionServerServices.class);
    when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
    when(rss.getConfiguration()).thenReturn(CONF);
    when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF));
    String string = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER
      .toString();
    ExecutorService es = new ExecutorService(string);
    es.startExecutorService(es.new ExecutorConfig().setCorePoolSize(1).setExecutorType(
      ExecutorType.RS_COMPACTED_FILES_DISCHARGER));
    when(rss.getExecutorService()).thenReturn(es);
    primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
    primaryRegion.close();
    List<HRegion> regions = new ArrayList<>();
    regions.add(primaryRegion);
    Mockito.doReturn(regions).when(rss).getRegions();

    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null);
  }

  @After
  public void tearDown() throws Exception {
    if (reader != null) {
      reader.close();
    }

    if (primaryRegion != null) {
      HBaseTestingUtil.closeRegionAndWAL(primaryRegion);
    }
    if (secondaryRegion != null) {
      HBaseTestingUtil.closeRegionAndWAL(secondaryRegion);
    }

    EnvironmentEdgeManagerTestHelper.reset();
  }

  String getName() {
    return name.getMethodName();
  }

  // Some of the test cases are as follows:
  // 1. replay flush start marker again
  // 2. replay flush with smaller seqId than what is there in memstore snapshot
  // 3. replay flush with larger seqId than what is there in memstore snapshot
  // 4. replay flush commit without flush prepare. non droppable memstore
  // 5. replay flush commit without flush prepare. droppable memstore
  // 6. replay open region event
  // 7. replay open region event after flush start
  // 8. replay flush from an earlier seqId (test ignoring seqIds)
  // 9. start flush does not prevent region from closing.

  @Test
  public void testRegionReplicaSecondaryCannotFlush() throws IOException {
    // load some data and flush; ensure that the secondary replica will not execute the flush

    // load some data to secondary by replaying
    putDataByReplay(secondaryRegion, 0, 1000, cq, families);

    verifyData(secondaryRegion, 0, 1000, cq, families);

    // flush region
    FlushResultImpl flush = (FlushResultImpl) secondaryRegion.flush(true);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH, flush.result);
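    // region replicas are read-only: a direct flush on the secondary is rejected with
    // CANNOT_FLUSH, since flushed files only ever arrive via markers replayed from the primary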

    verifyData(secondaryRegion, 0, 1000, cq, families);

    // close the region, and inspect that it has not flushed
    Map<byte[], List<HStoreFile>> files = secondaryRegion.close(false);
    // assert that there are no store files (the flush did not execute)
    for (List<HStoreFile> f : files.values()) {
      assertTrue(f.isEmpty());
    }
  }

  /**
   * Tests a case where we replay only a flush start marker, then the region is closed. This
   * region should not block indefinitely.
   */
  @Test
  public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException {
    // load some data to primary and flush
    int start = 0;
    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
    primaryRegion.flush(true);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
        WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- NOT Replaying flush commit in secondary");
        }
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    assertTrue(rss.getRegionServerAccounting().getGlobalMemStoreDataSize() > 0);
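    // the flush start was replayed without a commit, so the snapshot memstore is still
    // accounted for globally; the close below must release it rather than wait for a
    // COMMIT_FLUSH that will never arrive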
    // now close the region; this should not hang because of the un-committed flush
    secondaryRegion.close();

    // verify that the memstore size is back to what it was
    assertEquals(0, rss.getRegionServerAccounting().getGlobalMemStoreDataSize());
  }
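
  /**
   * Replays a single WAL entry on the given region: the entry's cells are converted back into a
   * Put (written with Durability.SKIP_WAL) and pushed through batchReplay() with the entry's
   * original sequence id. Returns the integer row index encoded in the row key (these tests
   * write integer row keys), so callers can track the last replayed row.
   */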
  static int replayEdit(HRegion region, WAL.Entry entry) throws IOException {
    if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) {
      return 0; // handled elsewhere
    }
    Put put = new Put(CellUtil.cloneRow(entry.getEdit().getCells().get(0)));
    for (Cell cell : entry.getEdit().getCells()) {
      put.add(cell);
    }
    put.setDurability(Durability.SKIP_WAL);
    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
    region.batchReplay(new MutationReplay[] { mutation },
      entry.getKey().getSequenceId());
    return Integer.parseInt(Bytes.toString(put.getRow()));
  }
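
  /**
   * Opens a reader on the primary region's current WAL file. The tests tail this WAL and
   * hand-deliver its edits and markers to the secondary, standing in for the async WAL
   * replication stream between the replicas.
   */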
  WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException {
    return WALFactory.createReader(TEST_UTIL.getTestFileSystem(),
      AbstractFSWALProvider.getCurrentFileName(walPrimary),
      TEST_UTIL.getConfiguration());
  }

  @Test
  public void testBatchReplayWithMultipleNonces() throws IOException {
    try {
      MutationReplay[] mutations = new MutationReplay[100];
      for (int i = 0; i < 100; i++) {
        Put put = new Put(Bytes.toBytes(i));
        put.setDurability(Durability.SYNC_WAL);
        for (byte[] family : this.families) {
          put.addColumn(family, this.cq, null);
          long nonceNum = i / 10;
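          // integer division gives every run of 10 consecutive puts the same nonce group /
          // nonce pair, so a single batch carries several distinct nonces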
          mutations[i] = new MutationReplay(MutationType.PUT, put, nonceNum, nonceNum);
        }
      }
      primaryRegion.batchReplay(mutations, 20);
    } catch (Exception e) {
      String msg = "Error while replay of batch with multiple nonces. ";
      LOG.error(msg, e);
      fail(msg + e.getMessage());
    }
  }

  @Test
  public void testReplayFlushesAndCompactions() throws IOException {
    // initiate a secondary region with some data.

    // load some data to primary and flush. 3 flushes and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 300, 100);

    // compaction from primary
    LOG.info("-- Compacting primary, only 1 store");
    primaryRegion.compactStore(Bytes.toBytes("cf1"),
      NoLimitThroughputController.INSTANCE);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    int lastReplayed = 0;
    int expectedStoreFileCount = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
        WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      CompactionDescriptor compactionDesc =
        WALEdit.getCompaction(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        // first verify that everything is replayed and visible before flush event replay
        verifyData(secondaryRegion, 0, lastReplayed, cq, families);
        HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
        long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
        long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
        MemStoreSize mss = store.getFlushableSize();
        long storeSize = store.getSize();
        long storeSizeUncompressed = store.getStoreSizeUncompressed();
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc);
          assertNull(result.result);
          assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber());
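          // replaying START_FLUSH snapshots the memstore at the primary's flush sequence
          // number; the PrepareFlushResult keeps that seqId so a later COMMIT_FLUSH can be
          // matched against it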

          // assert that the store memstore is smaller now
          long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize();
          LOG.info("Memstore size reduced by:"
            + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
          assertTrue(storeMemstoreSize > newStoreMemstoreSize);

        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- Replaying flush commit in secondary");
          secondaryRegion.replayWALFlushCommitMarker(flushDesc);

          // assert that the flush files are picked
          expectedStoreFileCount++;
          for (HStore s : secondaryRegion.getStores()) {
            assertEquals(expectedStoreFileCount, s.getStorefilesCount());
          }
          MemStoreSize newMss = store.getFlushableSize();
          assertTrue(mss.getHeapSize() > newMss.getHeapSize());

          // assert that the region memstore is smaller now
          long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
          assertTrue(regionMemstoreSize > newRegionMemstoreSize);

          // assert that the store sizes are bigger
          assertTrue(store.getSize() > storeSize);
          assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed);
          assertEquals(store.getSize(), store.getStorefilesSize());
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else if (compactionDesc != null) {
        secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE);

        // assert that the compaction is applied
        for (HStore store : secondaryRegion.getStores()) {
          if (store.getColumnFamilyName().equals("cf1")) {
            assertEquals(1, store.getStorefilesCount());
          } else {
            assertEquals(expectedStoreFileCount, store.getStorefilesCount());
          }
        }
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    assertEquals(400 - 1, lastReplayed);
    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, 400, cq, families);

    LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted");
    verifyData(primaryRegion, 0, lastReplayed, cq, families);
    for (HStore store : primaryRegion.getStores()) {
      if (store.getColumnFamilyName().equals("cf1")) {
        assertEquals(1, store.getStorefilesCount());
      } else {
        assertEquals(expectedStoreFileCount, store.getStorefilesCount());
      }
    }
  }

  /**
   * Tests cases where we prepare a flush with some seqId and we receive other flush start
   * markers equal to, greater or less than the previous flush start marker.
   */
  @Test
  public void testReplayFlushStartMarkers() throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");

    FlushDescriptor startFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
        WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        // first verify that everything is replayed and visible before flush event replay
        HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
        long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
        long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
        MemStoreSize mss = store.getFlushableSize();

        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          startFlushDesc = flushDesc;
          LOG.info("-- Replaying flush start in secondary");
          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
          assertNull(result.result);
          assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber());
          assertTrue(regionMemstoreSize > 0);
          assertTrue(mss.getHeapSize() > 0);

          // assert that the store memstore is smaller now
          long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize();
          LOG.info("Memstore size reduced by:"
            + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
          assertTrue(storeMemstoreSize > newStoreMemstoreSize);
          verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in memstore snapshot
    // and some more data in memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 1: replay the same flush start marker again
    LOG.info("-- Replaying same flush start in secondary again");
    PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);
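
    // note: once a flush is prepared on the secondary, further START_FLUSH markers are
    // ignored whether their seqId is the same, smaller, or larger (tested below); the
    // prepared snapshot is only resolved by a COMMIT_FLUSH or a region open event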

    // Test case 2: replay a flush start marker with a smaller seqId
    FlushDescriptor startFlushDescSmallerSeqId =
      clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50);
    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId);
    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 3: replay a flush start marker with a larger seqId
    FlushDescriptor startFlushDescLargerSeqId =
      clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50);
    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId);
    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we prepare a flush with some seqId and we receive a flush commit
   * marker less than the previous flush start marker.
   */
  @Test
  public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException {
    // load some data to primary and flush. 2 flushes and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 200, 100);
    int numRows = 300;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor startFlushDesc = null;
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      System.out.println(lastReplayed);
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
        WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          // don't replay the first flush start marker, hold on to it, replay the second one
          if (startFlushDesc == null) {
            startFlushDesc = flushDesc;
          } else {
            LOG.info("-- Replaying flush start in secondary");
            startFlushDesc = flushDesc;
            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
            assertNull(result.result);
          }
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          // do not replay any flush commit yet
          if (commitFlushDesc == null) {
            commitFlushDesc = flushDesc; // hold on to the first flush commit marker
          }
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-200) in memstore snapshot
    // and some more data in memstores (rows 200-300)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();

    // Test case 1: replay a flush commit marker smaller than what we have prepared
    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START "
      + startFlushDesc);
    assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber());

    LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped

    // assert that the region memstore is same as before
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertEquals(regionMemstoreSize, newRegionMemstoreSize);

    assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped
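    // a COMMIT_FLUSH whose seqId is below the prepared START_FLUSH belongs to an earlier
    // flush: the secondary picks up the flushed files, but the prepared snapshot and its
    // memstore stay in place until the matching commit arrives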

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we prepare a flush with some seqId and we receive a flush commit
   * marker larger than the previous flush start marker.
   */
  @Test
  public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor startFlushDesc = null;
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
        WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          if (startFlushDesc == null) {
            LOG.info("-- Replaying flush start in secondary");
            startFlushDesc = flushDesc;
            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
            assertNull(result.result);
          }
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          // do not replay any flush commit yet
          // hold on to the flush commit marker but simulate a larger
          // flush commit seqId
          commitFlushDesc =
            FlushDescriptor.newBuilder(flushDesc)
              .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50)
              .build();
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in memstore snapshot
    // and some more data in memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();

    // Test case 1: replay a flush commit marker larger than what we have prepared
    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START "
      + startFlushDesc);
    assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber());

    LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped

    // assert that the region memstore is smaller than before, but not empty
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertTrue(newRegionMemstoreSize > 0);
    assertTrue(regionMemstoreSize > newRegionMemstoreSize);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped
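    // a COMMIT_FLUSH with a seqId at or above the prepared START_FLUSH subsumes the prepared
    // flush: the snapshot is dropped, and only edits above the commit seqId remain in the
    // active memstore, so the region memstore shrinks but is not emptied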

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers.
   * The memstore edits should be dropped after the flush commit replay since they should be in
   * flushed files.
   */
  @Test
  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore()
      throws IOException {
    testReplayFlushCommitMarkerWithoutFlushStartMarker(true);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers.
   * The memstore edits should not be dropped after the flush commit replay since not every edit
   * will be in flushed files (based on seqId).
   */
  @Test
  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore()
      throws IOException {
    testReplayFlushCommitMarkerWithoutFlushStartMarker(false);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers.
   */
  public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore)
      throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data.
    // write more data after the flush depending on whether the memstore should be droppable
    putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100);
    int numRows = droppableMemstore ? 100 : 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
        WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          // do not replay flush start marker
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          commitFlushDesc = flushDesc; // hold on to the flush commit marker
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, all the data is in the active memstore without a snapshot, since no
    // flush start marker was replayed: rows 0-100, plus rows 100-200 in the non-droppable case
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();

    // Test case 1: replay a flush commit marker without a start flush marker
    assertNull(secondaryRegion.getPrepareFlushResult());
    assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0);

    // ensure all files are visible in secondary
    for (HStore store : secondaryRegion.getStores()) {
      assertTrue(store.getMaxSequenceId().orElse(0L) <= secondaryRegion.getReadPoint(null));
    }

    LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    if (droppableMemstore) {
      // assert that the memstore is dropped
      assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD);
    } else {
      assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped
    }
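
    // whether the memstore can be dropped hinges on seqIds: when no edits beyond the commit's
    // flush seqId were replayed (droppableMemstore == true), every memstore edit is covered by
    // the flushed files and the whole memstore is discarded; otherwise the later edits must
    // survive in the memstore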
    // assert that the region memstore is either dropped completely or unchanged, depending
    // on droppability
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    if (droppableMemstore) {
      assertTrue(0 == newRegionMemstoreSize);
    } else {
      assertTrue(regionMemstoreSize == newRegionMemstoreSize);
    }

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) {
    return FlushDescriptor.newBuilder(flush)
      .setFlushSequenceNumber(flushSeqId)
      .build();
  }

  /**
   * Tests replaying region open markers from the primary region. Checks whether the files are
   * picked up.
   */
  @Test
  public void testReplayRegionOpenEvent() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
    int numRows = 100;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
        WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc =
        WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // don't replay flush events
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        // don't replay edits
      }
    }

    // we should have 1 open, 1 close and 1 open event
    assertEquals(3, regionEvents.size());
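    // the three markers mirror the primary's lifecycle: the open from setUp(), the close
    // above (which flushed the pending edits), and the re-open; only the last open event
    // carries the current store file list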

    // replay the first region open event.
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0));

    // replay the close event as well
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1));

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertTrue(regionMemstoreSize == 0);

    // now replay the region open event that should contain new file locations
    LOG.info("Testing replaying region open event " + regionEvents.get(2));
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD);

    // assert that the region memstore is empty
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertTrue(newRegionMemstoreSize == 0);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if any

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we replay a region open event after a flush start but before receiving
   * the flush commit.
   */
  @Test
  public void testReplayRegionOpenEventAfterFlushStart() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
        WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc =
        WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // only replay flush start
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
        }
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in the memstore snapshot
    // and some more data in memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // we should have 1 open, 1 close and 1 open event
    assertEquals(3, regionEvents.size());

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }

    // now replay the region open event that should contain new file locations
    LOG.info("Testing replaying region open event " + regionEvents.get(2));
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));
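    // the open event supersedes the in-flight flush: the secondary picks up all files named
    // in the event (both flushes, since closing the primary flushed the remaining edits) and
    // drops the prepared snapshot along with the memstore contents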

    // assert that the flush files are picked
    expectedStoreFileCount = 2; // two flushes happened
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize newSnapshotSize = store.getSnapshotSize();
    assertTrue(newSnapshotSize.getDataSize() == 0);

    // assert that the region memstore is empty
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertTrue(newRegionMemstoreSize == 0);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if any

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests whether edits coming in for replay are skipped if they have a smaller seqId than the
   * seqId of the last replayed region open event.
   */
  @Test
  public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 100, 0);
    int numRows = 100;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
    List<WAL.Entry> edits = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
        WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc =
        WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // don't replay flushes
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        edits.add(entry);
      }
    }

    // replay the region open of the first open, but with the seqId of the second open.
    // this way none of the flush files will be picked up.
    secondaryRegion.replayWALRegionEventMarker(
      RegionEventDescriptor.newBuilder(regionEvents.get(0)).setLogSequenceNumber(
        regionEvents.get(2).getLogSequenceNumber()).build());
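    // the secondary now believes it was opened at the second open's seqId, so all edits below
    // that seqId must be ignored on replay; otherwise stale edits could reappear even though
    // their data already lives in the (deliberately unpicked) flush files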

    // replay edits from before the region close. If replay does not
    // skip these, the following verification will NOT fail.
    for (WAL.Entry entry : edits) {
      replayEdit(secondaryRegion, entry);
    }

    boolean expectedFail = false;
    try {
      verifyData(secondaryRegion, 0, numRows, cq, families);
    } catch (AssertionError e) {
      expectedFail = true; // expected
    }
    if (!expectedFail) {
      fail("Should have failed this verification");
    }
  }

  @Test
  public void testReplayFlushSeqIds() throws IOException {
    // load some data to primary and flush
    int start = 0;
    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
    primaryRegion.flush(true);

    // now replay the flush marker
    reader = createWALReaderForPrimary();

    long flushSeqId = -1;
    LOG.info("-- Replaying flush events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
        WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
          flushSeqId = flushDesc.getFlushSequenceNumber();
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- Replaying flush commit in secondary");
          secondaryRegion.replayWALFlushCommitMarker(flushDesc);
          assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber());
        }
      }
      // else do not replay
    }

    // TODO: what to do with this?
    // assert that the newly picked up flush file is visible
    long readPoint = secondaryRegion.getMVCC().getReadPoint();
    assertEquals(flushSeqId, readPoint);
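    // committing the flush advances the secondary's MVCC read point to the flush seqId, so
    // everything contained in the picked-up files becomes visible to reads atomically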

    // after replay verify that everything is still visible
    verifyData(secondaryRegion, 0, 100, cq, families);
  }

  @Test
  public void testSeqIdsFromReplay() throws IOException {
    // test the case where seqIds coming from replayed WALEdits are persisted with their
    // original seqIds and are made visible through the mvcc read point upon replay
    String method = name.getMethodName();
    byte[] tableName = Bytes.toBytes(method);
    byte[] family = Bytes.toBytes("family");

    HRegion region = initHRegion(tableName, family);

    // replay an entry that is bigger than current read point
    long readPoint = region.getMVCC().getReadPoint();
    long origSeqId = readPoint + 100;

    Put put = new Put(row).addColumn(family, row, row);
    put.setDurability(Durability.SKIP_WAL); // we replay with skip wal
    replay(region, put, origSeqId);

    // read point should have advanced to this seqId
    assertGet(region, family, row);

    // region seqId should have advanced at least to this seqId
    assertEquals(origSeqId, region.getReadPoint(null));
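    // the replayed seqId was persisted as-is and the MVCC read point jumped forward to it,
    // rather than the edit being re-assigned a fresh local sequence id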

    // replay an entry that is smaller than current read point
    // caution: adding an entry below the current read point might cause partial dirty reads.
    // Normal replay does not allow reads while replay is going on.
    put = new Put(row2).addColumn(family, row2, row2);
    put.setDurability(Durability.SKIP_WAL);
    replay(region, put, origSeqId - 50);

    assertGet(region, family, row2);
  }

  /**
   * Tests that a region opened in secondary mode would not write region open / close
   * events to its WAL.
   * @throws IOException
   */
  @Test
  public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException {
    secondaryRegion.close();
    walSecondary = spy(walSecondary);

    // test for region open and close
    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
      any(WALEdit.class));

    // test for replay prepare flush
    putDataByReplay(secondaryRegion, 0, 10, cq, families);
    secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder()
      .setFlushSequenceNumber(10)
      .setTableName(UnsafeByteOperations.unsafeWrap(
        primaryRegion.getTableDescriptor().getTableName().getName()))
      .setAction(FlushAction.START_FLUSH)
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(UnsafeByteOperations.unsafeWrap(
        primaryRegion.getRegionInfo().getRegionName()))
      .build());

    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
      any(WALEdit.class));

    secondaryRegion.close();
    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
      any(WALEdit.class));
  }

  /**
   * Tests the reads enabled flag for the region. When unset, all reads should be rejected.
   */
  @Test
  public void testRegionReadsEnabledFlag() throws IOException {

    putDataByReplay(secondaryRegion, 0, 100, cq, families);

    verifyData(secondaryRegion, 0, 100, cq, families);

    // now disable reads
    secondaryRegion.setReadsEnabled(false);
    try {
      verifyData(secondaryRegion, 0, 100, cq, families);
      fail("Should have failed with IOException");
    } catch (IOException ex) {
      // expected
    }

    // verify that we can still replay data
    putDataByReplay(secondaryRegion, 100, 100, cq, families);

    // now enable reads again
    secondaryRegion.setReadsEnabled(true);
    verifyData(secondaryRegion, 0, 200, cq, families);
  }

  /**
   * Tests the case where a request for flush cache is sent to the region, but the region cannot
   * flush. It should write the flush request marker instead.
   */
  @Test
  public void testWriteFlushRequestMarker() throws IOException {
    // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false
    FlushResultImpl result = primaryRegion.flushcache(true, false, FlushLifeCycleTracker.DUMMY);
    assertNotNull(result);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result);
    assertFalse(result.wroteFlushWalMarker);

    // request flush again, but this time with writeFlushRequestWalMarker = true
    result = primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
    assertNotNull(result);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result);
    assertTrue(result.wroteFlushWalMarker);
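    // the CANNOT_FLUSH marker records in the WAL that the primary considered a flush but had
    // nothing to do (empty memstore); secondaries replay it to unblock anything waiting on a
    // flush event, as exercised in the next test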

    List<FlushDescriptor> flushes = Lists.newArrayList();
    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        flushes.add(flush);
      }
    }

    assertEquals(1, flushes.size());
    assertNotNull(flushes.get(0));
    assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction());
  }

  /**
   * Test the case where the secondary region replica is not in reads enabled state because it
   * is waiting for a flush or region open marker from the primary region. Replaying a
   * CANNOT_FLUSH flush marker entry should restore the reads enabled status in the region and
   * allow the reads to continue.
   */
  @Test
  public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException {
    disableReads(secondaryRegion);

    // Test case 1: Test that replaying a CANNOT_FLUSH request marker, assuming this came from
    // a triggered flush, restores readsEnabled
    primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
      }
    }

    // now reads should be enabled
    secondaryRegion.get(new Get(Bytes.toBytes(0)));
  }

  /**
   * Test the case where the secondary region replica is not in reads enabled state because it
   * is waiting for a flush or region open marker from the primary region. Replaying flush start
   * and commit entries should restore the reads enabled status in the region and allow the
   * reads to continue.
   */
  @Test
  public void testReplayingFlushRestoresReadsEnabledState() throws IOException {
    // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers, assuming these
    // came from a triggered flush, restores readsEnabled
    disableReads(secondaryRegion);

    // put some data in primary
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
    primaryRegion.flush(true);
    // I seem to need to push more edits through so the WAL flushes on local fs. This was not
    // needed before HBASE-15028. Not sure what's up. I can see that we have not flushed if I
    // pause the test here and then use WALPrettyPrinter to look at the WAL's content. Doing
    // the same check before HBASE-15028 I can see all edits flushed to the WAL. Something's
    // up, but I can't figure it out, and this is the only test that seems to suffer from this
    // flush issue.
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      LOG.info(Objects.toString(entry));
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    // now reads should be enabled
    verifyData(secondaryRegion, 0, 100, cq, families);
  }

  /**
   * Test the case where the secondary region replica is not in reads enabled state because it
   * is waiting for a flush or region open marker from the primary region. Replaying flush start
   * and commit entries for a flush of an empty (secondary) memstore should restore the reads
   * enabled status in the region and allow the reads to continue.
   */
  @Test
  public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException {
    // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers, assuming these
    // came from a triggered flush, restores readsEnabled
    disableReads(secondaryRegion);

    // put some data in primary
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
    primaryRegion.flush(true);

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
      }
    }

    // now reads should be enabled
    verifyData(secondaryRegion, 0, 100, cq, families);
  }

  /**
   * Test the case where the secondary region replica is not in reads enabled state because it
   * is waiting for a flush or region open marker from the primary region. Replaying a region
   * open event entry from the primary should restore the reads enabled status in the region and
   * allow the reads to continue.
   */
  @Test
  public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException {
    // Test case 3: Test that replaying region open event markers restores readsEnabled
    disableReads(secondaryRegion);

    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }

      RegionEventDescriptor regionEventDesc =
        WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (regionEventDesc != null) {
        secondaryRegion.replayWALRegionEventMarker(regionEventDesc);
      }
    }

    // now reads should be enabled
    secondaryRegion.get(new Get(Bytes.toBytes(0)));
  }

  @Test
  public void testRefreshStoreFiles() throws IOException {
    assertEquals(0, primaryRegion.getStoreFileList(families).size());
    assertEquals(0, secondaryRegion.getStoreFileList(families).size());

    // Test case 1: refresh with an empty region
    secondaryRegion.refreshStoreFiles();
    assertEquals(0, secondaryRegion.getStoreFileList(families).size());

    // load some data to primary and flush
    putDataWithFlushes(primaryRegion, 100, 100, 0);
    int numRows = 100;

    // refresh the store file list, and ensure that the files are picked up.
    secondaryRegion.refreshStoreFiles();
    assertPathListsEqual(primaryRegion.getStoreFileList(families),
      secondaryRegion.getStoreFileList(families));
    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 2: 3 more flushes
    putDataWithFlushes(primaryRegion, 100, 300, 0);
    numRows = 300;

    // refresh the store file list, and ensure that the files are picked up.
    secondaryRegion.refreshStoreFiles();
    assertPathListsEqual(primaryRegion.getStoreFileList(families),
      secondaryRegion.getStoreFileList(families));
    assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size());

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    if (FSUtils.WINDOWS) {
      // compaction cannot move files while they are open in secondary on windows. Skip remaining.
      return;
    }

    // Test case 3: compact primary files
    primaryRegion.compactStores();
    List<HRegion> regions = new ArrayList<>();
    regions.add(primaryRegion);
    Mockito.doReturn(regions).when(rss).getRegions();
    CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false);
    cleaner.chore();
    secondaryRegion.refreshStoreFiles();
    assertPathListsEqual(primaryRegion.getStoreFileList(families),
      secondaryRegion.getStoreFileList(families));
    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Replaying edits in secondary");

    // Test case 4: replay some edits, ensure that the memstore is dropped.
    assertTrue(secondaryRegion.getMemStoreDataSize() == 0);
    putDataWithFlushes(primaryRegion, 400, 400, 0);
    numRows = 400;

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        // do not replay flush
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    assertTrue(secondaryRegion.getMemStoreDataSize() > 0);

    secondaryRegion.refreshStoreFiles();
1472 assertTrue(secondaryRegion
.getMemStoreDataSize() == 0);
1474 LOG
.info("-- Verifying edits from primary");
1475 verifyData(primaryRegion
, 0, numRows
, cq
, families
);
1476 LOG
.info("-- Verifying edits from secondary");
1477 verifyData(secondaryRegion
, 0, numRows
, cq
, families
);
  /**
   * Asserts that two store file lists point at the same files. Paths can be qualified or not;
   * the assertion is done after String->Path conversion, ignoring scheme and authority.
   */
  private void assertPathListsEqual(List<String> list1, List<String> list2) {
    List<Path> l1 = new ArrayList<>(list1.size());
    for (String path : list1) {
      l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
    }
    List<Path> l2 = new ArrayList<>(list2.size());
    for (String path : list2) {
      l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
    }
    assertEquals(l1, l2);
  }
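  /** Disables reads on the given region and verifies that reads now fail with an IOException. */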
  private void disableReads(HRegion region) {
    region.setReadsEnabled(false);
    try {
      verifyData(region, 0, 1, cq, families);
      fail("Should have failed with IOException");
    } catch (IOException ex) {
      // expected
    }
  }
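  /**
   * Replays a single Put into the region through the batch replay path, as if it arrived from
   * the primary with the given replay sequence id. The put skips the local WAL
   * (Durability.SKIP_WAL).
   */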
  private void replay(HRegion region, Put put, long replaySeqId) throws IOException {
    put.setDurability(Durability.SKIP_WAL);
    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
    region.batchReplay(new MutationReplay[] { mutation }, replaySeqId);
  }
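  // Example usage: replay(secondaryRegion, put, 5) applies the Put with replay sequence id 5;
  // callers such as putDataByReplay below pass monotonically increasing sequence ids.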
  /**
   * Tests replaying bulk load event markers from the primary region. Checks whether the bulk
   * loaded files are picked up by the secondary.
   */
  @Test
  public void testReplayBulkLoadEvent() throws IOException {
    LOG.info("testReplayBulkLoadEvent starts");
    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // bulk load a file into primary region
    Random random = new Random();
    byte[] randomValues = new byte[20];
    random.nextBytes(randomValues);
    Path testPath = TEST_UTIL.getDataTestDirOnTestFS();

    List<Pair<byte[], String>> familyPaths = new ArrayList<>();
    int expectedLoadFileCount = 0;
    for (byte[] family : families) {
      familyPaths.add(new Pair<>(family, createHFileForFamilies(testPath, family, randomValues)));
      expectedLoadFileCount++;
    }
    primaryRegion.bulkLoadHFiles(familyPaths, false, null);

    // now replay the edits and the bulk load marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and region events in secondary");
    BulkLoadDescriptor bulkloadEvent = null;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0));
      if (bulkloadEvent != null) {
        break;
      }
    }

    // we should have 1 bulk load event
    assertNotNull(bulkloadEvent);
    assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount());

    // replay the bulk load event
    secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent);

    List<String> storeFileName = new ArrayList<>();
    for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) {
      storeFileName.addAll(storeDesc.getStoreFileList());
    }
    // assert that the bulk loaded files are picked up
    for (HStore s : secondaryRegion.getStores()) {
      for (HStoreFile sf : s.getStorefiles()) {
        storeFileName.remove(sf.getPath().getName());
      }
    }
    assertTrue("Found store files that were not loaded: " + storeFileName,
      storeFileName.isEmpty());

    LOG.info("-- Verifying edits from secondary");
    for (byte[] family : families) {
      assertGet(secondaryRegion, family, randomValues);
    }
  }
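  // The following tests replay markers that reference store files which no longer exist anywhere
  // (already compacted away on the primary and deleted from the archive directory). Each replay
  // call is expected to tolerate the missing files; the tests pass if no exception escapes.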
  @Test
  public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException {
    // tests replaying flush commit marker, but the flush file has already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder()
      .setFlushSequenceNumber(Long.MAX_VALUE)
      .setTableName(UnsafeByteOperations
        .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName()))
      .setAction(FlushAction.COMMIT_FLUSH)
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
      .addStoreFlushes(StoreFlushDescriptor.newBuilder()
        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
        .setStoreHomeDir("/store_home_dir")
        .addFlushOutput("/foo/baz/123")
        .build())
      .build());
  }
  @Test
  public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException {
    // tests replaying compaction marker, but the compaction output file has already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALCompactionMarker(CompactionDescriptor.newBuilder()
      .setTableName(UnsafeByteOperations
        .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName()))
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
      .addCompactionInput("/123")
      .addCompactionOutput("/456")
      .setStoreHomeDir("/store_home_dir")
      .setRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
      .build(), true, true, Long.MAX_VALUE);
  }
  @Test
  public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException {
    // tests replaying region open event marker, but the region files have already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder()
      .setTableName(UnsafeByteOperations
        .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName()))
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
      .setEventType(EventType.REGION_OPEN)
      .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1)))
      .setLogSequenceNumber(Long.MAX_VALUE)
      .addStores(StoreDescriptor.newBuilder()
        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
        .setStoreHomeDir("/store_home_dir")
        .addStoreFile("/123")
        .build())
      .build());
  }
  @Test
  public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException {
    // tests replaying bulk load event marker, but the bulk load files have already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder()
      .setTableName(
        ProtobufUtil.toProtoTableName(primaryRegion.getTableDescriptor().getTableName()))
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setBulkloadSeqNum(Long.MAX_VALUE)
      .addStores(StoreDescriptor.newBuilder()
        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
        .setStoreHomeDir("/store_home_dir")
        .addStoreFile("/123")
        .build())
      .build());
  }
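  /**
   * Writes a minimal single-cell HFile for the given family under testPath and returns its path
   * as a String, suitable for handing to bulkLoadHFiles(). The same byte array is used as row,
   * qualifier and value.
   */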
  private String createHFileForFamilies(Path testPath, byte[] family, byte[] valueBytes)
    throws IOException {
    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
    // TODO We need a way to do this without creating files
    Path testFile = new Path(testPath, TEST_UTIL.getRandomUUID().toString());
    FSDataOutputStream out = TEST_UTIL.getTestFileSystem().create(testFile);
    try {
      hFileFactory.withOutputStream(out);
      hFileFactory.withFileContext(new HFileContextBuilder().build());
      HFile.Writer writer = hFileFactory.create();
      try {
        writer.append(new KeyValue(ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
          .setRow(valueBytes)
          .setFamily(family)
          .setQualifier(valueBytes)
          .setTimestamp(0L)
          .setType(KeyValue.Type.Put.getCode())
          .setValue(valueBytes)
          .build()));
      } finally {
        writer.close();
      }
    } finally {
      out.close();
    }
    return testFile.toString();
  }
  /**
   * Puts a total of numRows + numRowsAfterFlush records indexed with numeric row keys. Does a
   * flush every flushInterval number of records. Then it puts numRowsAfterFlush more rows but
   * does not execute a flush afterwards.
   * @throws IOException
   */
  private void putDataWithFlushes(HRegion region, int flushInterval, int numRows,
    int numRowsAfterFlush) throws IOException {
    int start = 0;
    for (; start < numRows; start += flushInterval) {
      LOG.info("-- Writing some data to primary from " + start + " to " + (start + flushInterval));
      putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families);
      LOG.info("-- Flushing primary, creating 3 files for 3 stores");
      region.flush(true);
    }
    LOG.info("-- Writing some more data to primary, not flushing");
    putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families);
  }
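  // Example: putDataWithFlushes(primaryRegion, 100, 300, 0) writes rows 0..299, flushing after
  // every 100 rows (3 flushes in total), and appends no unflushed rows afterwards.

  /**
   * Puts numRows records starting at startRow through the replay path, assigning each edit a
   * monotonically increasing replay sequence id.
   */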
  private void putDataByReplay(HRegion region, int startRow, int numRows, byte[] qf,
    byte[]... families) throws IOException {
    for (int i = startRow; i < startRow + numRows; i++) {
      Put put = new Put(Bytes.toBytes("" + i));
      put.setDurability(Durability.SKIP_WAL);
      for (byte[] family : families) {
        put.addColumn(family, qf, EnvironmentEdgeManager.currentTime(), null);
      }
      replay(region, put, i + 1);
    }
  }
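  /**
   * Creates a local HRegion spanning the whole key space for the given table and families,
   * writing through SYNC_WAL durability.
   */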
  private static HRegion initHRegion(byte[] tableName, byte[]... families) throws IOException {
    return TEST_UTIL.createLocalHRegion(TableName.valueOf(tableName), HConstants.EMPTY_START_ROW,
      HConstants.EMPTY_END_ROW, CONF, false, Durability.SYNC_WAL, null, families);
  }
}