2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.regionserver
;
20 import static org
.apache
.hadoop
.hbase
.regionserver
.TestHRegion
.assertGet
;
21 import static org
.apache
.hadoop
.hbase
.regionserver
.TestHRegion
.putData
;
22 import static org
.apache
.hadoop
.hbase
.regionserver
.TestHRegion
.verifyData
;
23 import static org
.junit
.Assert
.assertEquals
;
24 import static org
.junit
.Assert
.assertFalse
;
25 import static org
.junit
.Assert
.assertNotNull
;
26 import static org
.junit
.Assert
.assertNull
;
27 import static org
.junit
.Assert
.assertTrue
;
28 import static org
.junit
.Assert
.fail
;
29 import static org
.mockito
.ArgumentMatchers
.any
;
30 import static org
.mockito
.Mockito
.mock
;
31 import static org
.mockito
.Mockito
.spy
;
32 import static org
.mockito
.Mockito
.times
;
33 import static org
.mockito
.Mockito
.verify
;
34 import static org
.mockito
.Mockito
.when
;
36 import java
.io
.FileNotFoundException
;
37 import java
.io
.IOException
;
38 import java
.util
.ArrayList
;
39 import java
.util
.List
;
41 import java
.util
.Objects
;
42 import java
.util
.Random
;
43 import org
.apache
.hadoop
.conf
.Configuration
;
44 import org
.apache
.hadoop
.fs
.FSDataOutputStream
;
45 import org
.apache
.hadoop
.fs
.Path
;
46 import org
.apache
.hadoop
.hbase
.Cell
;
47 import org
.apache
.hadoop
.hbase
.CellBuilderType
;
48 import org
.apache
.hadoop
.hbase
.CellUtil
;
49 import org
.apache
.hadoop
.hbase
.ExtendedCellBuilderFactory
;
50 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
51 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
52 import org
.apache
.hadoop
.hbase
.HConstants
;
53 import org
.apache
.hadoop
.hbase
.KeyValue
;
54 import org
.apache
.hadoop
.hbase
.ServerName
;
55 import org
.apache
.hadoop
.hbase
.TableName
;
56 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
57 import org
.apache
.hadoop
.hbase
.client
.Durability
;
58 import org
.apache
.hadoop
.hbase
.client
.Get
;
59 import org
.apache
.hadoop
.hbase
.client
.Put
;
60 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
61 import org
.apache
.hadoop
.hbase
.client
.RegionInfoBuilder
;
62 import org
.apache
.hadoop
.hbase
.client
.TableDescriptor
;
63 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
64 import org
.apache
.hadoop
.hbase
.executor
.ExecutorService
;
65 import org
.apache
.hadoop
.hbase
.io
.hfile
.HFile
;
66 import org
.apache
.hadoop
.hbase
.io
.hfile
.HFileContextBuilder
;
67 import org
.apache
.hadoop
.hbase
.regionserver
.HRegion
.FlushResultImpl
;
68 import org
.apache
.hadoop
.hbase
.regionserver
.HRegion
.PrepareFlushResult
;
69 import org
.apache
.hadoop
.hbase
.regionserver
.throttle
.NoLimitThroughputController
;
70 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
71 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
72 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
73 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManagerTestHelper
;
74 import org
.apache
.hadoop
.hbase
.util
.FSUtils
;
75 import org
.apache
.hadoop
.hbase
.util
.Pair
;
76 import org
.apache
.hadoop
.hbase
.wal
.AbstractFSWALProvider
;
77 import org
.apache
.hadoop
.hbase
.wal
.WAL
;
78 import org
.apache
.hadoop
.hbase
.wal
.WALEdit
;
79 import org
.apache
.hadoop
.hbase
.wal
.WALFactory
;
80 import org
.apache
.hadoop
.hbase
.wal
.WALKeyImpl
;
81 import org
.apache
.hadoop
.hbase
.wal
.WALSplitUtil
.MutationReplay
;
82 import org
.apache
.hadoop
.util
.StringUtils
;
83 import org
.junit
.After
;
84 import org
.junit
.AfterClass
;
85 import org
.junit
.Before
;
86 import org
.junit
.BeforeClass
;
87 import org
.junit
.ClassRule
;
88 import org
.junit
.Rule
;
89 import org
.junit
.Test
;
90 import org
.junit
.experimental
.categories
.Category
;
91 import org
.junit
.rules
.TestName
;
92 import org
.mockito
.Mockito
;
93 import org
.slf4j
.Logger
;
94 import org
.slf4j
.LoggerFactory
;
96 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.collect
.Lists
;
97 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.UnsafeByteOperations
;
99 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ProtobufUtil
;
100 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.MutationProto
.MutationType
;
101 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.WALProtos
.BulkLoadDescriptor
;
102 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.WALProtos
.CompactionDescriptor
;
103 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.WALProtos
.FlushDescriptor
;
104 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.WALProtos
.FlushDescriptor
.FlushAction
;
105 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.WALProtos
.FlushDescriptor
.StoreFlushDescriptor
;
106 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.WALProtos
.RegionEventDescriptor
;
107 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.WALProtos
.RegionEventDescriptor
.EventType
;
108 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.WALProtos
.StoreDescriptor
;
111 * Tests of HRegion methods for replaying flush, compaction, region open, etc events for secondary
114 @Category(LargeTests
.class)
115 public class TestHRegionReplayEvents
{
118 public static final HBaseClassTestRule CLASS_RULE
=
119 HBaseClassTestRule
.forClass(TestHRegionReplayEvents
.class);
121 private static final Logger LOG
= LoggerFactory
.getLogger(TestHRegion
.class);
122 @Rule public TestName name
= new TestName();
124 private static HBaseTestingUtility TEST_UTIL
;
126 public static Configuration CONF
;
129 private byte[][] families
= new byte[][] {
130 Bytes
.toBytes("cf1"), Bytes
.toBytes("cf2"), Bytes
.toBytes("cf3")};
133 protected byte[] tableName
;
134 protected String method
;
135 protected final byte[] row
= Bytes
.toBytes("rowA");
136 protected final byte[] row2
= Bytes
.toBytes("rowB");
137 protected byte[] cq
= Bytes
.toBytes("cq");
140 private Path rootDir
;
141 private TableDescriptor htd
;
142 private RegionServerServices rss
;
143 private RegionInfo primaryHri
, secondaryHri
;
144 private HRegion primaryRegion
, secondaryRegion
;
145 private WAL walPrimary
, walSecondary
;
146 private WAL
.Reader reader
;
149 public static void setUpBeforeClass() throws Exception
{
150 TEST_UTIL
= new HBaseTestingUtility();
151 TEST_UTIL
.startMiniDFSCluster(1);
155 public static void tearDownAfterClass() throws Exception
{
156 LOG
.info("Cleaning test directory: " + TEST_UTIL
.getDataTestDir());
157 TEST_UTIL
.cleanupTestDir();
158 TEST_UTIL
.shutdownMiniDFSCluster();
162 public void setUp() throws Exception
{
163 CONF
= TEST_UTIL
.getConfiguration();
164 dir
= TEST_UTIL
.getDataTestDir("TestHRegionReplayEvents").toString();
165 method
= name
.getMethodName();
166 tableName
= Bytes
.toBytes(name
.getMethodName());
167 rootDir
= new Path(dir
+ method
);
168 TEST_UTIL
.getConfiguration().set(HConstants
.HBASE_DIR
, rootDir
.toString());
169 method
= name
.getMethodName();
170 TableDescriptorBuilder builder
= TableDescriptorBuilder
.newBuilder(TableName
.valueOf(method
));
171 for (byte[] family
: families
) {
172 builder
.setColumnFamily(ColumnFamilyDescriptorBuilder
.of(family
));
174 htd
= builder
.build();
176 long time
= System
.currentTimeMillis();
177 ChunkCreator
.initialize(MemStoreLABImpl
.CHUNK_SIZE_DEFAULT
, false, 0, 0, 0, null);
179 RegionInfoBuilder
.newBuilder(htd
.getTableName()).setRegionId(time
).setReplicaId(0).build();
181 RegionInfoBuilder
.newBuilder(htd
.getTableName()).setRegionId(time
).setReplicaId(1).build();
183 WALFactory wals
= TestHRegion
.createWALFactory(CONF
, rootDir
);
184 walPrimary
= wals
.getWAL(primaryHri
);
185 walSecondary
= wals
.getWAL(secondaryHri
);
187 rss
= mock(RegionServerServices
.class);
188 when(rss
.getServerName()).thenReturn(ServerName
.valueOf("foo", 1, 1));
189 when(rss
.getConfiguration()).thenReturn(CONF
);
190 when(rss
.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF
));
191 String string
= org
.apache
.hadoop
.hbase
.executor
.EventType
.RS_COMPACTED_FILES_DISCHARGER
193 ExecutorService es
= new ExecutorService(string
);
194 es
.startExecutorService(
195 string
+"-"+string
, 1);
196 when(rss
.getExecutorService()).thenReturn(es
);
197 primaryRegion
= HRegion
.createHRegion(primaryHri
, rootDir
, CONF
, htd
, walPrimary
);
198 primaryRegion
.close();
199 List
<HRegion
> regions
= new ArrayList
<>();
200 regions
.add(primaryRegion
);
201 Mockito
.doReturn(regions
).when(rss
).getRegions();
203 primaryRegion
= HRegion
.openHRegion(rootDir
, primaryHri
, htd
, walPrimary
, CONF
, rss
, null);
204 secondaryRegion
= HRegion
.openHRegion(secondaryHri
, htd
, null, CONF
, rss
, null);
210 public void tearDown() throws Exception
{
211 if (reader
!= null) {
215 if (primaryRegion
!= null) {
216 HBaseTestingUtility
.closeRegionAndWAL(primaryRegion
);
218 if (secondaryRegion
!= null) {
219 HBaseTestingUtility
.closeRegionAndWAL(secondaryRegion
);
222 EnvironmentEdgeManagerTestHelper
.reset();
226 return name
.getMethodName();
229 // Some of the test cases are as follows:
230 // 1. replay flush start marker again
231 // 2. replay flush with smaller seqId than what is there in memstore snapshot
232 // 3. replay flush with larger seqId than what is there in memstore snapshot
233 // 4. replay flush commit without flush prepare. non droppable memstore
234 // 5. replay flush commit without flush prepare. droppable memstore
235 // 6. replay open region event
236 // 7. replay open region event after flush start
237 // 8. replay flush form an earlier seqId (test ignoring seqIds)
238 // 9. start flush does not prevent region from closing.
241 public void testRegionReplicaSecondaryCannotFlush() throws IOException
{
242 // load some data and flush ensure that the secondary replica will not execute the flush
244 // load some data to secondary by replaying
245 putDataByReplay(secondaryRegion
, 0, 1000, cq
, families
);
247 verifyData(secondaryRegion
, 0, 1000, cq
, families
);
250 FlushResultImpl flush
= (FlushResultImpl
)secondaryRegion
.flush(true);
251 assertEquals(FlushResultImpl
.Result
.CANNOT_FLUSH
, flush
.result
);
253 verifyData(secondaryRegion
, 0, 1000, cq
, families
);
255 // close the region, and inspect that it has not flushed
256 Map
<byte[], List
<HStoreFile
>> files
= secondaryRegion
.close(false);
257 // assert that there are no files (due to flush)
258 for (List
<HStoreFile
> f
: files
.values()) {
259 assertTrue(f
.isEmpty());
264 * Tests a case where we replay only a flush start marker, then the region is closed. This region
265 * should not block indefinitely
268 public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException
{
269 // load some data to primary and flush
271 LOG
.info("-- Writing some data to primary from " + start
+ " to " + (start
+100));
272 putData(primaryRegion
, Durability
.SYNC_WAL
, start
, 100, cq
, families
);
273 LOG
.info("-- Flushing primary, creating 3 files for 3 stores");
274 primaryRegion
.flush(true);
276 // now replay the edits and the flush marker
277 reader
= createWALReaderForPrimary();
279 LOG
.info("-- Replaying edits and flush events in secondary");
281 WAL
.Entry entry
= reader
.next();
285 FlushDescriptor flushDesc
286 = WALEdit
.getFlushDescriptor(entry
.getEdit().getCells().get(0));
287 if (flushDesc
!= null) {
288 if (flushDesc
.getAction() == FlushAction
.START_FLUSH
) {
289 LOG
.info("-- Replaying flush start in secondary");
290 secondaryRegion
.replayWALFlushStartMarker(flushDesc
);
291 } else if (flushDesc
.getAction() == FlushAction
.COMMIT_FLUSH
) {
292 LOG
.info("-- NOT Replaying flush commit in secondary");
295 replayEdit(secondaryRegion
, entry
);
299 assertTrue(rss
.getRegionServerAccounting().getGlobalMemStoreDataSize() > 0);
300 // now close the region which should not cause hold because of un-committed flush
301 secondaryRegion
.close();
303 // verify that the memstore size is back to what it was
304 assertEquals(0, rss
.getRegionServerAccounting().getGlobalMemStoreDataSize());
307 static int replayEdit(HRegion region
, WAL
.Entry entry
) throws IOException
{
308 if (WALEdit
.isMetaEditFamily(entry
.getEdit().getCells().get(0))) {
309 return 0; // handled elsewhere
311 Put put
= new Put(CellUtil
.cloneRow(entry
.getEdit().getCells().get(0)));
312 for (Cell cell
: entry
.getEdit().getCells()) put
.add(cell
);
313 put
.setDurability(Durability
.SKIP_WAL
);
314 MutationReplay mutation
= new MutationReplay(MutationType
.PUT
, put
, 0, 0);
315 region
.batchReplay(new MutationReplay
[] {mutation
},
316 entry
.getKey().getSequenceId());
317 return Integer
.parseInt(Bytes
.toString(put
.getRow()));
320 WAL
.Reader
createWALReaderForPrimary() throws FileNotFoundException
, IOException
{
321 return WALFactory
.createReader(TEST_UTIL
.getTestFileSystem(),
322 AbstractFSWALProvider
.getCurrentFileName(walPrimary
),
323 TEST_UTIL
.getConfiguration());
327 public void testBatchReplayWithMultipleNonces() throws IOException
{
329 MutationReplay
[] mutations
= new MutationReplay
[100];
330 for (int i
= 0; i
< 100; i
++) {
331 Put put
= new Put(Bytes
.toBytes(i
));
332 put
.setDurability(Durability
.SYNC_WAL
);
333 for (byte[] familly
: this.families
) {
334 put
.addColumn(familly
, this.cq
, null);
335 long nonceNum
= i
/ 10;
336 mutations
[i
] = new MutationReplay(MutationType
.PUT
, put
, nonceNum
, nonceNum
);
339 primaryRegion
.batchReplay(mutations
, 20);
340 } catch (Exception e
) {
341 String msg
= "Error while replay of batch with multiple nonces. ";
343 fail(msg
+ e
.getMessage());
348 public void testReplayFlushesAndCompactions() throws IOException
{
349 // initiate a secondary region with some data.
351 // load some data to primary and flush. 3 flushes and some more unflushed data
352 putDataWithFlushes(primaryRegion
, 100, 300, 100);
354 // compaction from primary
355 LOG
.info("-- Compacting primary, only 1 store");
356 primaryRegion
.compactStore(Bytes
.toBytes("cf1"),
357 NoLimitThroughputController
.INSTANCE
);
359 // now replay the edits and the flush marker
360 reader
= createWALReaderForPrimary();
362 LOG
.info("-- Replaying edits and flush events in secondary");
363 int lastReplayed
= 0;
364 int expectedStoreFileCount
= 0;
366 WAL
.Entry entry
= reader
.next();
370 FlushDescriptor flushDesc
371 = WALEdit
.getFlushDescriptor(entry
.getEdit().getCells().get(0));
372 CompactionDescriptor compactionDesc
373 = WALEdit
.getCompaction(entry
.getEdit().getCells().get(0));
374 if (flushDesc
!= null) {
375 // first verify that everything is replayed and visible before flush event replay
376 verifyData(secondaryRegion
, 0, lastReplayed
, cq
, families
);
377 HStore store
= secondaryRegion
.getStore(Bytes
.toBytes("cf1"));
378 long storeMemstoreSize
= store
.getMemStoreSize().getHeapSize();
379 long regionMemstoreSize
= secondaryRegion
.getMemStoreDataSize();
380 MemStoreSize mss
= store
.getFlushableSize();
381 long storeSize
= store
.getSize();
382 long storeSizeUncompressed
= store
.getStoreSizeUncompressed();
383 if (flushDesc
.getAction() == FlushAction
.START_FLUSH
) {
384 LOG
.info("-- Replaying flush start in secondary");
385 PrepareFlushResult result
= secondaryRegion
.replayWALFlushStartMarker(flushDesc
);
386 assertNull(result
.result
);
387 assertEquals(result
.flushOpSeqId
, flushDesc
.getFlushSequenceNumber());
389 // assert that the store memstore is smaller now
390 long newStoreMemstoreSize
= store
.getMemStoreSize().getHeapSize();
391 LOG
.info("Memstore size reduced by:"
392 + StringUtils
.humanReadableInt(newStoreMemstoreSize
- storeMemstoreSize
));
393 assertTrue(storeMemstoreSize
> newStoreMemstoreSize
);
395 } else if (flushDesc
.getAction() == FlushAction
.COMMIT_FLUSH
) {
396 LOG
.info("-- Replaying flush commit in secondary");
397 secondaryRegion
.replayWALFlushCommitMarker(flushDesc
);
399 // assert that the flush files are picked
400 expectedStoreFileCount
++;
401 for (HStore s
: secondaryRegion
.getStores()) {
402 assertEquals(expectedStoreFileCount
, s
.getStorefilesCount());
404 MemStoreSize newMss
= store
.getFlushableSize();
405 assertTrue(mss
.getHeapSize() > newMss
.getHeapSize());
407 // assert that the region memstore is smaller now
408 long newRegionMemstoreSize
= secondaryRegion
.getMemStoreDataSize();
409 assertTrue(regionMemstoreSize
> newRegionMemstoreSize
);
411 // assert that the store sizes are bigger
412 assertTrue(store
.getSize() > storeSize
);
413 assertTrue(store
.getStoreSizeUncompressed() > storeSizeUncompressed
);
414 assertEquals(store
.getSize(), store
.getStorefilesSize());
416 // after replay verify that everything is still visible
417 verifyData(secondaryRegion
, 0, lastReplayed
+1, cq
, families
);
418 } else if (compactionDesc
!= null) {
419 secondaryRegion
.replayWALCompactionMarker(compactionDesc
, true, false, Long
.MAX_VALUE
);
421 // assert that the compaction is applied
422 for (HStore store
: secondaryRegion
.getStores()) {
423 if (store
.getColumnFamilyName().equals("cf1")) {
424 assertEquals(1, store
.getStorefilesCount());
426 assertEquals(expectedStoreFileCount
, store
.getStorefilesCount());
430 lastReplayed
= replayEdit(secondaryRegion
, entry
);
434 assertEquals(400-1, lastReplayed
);
435 LOG
.info("-- Verifying edits from secondary");
436 verifyData(secondaryRegion
, 0, 400, cq
, families
);
438 LOG
.info("-- Verifying edits from primary. Ensuring that files are not deleted");
439 verifyData(primaryRegion
, 0, lastReplayed
, cq
, families
);
440 for (HStore store
: primaryRegion
.getStores()) {
441 if (store
.getColumnFamilyName().equals("cf1")) {
442 assertEquals(1, store
.getStorefilesCount());
444 assertEquals(expectedStoreFileCount
, store
.getStorefilesCount());
450 * Tests cases where we prepare a flush with some seqId and we receive other flush start markers
451 * equal to, greater or less than the previous flush start marker.
454 public void testReplayFlushStartMarkers() throws IOException
{
455 // load some data to primary and flush. 1 flush and some more unflushed data
456 putDataWithFlushes(primaryRegion
, 100, 100, 100);
459 // now replay the edits and the flush marker
460 reader
= createWALReaderForPrimary();
462 LOG
.info("-- Replaying edits and flush events in secondary");
464 FlushDescriptor startFlushDesc
= null;
466 int lastReplayed
= 0;
468 WAL
.Entry entry
= reader
.next();
472 FlushDescriptor flushDesc
473 = WALEdit
.getFlushDescriptor(entry
.getEdit().getCells().get(0));
474 if (flushDesc
!= null) {
475 // first verify that everything is replayed and visible before flush event replay
476 HStore store
= secondaryRegion
.getStore(Bytes
.toBytes("cf1"));
477 long storeMemstoreSize
= store
.getMemStoreSize().getHeapSize();
478 long regionMemstoreSize
= secondaryRegion
.getMemStoreDataSize();
479 MemStoreSize mss
= store
.getFlushableSize();
481 if (flushDesc
.getAction() == FlushAction
.START_FLUSH
) {
482 startFlushDesc
= flushDesc
;
483 LOG
.info("-- Replaying flush start in secondary");
484 PrepareFlushResult result
= secondaryRegion
.replayWALFlushStartMarker(startFlushDesc
);
485 assertNull(result
.result
);
486 assertEquals(result
.flushOpSeqId
, startFlushDesc
.getFlushSequenceNumber());
487 assertTrue(regionMemstoreSize
> 0);
488 assertTrue(mss
.getHeapSize() > 0);
490 // assert that the store memstore is smaller now
491 long newStoreMemstoreSize
= store
.getMemStoreSize().getHeapSize();
492 LOG
.info("Memstore size reduced by:"
493 + StringUtils
.humanReadableInt(newStoreMemstoreSize
- storeMemstoreSize
));
494 assertTrue(storeMemstoreSize
> newStoreMemstoreSize
);
495 verifyData(secondaryRegion
, 0, lastReplayed
+1, cq
, families
);
498 // after replay verify that everything is still visible
499 verifyData(secondaryRegion
, 0, lastReplayed
+1, cq
, families
);
501 lastReplayed
= replayEdit(secondaryRegion
, entry
);
505 // at this point, there should be some data (rows 0-100) in memstore snapshot
506 // and some more data in memstores (rows 100-200)
508 verifyData(secondaryRegion
, 0, numRows
, cq
, families
);
510 // Test case 1: replay the same flush start marker again
511 LOG
.info("-- Replaying same flush start in secondary again");
512 PrepareFlushResult result
= secondaryRegion
.replayWALFlushStartMarker(startFlushDesc
);
513 assertNull(result
); // this should return null. Ignoring the flush start marker
514 // assert that we still have prepared flush with the previous setup.
515 assertNotNull(secondaryRegion
.getPrepareFlushResult());
516 assertEquals(secondaryRegion
.getPrepareFlushResult().flushOpSeqId
,
517 startFlushDesc
.getFlushSequenceNumber());
518 assertTrue(secondaryRegion
.getMemStoreDataSize() > 0); // memstore is not empty
519 verifyData(secondaryRegion
, 0, numRows
, cq
, families
);
521 // Test case 2: replay a flush start marker with a smaller seqId
522 FlushDescriptor startFlushDescSmallerSeqId
523 = clone(startFlushDesc
, startFlushDesc
.getFlushSequenceNumber() - 50);
524 LOG
.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId
);
525 result
= secondaryRegion
.replayWALFlushStartMarker(startFlushDescSmallerSeqId
);
526 assertNull(result
); // this should return null. Ignoring the flush start marker
527 // assert that we still have prepared flush with the previous setup.
528 assertNotNull(secondaryRegion
.getPrepareFlushResult());
529 assertEquals(secondaryRegion
.getPrepareFlushResult().flushOpSeqId
,
530 startFlushDesc
.getFlushSequenceNumber());
531 assertTrue(secondaryRegion
.getMemStoreDataSize() > 0); // memstore is not empty
532 verifyData(secondaryRegion
, 0, numRows
, cq
, families
);
534 // Test case 3: replay a flush start marker with a larger seqId
535 FlushDescriptor startFlushDescLargerSeqId
536 = clone(startFlushDesc
, startFlushDesc
.getFlushSequenceNumber() + 50);
537 LOG
.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId
);
538 result
= secondaryRegion
.replayWALFlushStartMarker(startFlushDescLargerSeqId
);
539 assertNull(result
); // this should return null. Ignoring the flush start marker
540 // assert that we still have prepared flush with the previous setup.
541 assertNotNull(secondaryRegion
.getPrepareFlushResult());
542 assertEquals(secondaryRegion
.getPrepareFlushResult().flushOpSeqId
,
543 startFlushDesc
.getFlushSequenceNumber());
544 assertTrue(secondaryRegion
.getMemStoreDataSize() > 0); // memstore is not empty
545 verifyData(secondaryRegion
, 0, numRows
, cq
, families
);
547 LOG
.info("-- Verifying edits from secondary");
548 verifyData(secondaryRegion
, 0, numRows
, cq
, families
);
550 LOG
.info("-- Verifying edits from primary.");
551 verifyData(primaryRegion
, 0, numRows
, cq
, families
);
555 * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
556 * less than the previous flush start marker.
559 public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException
{
560 // load some data to primary and flush. 2 flushes and some more unflushed data
561 putDataWithFlushes(primaryRegion
, 100, 200, 100);
564 // now replay the edits and the flush marker
565 reader
= createWALReaderForPrimary();
567 LOG
.info("-- Replaying edits and flush events in secondary");
568 FlushDescriptor startFlushDesc
= null;
569 FlushDescriptor commitFlushDesc
= null;
571 int lastReplayed
= 0;
573 System
.out
.println(lastReplayed
);
574 WAL
.Entry entry
= reader
.next();
578 FlushDescriptor flushDesc
579 = WALEdit
.getFlushDescriptor(entry
.getEdit().getCells().get(0));
580 if (flushDesc
!= null) {
581 if (flushDesc
.getAction() == FlushAction
.START_FLUSH
) {
582 // don't replay the first flush start marker, hold on to it, replay the second one
583 if (startFlushDesc
== null) {
584 startFlushDesc
= flushDesc
;
586 LOG
.info("-- Replaying flush start in secondary");
587 startFlushDesc
= flushDesc
;
588 PrepareFlushResult result
= secondaryRegion
.replayWALFlushStartMarker(startFlushDesc
);
589 assertNull(result
.result
);
591 } else if (flushDesc
.getAction() == FlushAction
.COMMIT_FLUSH
) {
592 // do not replay any flush commit yet
593 if (commitFlushDesc
== null) {
594 commitFlushDesc
= flushDesc
; // hold on to the first flush commit marker
597 // after replay verify that everything is still visible
598 verifyData(secondaryRegion
, 0, lastReplayed
+1, cq
, families
);
600 lastReplayed
= replayEdit(secondaryRegion
, entry
);
604 // at this point, there should be some data (rows 0-200) in memstore snapshot
605 // and some more data in memstores (rows 200-300)
606 verifyData(secondaryRegion
, 0, numRows
, cq
, families
);
608 // no store files in the region
609 int expectedStoreFileCount
= 0;
610 for (HStore s
: secondaryRegion
.getStores()) {
611 assertEquals(expectedStoreFileCount
, s
.getStorefilesCount());
613 long regionMemstoreSize
= secondaryRegion
.getMemStoreDataSize();
615 // Test case 1: replay the a flush commit marker smaller than what we have prepared
616 LOG
.info("Testing replaying flush COMMIT " + commitFlushDesc
+ " on top of flush START"
618 assertTrue(commitFlushDesc
.getFlushSequenceNumber() < startFlushDesc
.getFlushSequenceNumber());
620 LOG
.info("-- Replaying flush commit in secondary" + commitFlushDesc
);
621 secondaryRegion
.replayWALFlushCommitMarker(commitFlushDesc
);
623 // assert that the flush files are picked
624 expectedStoreFileCount
++;
625 for (HStore s
: secondaryRegion
.getStores()) {
626 assertEquals(expectedStoreFileCount
, s
.getStorefilesCount());
628 HStore store
= secondaryRegion
.getStore(Bytes
.toBytes("cf1"));
629 MemStoreSize mss
= store
.getFlushableSize();
630 assertTrue(mss
.getHeapSize() > 0); // assert that the memstore is not dropped
632 // assert that the region memstore is same as before
633 long newRegionMemstoreSize
= secondaryRegion
.getMemStoreDataSize();
634 assertEquals(regionMemstoreSize
, newRegionMemstoreSize
);
636 assertNotNull(secondaryRegion
.getPrepareFlushResult()); // not dropped
638 LOG
.info("-- Verifying edits from secondary");
639 verifyData(secondaryRegion
, 0, numRows
, cq
, families
);
641 LOG
.info("-- Verifying edits from primary.");
642 verifyData(primaryRegion
, 0, numRows
, cq
, families
);
646 * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
647 * larger than the previous flush start marker.
650 public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException
{
651 // load some data to primary and flush. 1 flush and some more unflushed data
652 putDataWithFlushes(primaryRegion
, 100, 100, 100);
655 // now replay the edits and the flush marker
656 reader
= createWALReaderForPrimary();
658 LOG
.info("-- Replaying edits and flush events in secondary");
659 FlushDescriptor startFlushDesc
= null;
660 FlushDescriptor commitFlushDesc
= null;
662 int lastReplayed
= 0;
664 WAL
.Entry entry
= reader
.next();
668 FlushDescriptor flushDesc
669 = WALEdit
.getFlushDescriptor(entry
.getEdit().getCells().get(0));
670 if (flushDesc
!= null) {
671 if (flushDesc
.getAction() == FlushAction
.START_FLUSH
) {
672 if (startFlushDesc
== null) {
673 LOG
.info("-- Replaying flush start in secondary");
674 startFlushDesc
= flushDesc
;
675 PrepareFlushResult result
= secondaryRegion
.replayWALFlushStartMarker(startFlushDesc
);
676 assertNull(result
.result
);
678 } else if (flushDesc
.getAction() == FlushAction
.COMMIT_FLUSH
) {
679 // do not replay any flush commit yet
680 // hold on to the flush commit marker but simulate a larger
681 // flush commit seqId
683 FlushDescriptor
.newBuilder(flushDesc
)
684 .setFlushSequenceNumber(flushDesc
.getFlushSequenceNumber() + 50)
687 // after replay verify that everything is still visible
688 verifyData(secondaryRegion
, 0, lastReplayed
+1, cq
, families
);
690 lastReplayed
= replayEdit(secondaryRegion
, entry
);
694 // at this point, there should be some data (rows 0-100) in memstore snapshot
695 // and some more data in memstores (rows 100-200)
696 verifyData(secondaryRegion
, 0, numRows
, cq
, families
);
698 // no store files in the region
699 int expectedStoreFileCount
= 0;
700 for (HStore s
: secondaryRegion
.getStores()) {
701 assertEquals(expectedStoreFileCount
, s
.getStorefilesCount());
703 long regionMemstoreSize
= secondaryRegion
.getMemStoreDataSize();
705 // Test case 1: replay the a flush commit marker larger than what we have prepared
706 LOG
.info("Testing replaying flush COMMIT " + commitFlushDesc
+ " on top of flush START"
708 assertTrue(commitFlushDesc
.getFlushSequenceNumber() > startFlushDesc
.getFlushSequenceNumber());
710 LOG
.info("-- Replaying flush commit in secondary" + commitFlushDesc
);
711 secondaryRegion
.replayWALFlushCommitMarker(commitFlushDesc
);
713 // assert that the flush files are picked
714 expectedStoreFileCount
++;
715 for (HStore s
: secondaryRegion
.getStores()) {
716 assertEquals(expectedStoreFileCount
, s
.getStorefilesCount());
718 HStore store
= secondaryRegion
.getStore(Bytes
.toBytes("cf1"));
719 MemStoreSize mss
= store
.getFlushableSize();
720 assertTrue(mss
.getHeapSize() > 0); // assert that the memstore is not dropped
722 // assert that the region memstore is smaller than before, but not empty
723 long newRegionMemstoreSize
= secondaryRegion
.getMemStoreDataSize();
724 assertTrue(newRegionMemstoreSize
> 0);
725 assertTrue(regionMemstoreSize
> newRegionMemstoreSize
);
727 assertNull(secondaryRegion
.getPrepareFlushResult()); // prepare snapshot should be dropped
729 LOG
.info("-- Verifying edits from secondary");
730 verifyData(secondaryRegion
, 0, numRows
, cq
, families
);
732 LOG
.info("-- Verifying edits from primary.");
733 verifyData(primaryRegion
, 0, numRows
, cq
, families
);
737 * Tests the case where we receive a flush commit before receiving any flush prepare markers.
738 * The memstore edits should be dropped after the flush commit replay since they should be in
742 public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore()
744 testReplayFlushCommitMarkerWithoutFlushStartMarker(true);
748 * Tests the case where we receive a flush commit before receiving any flush prepare markers.
749 * The memstore edits should be not dropped after the flush commit replay since not every edit
750 * will be in flushed files (based on seqId)
753 public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore()
755 testReplayFlushCommitMarkerWithoutFlushStartMarker(false);
759 * Tests the case where we receive a flush commit before receiving any flush prepare markers
761 public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore
)
763 // load some data to primary and flush. 1 flushes and some more unflushed data.
764 // write more data after flush depending on whether droppableSnapshot
765 putDataWithFlushes(primaryRegion
, 100, 100, droppableMemstore ?
0 : 100);
766 int numRows
= droppableMemstore ?
100 : 200;
768 // now replay the edits and the flush marker
769 reader
= createWALReaderForPrimary();
771 LOG
.info("-- Replaying edits and flush events in secondary");
772 FlushDescriptor commitFlushDesc
= null;
774 int lastReplayed
= 0;
776 WAL
.Entry entry
= reader
.next();
780 FlushDescriptor flushDesc
781 = WALEdit
.getFlushDescriptor(entry
.getEdit().getCells().get(0));
782 if (flushDesc
!= null) {
783 if (flushDesc
.getAction() == FlushAction
.START_FLUSH
) {
784 // do not replay flush start marker
785 } else if (flushDesc
.getAction() == FlushAction
.COMMIT_FLUSH
) {
786 commitFlushDesc
= flushDesc
; // hold on to the flush commit marker
788 // after replay verify that everything is still visible
789 verifyData(secondaryRegion
, 0, lastReplayed
+1, cq
, families
);
791 lastReplayed
= replayEdit(secondaryRegion
, entry
);
795 // at this point, there should be some data (rows 0-200) in the memstore without snapshot
796 // and some more data in memstores (rows 100-300)
797 verifyData(secondaryRegion
, 0, numRows
, cq
, families
);
799 // no store files in the region
800 int expectedStoreFileCount
= 0;
801 for (HStore s
: secondaryRegion
.getStores()) {
802 assertEquals(expectedStoreFileCount
, s
.getStorefilesCount());
804 long regionMemstoreSize
= secondaryRegion
.getMemStoreDataSize();
806 // Test case 1: replay a flush commit marker without start flush marker
807 assertNull(secondaryRegion
.getPrepareFlushResult());
808 assertTrue(commitFlushDesc
.getFlushSequenceNumber() > 0);
810 // ensure all files are visible in secondary
811 for (HStore store
: secondaryRegion
.getStores()) {
812 assertTrue(store
.getMaxSequenceId().orElse(0L) <= secondaryRegion
.getReadPoint(null));
815 LOG
.info("-- Replaying flush commit in secondary" + commitFlushDesc
);
816 secondaryRegion
.replayWALFlushCommitMarker(commitFlushDesc
);
818 // assert that the flush files are picked
819 expectedStoreFileCount
++;
820 for (HStore s
: secondaryRegion
.getStores()) {
821 assertEquals(expectedStoreFileCount
, s
.getStorefilesCount());
823 HStore store
= secondaryRegion
.getStore(Bytes
.toBytes("cf1"));
824 MemStoreSize mss
= store
.getFlushableSize();
825 if (droppableMemstore
) {
826 // assert that the memstore is dropped
827 assertTrue(mss
.getHeapSize() == MutableSegment
.DEEP_OVERHEAD
);
829 assertTrue(mss
.getHeapSize() > 0); // assert that the memstore is not dropped
832 // assert that the region memstore is same as before (we could not drop)
833 long newRegionMemstoreSize
= secondaryRegion
.getMemStoreDataSize();
834 if (droppableMemstore
) {
835 assertTrue(0 == newRegionMemstoreSize
);
837 assertTrue(regionMemstoreSize
== newRegionMemstoreSize
);
840 LOG
.info("-- Verifying edits from secondary");
841 verifyData(secondaryRegion
, 0, numRows
, cq
, families
);
843 LOG
.info("-- Verifying edits from primary.");
844 verifyData(primaryRegion
, 0, numRows
, cq
, families
);
847 private FlushDescriptor
clone(FlushDescriptor flush
, long flushSeqId
) {
848 return FlushDescriptor
.newBuilder(flush
)
849 .setFlushSequenceNumber(flushSeqId
)
854 * Tests replaying region open markers from primary region. Checks whether the files are picked up
857 public void testReplayRegionOpenEvent() throws IOException
{
858 putDataWithFlushes(primaryRegion
, 100, 0, 100); // no flush
861 // close the region and open again.
862 primaryRegion
.close();
863 primaryRegion
= HRegion
.openHRegion(rootDir
, primaryHri
, htd
, walPrimary
, CONF
, rss
, null);
865 // now replay the edits and the flush marker
866 reader
= createWALReaderForPrimary();
867 List
<RegionEventDescriptor
> regionEvents
= Lists
.newArrayList();
869 LOG
.info("-- Replaying edits and region events in secondary");
871 WAL
.Entry entry
= reader
.next();
875 FlushDescriptor flushDesc
876 = WALEdit
.getFlushDescriptor(entry
.getEdit().getCells().get(0));
877 RegionEventDescriptor regionEventDesc
878 = WALEdit
.getRegionEventDescriptor(entry
.getEdit().getCells().get(0));
880 if (flushDesc
!= null) {
881 // don't replay flush events
882 } else if (regionEventDesc
!= null) {
883 regionEvents
.add(regionEventDesc
);
885 // don't replay edits
889 // we should have 1 open, 1 close and 1 open event
890 assertEquals(3, regionEvents
.size());
892 // replay the first region open event.
893 secondaryRegion
.replayWALRegionEventMarker(regionEvents
.get(0));
895 // replay the close event as well
896 secondaryRegion
.replayWALRegionEventMarker(regionEvents
.get(1));
898 // no store files in the region
899 int expectedStoreFileCount
= 0;
900 for (HStore s
: secondaryRegion
.getStores()) {
901 assertEquals(expectedStoreFileCount
, s
.getStorefilesCount());
903 long regionMemstoreSize
= secondaryRegion
.getMemStoreDataSize();
904 assertTrue(regionMemstoreSize
== 0);
906 // now replay the region open event that should contain new file locations
907 LOG
.info("Testing replaying region open event " + regionEvents
.get(2));
908 secondaryRegion
.replayWALRegionEventMarker(regionEvents
.get(2));
910 // assert that the flush files are picked
911 expectedStoreFileCount
++;
912 for (HStore s
: secondaryRegion
.getStores()) {
913 assertEquals(expectedStoreFileCount
, s
.getStorefilesCount());
915 HStore store
= secondaryRegion
.getStore(Bytes
.toBytes("cf1"));
916 MemStoreSize mss
= store
.getFlushableSize();
917 assertTrue(mss
.getHeapSize() == MutableSegment
.DEEP_OVERHEAD
);
919 // assert that the region memstore is empty
920 long newRegionMemstoreSize
= secondaryRegion
.getMemStoreDataSize();
921 assertTrue(newRegionMemstoreSize
== 0);
923 assertNull(secondaryRegion
.getPrepareFlushResult()); //prepare snapshot should be dropped if any
925 LOG
.info("-- Verifying edits from secondary");
926 verifyData(secondaryRegion
, 0, numRows
, cq
, families
);
928 LOG
.info("-- Verifying edits from primary.");
929 verifyData(primaryRegion
, 0, numRows
, cq
, families
);
933 * Tests the case where we replay a region open event after a flush start but before receiving
937 public void testReplayRegionOpenEventAfterFlushStart() throws IOException
{
938 putDataWithFlushes(primaryRegion
, 100, 100, 100);
941 // close the region and open again.
942 primaryRegion
.close();
943 primaryRegion
= HRegion
.openHRegion(rootDir
, primaryHri
, htd
, walPrimary
, CONF
, rss
, null);
945 // now replay the edits and the flush marker
946 reader
= createWALReaderForPrimary();
947 List
<RegionEventDescriptor
> regionEvents
= Lists
.newArrayList();
949 LOG
.info("-- Replaying edits and region events in secondary");
951 WAL
.Entry entry
= reader
.next();
955 FlushDescriptor flushDesc
956 = WALEdit
.getFlushDescriptor(entry
.getEdit().getCells().get(0));
957 RegionEventDescriptor regionEventDesc
958 = WALEdit
.getRegionEventDescriptor(entry
.getEdit().getCells().get(0));
960 if (flushDesc
!= null) {
961 // only replay flush start
962 if (flushDesc
.getAction() == FlushAction
.START_FLUSH
) {
963 secondaryRegion
.replayWALFlushStartMarker(flushDesc
);
965 } else if (regionEventDesc
!= null) {
966 regionEvents
.add(regionEventDesc
);
968 replayEdit(secondaryRegion
, entry
);
972 // at this point, there should be some data (rows 0-100) in the memstore snapshot
973 // and some more data in memstores (rows 100-200)
974 verifyData(secondaryRegion
, 0, numRows
, cq
, families
);
976 // we should have 1 open, 1 close and 1 open event
977 assertEquals(3, regionEvents
.size());
979 // no store files in the region
980 int expectedStoreFileCount
= 0;
981 for (HStore s
: secondaryRegion
.getStores()) {
982 assertEquals(expectedStoreFileCount
, s
.getStorefilesCount());
985 // now replay the region open event that should contain new file locations
986 LOG
.info("Testing replaying region open event " + regionEvents
.get(2));
987 secondaryRegion
.replayWALRegionEventMarker(regionEvents
.get(2));
989 // assert that the flush files are picked
990 expectedStoreFileCount
= 2; // two flushes happened
991 for (HStore s
: secondaryRegion
.getStores()) {
992 assertEquals(expectedStoreFileCount
, s
.getStorefilesCount());
994 HStore store
= secondaryRegion
.getStore(Bytes
.toBytes("cf1"));
995 MemStoreSize newSnapshotSize
= store
.getSnapshotSize();
996 assertTrue(newSnapshotSize
.getDataSize() == 0);
998 // assert that the region memstore is empty
999 long newRegionMemstoreSize
= secondaryRegion
.getMemStoreDataSize();
1000 assertTrue(newRegionMemstoreSize
== 0);
1002 assertNull(secondaryRegion
.getPrepareFlushResult()); //prepare snapshot should be dropped if any
1004 LOG
.info("-- Verifying edits from secondary");
1005 verifyData(secondaryRegion
, 0, numRows
, cq
, families
);
1007 LOG
.info("-- Verifying edits from primary.");
1008 verifyData(primaryRegion
, 0, numRows
, cq
, families
);
1012 * Tests whether edits coming in for replay are skipped which have smaller seq id than the seqId
1013 * of the last replayed region open event.
1016 public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException
{
1017 putDataWithFlushes(primaryRegion
, 100, 100, 0);
1020 // close the region and open again.
1021 primaryRegion
.close();
1022 primaryRegion
= HRegion
.openHRegion(rootDir
, primaryHri
, htd
, walPrimary
, CONF
, rss
, null);
1024 // now replay the edits and the flush marker
1025 reader
= createWALReaderForPrimary();
1026 List
<RegionEventDescriptor
> regionEvents
= Lists
.newArrayList();
1027 List
<WAL
.Entry
> edits
= Lists
.newArrayList();
1029 LOG
.info("-- Replaying edits and region events in secondary");
1031 WAL
.Entry entry
= reader
.next();
1032 if (entry
== null) {
1035 FlushDescriptor flushDesc
1036 = WALEdit
.getFlushDescriptor(entry
.getEdit().getCells().get(0));
1037 RegionEventDescriptor regionEventDesc
1038 = WALEdit
.getRegionEventDescriptor(entry
.getEdit().getCells().get(0));
1040 if (flushDesc
!= null) {
1041 // don't replay flushes
1042 } else if (regionEventDesc
!= null) {
1043 regionEvents
.add(regionEventDesc
);
1049 // replay the region open of first open, but with the seqid of the second open
1050 // this way non of the flush files will be picked up.
1051 secondaryRegion
.replayWALRegionEventMarker(
1052 RegionEventDescriptor
.newBuilder(regionEvents
.get(0)).setLogSequenceNumber(
1053 regionEvents
.get(2).getLogSequenceNumber()).build());
1056 // replay edits from the before region close. If replay does not
1057 // skip these the following verification will NOT fail.
1058 for (WAL
.Entry entry
: edits
) {
1059 replayEdit(secondaryRegion
, entry
);
1062 boolean expectedFail
= false;
1064 verifyData(secondaryRegion
, 0, numRows
, cq
, families
);
1065 } catch (AssertionError e
) {
1066 expectedFail
= true; // expected
1068 if (!expectedFail
) {
1069 fail("Should have failed this verification");
1074 public void testReplayFlushSeqIds() throws IOException
{
1075 // load some data to primary and flush
1077 LOG
.info("-- Writing some data to primary from " + start
+ " to " + (start
+100));
1078 putData(primaryRegion
, Durability
.SYNC_WAL
, start
, 100, cq
, families
);
1079 LOG
.info("-- Flushing primary, creating 3 files for 3 stores");
1080 primaryRegion
.flush(true);
1082 // now replay the flush marker
1083 reader
= createWALReaderForPrimary();
1085 long flushSeqId
= -1;
1086 LOG
.info("-- Replaying flush events in secondary");
1088 WAL
.Entry entry
= reader
.next();
1089 if (entry
== null) {
1092 FlushDescriptor flushDesc
1093 = WALEdit
.getFlushDescriptor(entry
.getEdit().getCells().get(0));
1094 if (flushDesc
!= null) {
1095 if (flushDesc
.getAction() == FlushAction
.START_FLUSH
) {
1096 LOG
.info("-- Replaying flush start in secondary");
1097 secondaryRegion
.replayWALFlushStartMarker(flushDesc
);
1098 flushSeqId
= flushDesc
.getFlushSequenceNumber();
1099 } else if (flushDesc
.getAction() == FlushAction
.COMMIT_FLUSH
) {
1100 LOG
.info("-- Replaying flush commit in secondary");
1101 secondaryRegion
.replayWALFlushCommitMarker(flushDesc
);
1102 assertEquals(flushSeqId
, flushDesc
.getFlushSequenceNumber());
1105 // else do not replay
1108 // TODO: what to do with this?
1109 // assert that the newly picked up flush file is visible
1110 long readPoint
= secondaryRegion
.getMVCC().getReadPoint();
1111 assertEquals(flushSeqId
, readPoint
);
1113 // after replay verify that everything is still visible
1114 verifyData(secondaryRegion
, 0, 100, cq
, families
);
1118 public void testSeqIdsFromReplay() throws IOException
{
1119 // test the case where seqId's coming from replayed WALEdits are made persisted with their
1120 // original seqIds and they are made visible through mvcc read point upon replay
1121 String method
= name
.getMethodName();
1122 byte[] tableName
= Bytes
.toBytes(method
);
1123 byte[] family
= Bytes
.toBytes("family");
1125 HRegion region
= initHRegion(tableName
, family
);
1127 // replay an entry that is bigger than current read point
1128 long readPoint
= region
.getMVCC().getReadPoint();
1129 long origSeqId
= readPoint
+ 100;
1131 Put put
= new Put(row
).addColumn(family
, row
, row
);
1132 put
.setDurability(Durability
.SKIP_WAL
); // we replay with skip wal
1133 replay(region
, put
, origSeqId
);
1135 // read point should have advanced to this seqId
1136 assertGet(region
, family
, row
);
1138 // region seqId should have advanced at least to this seqId
1139 assertEquals(origSeqId
, region
.getReadPoint(null));
1141 // replay an entry that is smaller than current read point
1142 // caution: adding an entry below current read point might cause partial dirty reads. Normal
1143 // replay does not allow reads while replay is going on.
1144 put
= new Put(row2
).addColumn(family
, row2
, row2
);
1145 put
.setDurability(Durability
.SKIP_WAL
);
1146 replay(region
, put
, origSeqId
- 50);
1148 assertGet(region
, family
, row2
);
1155 * Tests that a region opened in secondary mode would not write region open / close
1156 * events to its WAL.
1157 * @throws IOException
1160 public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException
{
1161 secondaryRegion
.close();
1162 walSecondary
= spy(walSecondary
);
1164 // test for region open and close
1165 secondaryRegion
= HRegion
.openHRegion(secondaryHri
, htd
, walSecondary
, CONF
, rss
, null);
1166 verify(walSecondary
, times(0)).appendData(any(RegionInfo
.class), any(WALKeyImpl
.class),
1167 any(WALEdit
.class));
1169 // test for replay prepare flush
1170 putDataByReplay(secondaryRegion
, 0, 10, cq
, families
);
1171 secondaryRegion
.replayWALFlushStartMarker(FlushDescriptor
.newBuilder().
1172 setFlushSequenceNumber(10)
1173 .setTableName(UnsafeByteOperations
.unsafeWrap(
1174 primaryRegion
.getTableDescriptor().getTableName().getName()))
1175 .setAction(FlushAction
.START_FLUSH
)
1176 .setEncodedRegionName(
1177 UnsafeByteOperations
.unsafeWrap(primaryRegion
.getRegionInfo().getEncodedNameAsBytes()))
1178 .setRegionName(UnsafeByteOperations
.unsafeWrap(
1179 primaryRegion
.getRegionInfo().getRegionName()))
1182 verify(walSecondary
, times(0)).appendData(any(RegionInfo
.class), any(WALKeyImpl
.class),
1183 any(WALEdit
.class));
1185 secondaryRegion
.close();
1186 verify(walSecondary
, times(0)).appendData(any(RegionInfo
.class), any(WALKeyImpl
.class),
1187 any(WALEdit
.class));
1191 * Tests the reads enabled flag for the region. When unset all reads should be rejected
1194 public void testRegionReadsEnabledFlag() throws IOException
{
1196 putDataByReplay(secondaryRegion
, 0, 100, cq
, families
);
1198 verifyData(secondaryRegion
, 0, 100, cq
, families
);
1200 // now disable reads
1201 secondaryRegion
.setReadsEnabled(false);
1203 verifyData(secondaryRegion
, 0, 100, cq
, families
);
1204 fail("Should have failed with IOException");
1205 } catch(IOException ex
) {
1209 // verify that we can still replay data
1210 putDataByReplay(secondaryRegion
, 100, 100, cq
, families
);
1212 // now enable reads again
1213 secondaryRegion
.setReadsEnabled(true);
1214 verifyData(secondaryRegion
, 0, 200, cq
, families
);
1218 * Tests the case where a request for flush cache is sent to the region, but region cannot flush.
1219 * It should write the flush request marker instead.
1222 public void testWriteFlushRequestMarker() throws IOException
{
1223 // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false
1224 FlushResultImpl result
= primaryRegion
.flushcache(true, false, FlushLifeCycleTracker
.DUMMY
);
1225 assertNotNull(result
);
1226 assertEquals(FlushResultImpl
.Result
.CANNOT_FLUSH_MEMSTORE_EMPTY
, result
.result
);
1227 assertFalse(result
.wroteFlushWalMarker
);
1229 // request flush again, but this time with writeFlushRequestWalMarker = true
1230 result
= primaryRegion
.flushcache(true, true, FlushLifeCycleTracker
.DUMMY
);
1231 assertNotNull(result
);
1232 assertEquals(FlushResultImpl
.Result
.CANNOT_FLUSH_MEMSTORE_EMPTY
, result
.result
);
1233 assertTrue(result
.wroteFlushWalMarker
);
1235 List
<FlushDescriptor
> flushes
= Lists
.newArrayList();
1236 reader
= createWALReaderForPrimary();
1238 WAL
.Entry entry
= reader
.next();
1239 if (entry
== null) {
1242 FlushDescriptor flush
= WALEdit
.getFlushDescriptor(entry
.getEdit().getCells().get(0));
1243 if (flush
!= null) {
1248 assertEquals(1, flushes
.size());
1249 assertNotNull(flushes
.get(0));
1250 assertEquals(FlushDescriptor
.FlushAction
.CANNOT_FLUSH
, flushes
.get(0).getAction());
1254 * Test the case where the secondary region replica is not in reads enabled state because it is
1255 * waiting for a flush or region open marker from primary region. Replaying CANNOT_FLUSH
1256 * flush marker entry should restore the reads enabled status in the region and allow the reads
1260 public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException
{
1261 disableReads(secondaryRegion
);
1263 // Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from
1264 // triggered flush restores readsEnabled
1265 primaryRegion
.flushcache(true, true, FlushLifeCycleTracker
.DUMMY
);
1266 reader
= createWALReaderForPrimary();
1268 WAL
.Entry entry
= reader
.next();
1269 if (entry
== null) {
1272 FlushDescriptor flush
= WALEdit
.getFlushDescriptor(entry
.getEdit().getCells().get(0));
1273 if (flush
!= null) {
1274 secondaryRegion
.replayWALFlushMarker(flush
, entry
.getKey().getSequenceId());
1278 // now reads should be enabled
1279 secondaryRegion
.get(new Get(Bytes
.toBytes(0)));
1283 * Test the case where the secondary region replica is not in reads enabled state because it is
1284 * waiting for a flush or region open marker from primary region. Replaying flush start and commit
1285 * entries should restore the reads enabled status in the region and allow the reads
1289 public void testReplayingFlushRestoresReadsEnabledState() throws IOException
{
1290 // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came
1291 // from triggered flush restores readsEnabled
1292 disableReads(secondaryRegion
);
1294 // put some data in primary
1295 putData(primaryRegion
, Durability
.SYNC_WAL
, 0, 100, cq
, families
);
1296 primaryRegion
.flush(true);
1297 // I seem to need to push more edits through so the WAL flushes on local fs. This was not
1298 // needed before HBASE-15028. Not sure whats up. I can see that we have not flushed if I
1299 // look at the WAL if I pause the test here and then use WALPrettyPrinter to look at content..
1300 // Doing same check before HBASE-15028 I can see all edits flushed to the WAL. Somethings up
1301 // but can't figure it... and this is only test that seems to suffer this flush issue.
1303 putData(primaryRegion
, Durability
.SYNC_WAL
, 0, 100, cq
, families
);
1305 reader
= createWALReaderForPrimary();
1307 WAL
.Entry entry
= reader
.next();
1308 LOG
.info(Objects
.toString(entry
));
1309 if (entry
== null) {
1312 FlushDescriptor flush
= WALEdit
.getFlushDescriptor(entry
.getEdit().getCells().get(0));
1313 if (flush
!= null) {
1314 secondaryRegion
.replayWALFlushMarker(flush
, entry
.getKey().getSequenceId());
1316 replayEdit(secondaryRegion
, entry
);
1320 // now reads should be enabled
1321 verifyData(secondaryRegion
, 0, 100, cq
, families
);
1325 * Test the case where the secondary region replica is not in reads enabled state because it is
1326 * waiting for a flush or region open marker from primary region. Replaying flush start and commit
1327 * entries should restore the reads enabled status in the region and allow the reads
1331 public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException
{
1332 // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came
1333 // from triggered flush restores readsEnabled
1334 disableReads(secondaryRegion
);
1336 // put some data in primary
1337 putData(primaryRegion
, Durability
.SYNC_WAL
, 0, 100, cq
, families
);
1338 primaryRegion
.flush(true);
1340 reader
= createWALReaderForPrimary();
1342 WAL
.Entry entry
= reader
.next();
1343 if (entry
== null) {
1346 FlushDescriptor flush
= WALEdit
.getFlushDescriptor(entry
.getEdit().getCells().get(0));
1347 if (flush
!= null) {
1348 secondaryRegion
.replayWALFlushMarker(flush
, entry
.getKey().getSequenceId());
1352 // now reads should be enabled
1353 verifyData(secondaryRegion
, 0, 100, cq
, families
);
1357 * Test the case where the secondary region replica is not in reads enabled state because it is
1358 * waiting for a flush or region open marker from primary region. Replaying region open event
1359 * entry from primary should restore the reads enabled status in the region and allow the reads
1363 public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException
{
1364 // Test case 3: Test that replaying region open event markers restores readsEnabled
1365 disableReads(secondaryRegion
);
1367 primaryRegion
.close();
1368 primaryRegion
= HRegion
.openHRegion(rootDir
, primaryHri
, htd
, walPrimary
, CONF
, rss
, null);
1370 reader
= createWALReaderForPrimary();
1372 WAL
.Entry entry
= reader
.next();
1373 if (entry
== null) {
1377 RegionEventDescriptor regionEventDesc
1378 = WALEdit
.getRegionEventDescriptor(entry
.getEdit().getCells().get(0));
1380 if (regionEventDesc
!= null) {
1381 secondaryRegion
.replayWALRegionEventMarker(regionEventDesc
);
1385 // now reads should be enabled
1386 secondaryRegion
.get(new Get(Bytes
.toBytes(0)));
1390 public void testRefresStoreFiles() throws IOException
{
1391 assertEquals(0, primaryRegion
.getStoreFileList(families
).size());
1392 assertEquals(0, secondaryRegion
.getStoreFileList(families
).size());
1394 // Test case 1: refresh with an empty region
1395 secondaryRegion
.refreshStoreFiles();
1396 assertEquals(0, secondaryRegion
.getStoreFileList(families
).size());
1399 putDataWithFlushes(primaryRegion
, 100, 100, 0);
1402 // refresh the store file list, and ensure that the files are picked up.
1403 secondaryRegion
.refreshStoreFiles();
1404 assertPathListsEqual(primaryRegion
.getStoreFileList(families
),
1405 secondaryRegion
.getStoreFileList(families
));
1406 assertEquals(families
.length
, secondaryRegion
.getStoreFileList(families
).size());
1408 LOG
.info("-- Verifying edits from secondary");
1409 verifyData(secondaryRegion
, 0, numRows
, cq
, families
);
1411 // Test case 2: 3 some more flushes
1412 putDataWithFlushes(primaryRegion
, 100, 300, 0);
1415 // refresh the store file list, and ensure that the files are picked up.
1416 secondaryRegion
.refreshStoreFiles();
1417 assertPathListsEqual(primaryRegion
.getStoreFileList(families
),
1418 secondaryRegion
.getStoreFileList(families
));
1419 assertEquals(families
.length
* 4, secondaryRegion
.getStoreFileList(families
).size());
1421 LOG
.info("-- Verifying edits from secondary");
1422 verifyData(secondaryRegion
, 0, numRows
, cq
, families
);
1424 if (FSUtils
.WINDOWS
) {
1425 // compaction cannot move files while they are open in secondary on windows. Skip remaining.
1429 // Test case 3: compact primary files
1430 primaryRegion
.compactStores();
1431 List
<HRegion
> regions
= new ArrayList
<>();
1432 regions
.add(primaryRegion
);
1433 Mockito
.doReturn(regions
).when(rss
).getRegions();
1434 CompactedHFilesDischarger cleaner
= new CompactedHFilesDischarger(100, null, rss
, false);
1436 secondaryRegion
.refreshStoreFiles();
1437 assertPathListsEqual(primaryRegion
.getStoreFileList(families
),
1438 secondaryRegion
.getStoreFileList(families
));
1439 assertEquals(families
.length
, secondaryRegion
.getStoreFileList(families
).size());
1441 LOG
.info("-- Verifying edits from secondary");
1442 verifyData(secondaryRegion
, 0, numRows
, cq
, families
);
1444 LOG
.info("-- Replaying edits in secondary");
1446 // Test case 4: replay some edits, ensure that memstore is dropped.
1447 assertTrue(secondaryRegion
.getMemStoreDataSize() == 0);
1448 putDataWithFlushes(primaryRegion
, 400, 400, 0);
1451 reader
= createWALReaderForPrimary();
1453 WAL
.Entry entry
= reader
.next();
1454 if (entry
== null) {
1457 FlushDescriptor flush
= WALEdit
.getFlushDescriptor(entry
.getEdit().getCells().get(0));
1458 if (flush
!= null) {
1459 // do not replay flush
1461 replayEdit(secondaryRegion
, entry
);
1465 assertTrue(secondaryRegion
.getMemStoreDataSize() > 0);
1467 secondaryRegion
.refreshStoreFiles();
1469 assertTrue(secondaryRegion
.getMemStoreDataSize() == 0);
1471 LOG
.info("-- Verifying edits from primary");
1472 verifyData(primaryRegion
, 0, numRows
, cq
, families
);
1473 LOG
.info("-- Verifying edits from secondary");
1474 verifyData(secondaryRegion
, 0, numRows
, cq
, families
);
1478 * Paths can be qualified or not. This does the assertion using String->Path conversion.
1480 private void assertPathListsEqual(List
<String
> list1
, List
<String
> list2
) {
1481 List
<Path
> l1
= new ArrayList
<>(list1
.size());
1482 for (String path
: list1
) {
1483 l1
.add(Path
.getPathWithoutSchemeAndAuthority(new Path(path
)));
1485 List
<Path
> l2
= new ArrayList
<>(list2
.size());
1486 for (String path
: list2
) {
1487 l2
.add(Path
.getPathWithoutSchemeAndAuthority(new Path(path
)));
1489 assertEquals(l1
, l2
);
1492 private void disableReads(HRegion region
) {
1493 region
.setReadsEnabled(false);
1495 verifyData(region
, 0, 1, cq
, families
);
1496 fail("Should have failed with IOException");
1497 } catch(IOException ex
) {
1502 private void replay(HRegion region
, Put put
, long replaySeqId
) throws IOException
{
1503 put
.setDurability(Durability
.SKIP_WAL
);
1504 MutationReplay mutation
= new MutationReplay(MutationType
.PUT
, put
, 0, 0);
1505 region
.batchReplay(new MutationReplay
[] {mutation
}, replaySeqId
);
1509 * Tests replaying region open markers from primary region. Checks whether the files are picked up
1512 public void testReplayBulkLoadEvent() throws IOException
{
1513 LOG
.info("testReplayBulkLoadEvent starts");
1514 putDataWithFlushes(primaryRegion
, 100, 0, 100); // no flush
1516 // close the region and open again.
1517 primaryRegion
.close();
1518 primaryRegion
= HRegion
.openHRegion(rootDir
, primaryHri
, htd
, walPrimary
, CONF
, rss
, null);
1520 // bulk load a file into primary region
1521 Random random
= new Random();
1522 byte[] randomValues
= new byte[20];
1523 random
.nextBytes(randomValues
);
1524 Path testPath
= TEST_UTIL
.getDataTestDirOnTestFS();
1526 List
<Pair
<byte[], String
>> familyPaths
= new ArrayList
<>();
1527 int expectedLoadFileCount
= 0;
1528 for (byte[] family
: families
) {
1529 familyPaths
.add(new Pair
<>(family
, createHFileForFamilies(testPath
, family
, randomValues
)));
1530 expectedLoadFileCount
++;
1532 primaryRegion
.bulkLoadHFiles(familyPaths
, false, null);
1534 // now replay the edits and the bulk load marker
1535 reader
= createWALReaderForPrimary();
1537 LOG
.info("-- Replaying edits and region events in secondary");
1538 BulkLoadDescriptor bulkloadEvent
= null;
1540 WAL
.Entry entry
= reader
.next();
1541 if (entry
== null) {
1544 bulkloadEvent
= WALEdit
.getBulkLoadDescriptor(entry
.getEdit().getCells().get(0));
1545 if (bulkloadEvent
!= null) {
1550 // we should have 1 bulk load event
1551 assertTrue(bulkloadEvent
!= null);
1552 assertEquals(expectedLoadFileCount
, bulkloadEvent
.getStoresCount());
1554 // replay the bulk load event
1555 secondaryRegion
.replayWALBulkLoadEventMarker(bulkloadEvent
);
1558 List
<String
> storeFileName
= new ArrayList
<>();
1559 for (StoreDescriptor storeDesc
: bulkloadEvent
.getStoresList()) {
1560 storeFileName
.addAll(storeDesc
.getStoreFileList());
1562 // assert that the bulk loaded files are picked
1563 for (HStore s
: secondaryRegion
.getStores()) {
1564 for (HStoreFile sf
: s
.getStorefiles()) {
1565 storeFileName
.remove(sf
.getPath().getName());
1568 assertTrue("Found some store file isn't loaded:" + storeFileName
, storeFileName
.isEmpty());
1570 LOG
.info("-- Verifying edits from secondary");
1571 for (byte[] family
: families
) {
1572 assertGet(secondaryRegion
, family
, randomValues
);
1577 public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException
{
1578 // tests replaying flush commit marker, but the flush file has already been compacted
1579 // from primary and also deleted from the archive directory
1580 secondaryRegion
.replayWALFlushCommitMarker(FlushDescriptor
.newBuilder().
1581 setFlushSequenceNumber(Long
.MAX_VALUE
)
1582 .setTableName(UnsafeByteOperations
.unsafeWrap(primaryRegion
.getTableDescriptor().getTableName().getName()))
1583 .setAction(FlushAction
.COMMIT_FLUSH
)
1584 .setEncodedRegionName(
1585 UnsafeByteOperations
.unsafeWrap(primaryRegion
.getRegionInfo().getEncodedNameAsBytes()))
1586 .setRegionName(UnsafeByteOperations
.unsafeWrap(
1587 primaryRegion
.getRegionInfo().getRegionName()))
1588 .addStoreFlushes(StoreFlushDescriptor
.newBuilder()
1589 .setFamilyName(UnsafeByteOperations
.unsafeWrap(families
[0]))
1590 .setStoreHomeDir("/store_home_dir")
1591 .addFlushOutput("/foo/baz/123")
1597 public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException
{
1598 // tests replaying compaction marker, but the compaction output file has already been compacted
1599 // from primary and also deleted from the archive directory
1600 secondaryRegion
.replayWALCompactionMarker(CompactionDescriptor
.newBuilder()
1601 .setTableName(UnsafeByteOperations
.unsafeWrap(
1602 primaryRegion
.getTableDescriptor().getTableName().getName()))
1603 .setEncodedRegionName(
1604 UnsafeByteOperations
.unsafeWrap(primaryRegion
.getRegionInfo().getEncodedNameAsBytes()))
1605 .setFamilyName(UnsafeByteOperations
.unsafeWrap(families
[0]))
1606 .addCompactionInput("/123")
1607 .addCompactionOutput("/456")
1608 .setStoreHomeDir("/store_home_dir")
1609 .setRegionName(UnsafeByteOperations
.unsafeWrap(primaryRegion
.getRegionInfo().getRegionName()))
1611 , true, true, Long
.MAX_VALUE
);
1615 public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException
{
1616 // tests replaying region open event marker, but the region files have already been compacted
1617 // from primary and also deleted from the archive directory
1618 secondaryRegion
.replayWALRegionEventMarker(RegionEventDescriptor
.newBuilder()
1619 .setTableName(UnsafeByteOperations
.unsafeWrap(
1620 primaryRegion
.getTableDescriptor().getTableName().getName()))
1621 .setEncodedRegionName(
1622 UnsafeByteOperations
.unsafeWrap(primaryRegion
.getRegionInfo().getEncodedNameAsBytes()))
1623 .setRegionName(UnsafeByteOperations
.unsafeWrap(primaryRegion
.getRegionInfo().getRegionName()))
1624 .setEventType(EventType
.REGION_OPEN
)
1625 .setServer(ProtobufUtil
.toServerName(ServerName
.valueOf("foo", 1, 1)))
1626 .setLogSequenceNumber(Long
.MAX_VALUE
)
1627 .addStores(StoreDescriptor
.newBuilder()
1628 .setFamilyName(UnsafeByteOperations
.unsafeWrap(families
[0]))
1629 .setStoreHomeDir("/store_home_dir")
1630 .addStoreFile("/123")
1636 public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException
{
1637 // tests replaying bulk load event marker, but the bulk load files have already been compacted
1638 // from primary and also deleted from the archive directory
1639 secondaryRegion
.replayWALBulkLoadEventMarker(BulkLoadDescriptor
.newBuilder()
1640 .setTableName(ProtobufUtil
.toProtoTableName(primaryRegion
.getTableDescriptor().getTableName()))
1641 .setEncodedRegionName(
1642 UnsafeByteOperations
.unsafeWrap(primaryRegion
.getRegionInfo().getEncodedNameAsBytes()))
1643 .setBulkloadSeqNum(Long
.MAX_VALUE
)
1644 .addStores(StoreDescriptor
.newBuilder()
1645 .setFamilyName(UnsafeByteOperations
.unsafeWrap(families
[0]))
1646 .setStoreHomeDir("/store_home_dir")
1647 .addStoreFile("/123")
1652 private String
createHFileForFamilies(Path testPath
, byte[] family
,
1653 byte[] valueBytes
) throws IOException
{
1654 HFile
.WriterFactory hFileFactory
= HFile
.getWriterFactoryNoCache(TEST_UTIL
.getConfiguration());
1655 // TODO We need a way to do this without creating files
1656 Path testFile
= new Path(testPath
, TEST_UTIL
.getRandomUUID().toString());
1657 FSDataOutputStream out
= TEST_UTIL
.getTestFileSystem().create(testFile
);
1659 hFileFactory
.withOutputStream(out
);
1660 hFileFactory
.withFileContext(new HFileContextBuilder().build());
1661 HFile
.Writer writer
= hFileFactory
.create();
1663 writer
.append(new KeyValue(ExtendedCellBuilderFactory
.create(CellBuilderType
.DEEP_COPY
)
1666 .setQualifier(valueBytes
)
1668 .setType(KeyValue
.Type
.Put
.getCode())
1669 .setValue(valueBytes
)
1677 return testFile
.toString();
1680 /** Puts a total of numRows + numRowsAfterFlush records indexed with numeric row keys. Does
1681 * a flush every flushInterval number of records. Then it puts numRowsAfterFlush number of
1682 * more rows but does not execute flush after
1683 * @throws IOException */
1684 private void putDataWithFlushes(HRegion region
, int flushInterval
,
1685 int numRows
, int numRowsAfterFlush
) throws IOException
{
1687 for (; start
< numRows
; start
+= flushInterval
) {
1688 LOG
.info("-- Writing some data to primary from " + start
+ " to " + (start
+flushInterval
));
1689 putData(region
, Durability
.SYNC_WAL
, start
, flushInterval
, cq
, families
);
1690 LOG
.info("-- Flushing primary, creating 3 files for 3 stores");
1693 LOG
.info("-- Writing some more data to primary, not flushing");
1694 putData(region
, Durability
.SYNC_WAL
, start
, numRowsAfterFlush
, cq
, families
);
1697 private void putDataByReplay(HRegion region
,
1698 int startRow
, int numRows
, byte[] qf
, byte[]... families
) throws IOException
{
1699 for (int i
= startRow
; i
< startRow
+ numRows
; i
++) {
1700 Put put
= new Put(Bytes
.toBytes("" + i
));
1701 put
.setDurability(Durability
.SKIP_WAL
);
1702 for (byte[] family
: families
) {
1703 put
.addColumn(family
, qf
, EnvironmentEdgeManager
.currentTime(), null);
1705 replay(region
, put
, i
+1);
1709 private static HRegion
initHRegion(byte[] tableName
, byte[]... families
) throws IOException
{
1710 return TEST_UTIL
.createLocalHRegion(TableName
.valueOf(tableName
), HConstants
.EMPTY_START_ROW
,
1711 HConstants
.EMPTY_END_ROW
, false, Durability
.SYNC_WAL
, null, families
);