/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.regionserver
;
20 import static org
.junit
.Assert
.assertEquals
;
21 import static org
.junit
.Assert
.assertNotNull
;
22 import static org
.junit
.Assert
.assertNull
;
23 import static org
.junit
.Assert
.assertTrue
;
25 import java
.io
.IOException
;
26 import java
.util
.Arrays
;
27 import java
.util
.List
;
28 import org
.apache
.hadoop
.conf
.Configuration
;
29 import org
.apache
.hadoop
.fs
.Path
;
30 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
31 import org
.apache
.hadoop
.hbase
.HBaseConfiguration
;
32 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
33 import org
.apache
.hadoop
.hbase
.HConstants
;
34 import org
.apache
.hadoop
.hbase
.NamespaceDescriptor
;
35 import org
.apache
.hadoop
.hbase
.SingleProcessHBaseCluster
;
36 import org
.apache
.hadoop
.hbase
.TableName
;
37 import org
.apache
.hadoop
.hbase
.Waiter
;
38 import org
.apache
.hadoop
.hbase
.client
.Admin
;
39 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
40 import org
.apache
.hadoop
.hbase
.client
.Connection
;
41 import org
.apache
.hadoop
.hbase
.client
.ConnectionFactory
;
42 import org
.apache
.hadoop
.hbase
.client
.Get
;
43 import org
.apache
.hadoop
.hbase
.client
.Put
;
44 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
45 import org
.apache
.hadoop
.hbase
.client
.RegionInfoBuilder
;
46 import org
.apache
.hadoop
.hbase
.client
.Result
;
47 import org
.apache
.hadoop
.hbase
.client
.Table
;
48 import org
.apache
.hadoop
.hbase
.client
.TableDescriptor
;
49 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
50 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
51 import org
.apache
.hadoop
.hbase
.testclassification
.RegionServerTests
;
52 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
53 import org
.apache
.hadoop
.hbase
.util
.JVMClusterUtil
;
54 import org
.apache
.hadoop
.hbase
.util
.Pair
;
55 import org
.apache
.hadoop
.hbase
.wal
.AbstractFSWALProvider
;
56 import org
.apache
.hadoop
.hbase
.wal
.WAL
;
57 import org
.junit
.ClassRule
;
58 import org
.junit
.Test
;
59 import org
.junit
.experimental
.categories
.Category
;
60 import org
.slf4j
.Logger
;
61 import org
.slf4j
.LoggerFactory
;
62 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.hash
.Hashing
;
/**
 * This test verifies the correctness of the Per Column Family flushing strategy.
 */
67 @Category({ RegionServerTests
.class, LargeTests
.class })
68 public class TestPerColumnFamilyFlush
{
71 public static final HBaseClassTestRule CLASS_RULE
=
72 HBaseClassTestRule
.forClass(TestPerColumnFamilyFlush
.class);
74 private static final Logger LOG
= LoggerFactory
.getLogger(TestPerColumnFamilyFlush
.class);
76 private static final HBaseTestingUtil TEST_UTIL
= new HBaseTestingUtil();
78 private static final Path DIR
= TEST_UTIL
.getDataTestDir("TestHRegion");
80 public static final TableName TABLENAME
= TableName
.valueOf("TestPerColumnFamilyFlush", "t1");
82 public static final byte[][] FAMILIES
= { Bytes
.toBytes("f1"), Bytes
.toBytes("f2"),
83 Bytes
.toBytes("f3"), Bytes
.toBytes("f4"), Bytes
.toBytes("f5") };
85 public static final byte[] FAMILY1
= FAMILIES
[0];
87 public static final byte[] FAMILY2
= FAMILIES
[1];
89 public static final byte[] FAMILY3
= FAMILIES
[2];
91 private HRegion
initHRegion(String callingMethod
, Configuration conf
) throws IOException
{
92 TableDescriptorBuilder builder
= TableDescriptorBuilder
.newBuilder(TABLENAME
);
93 for (byte[] family
: FAMILIES
) {
94 builder
.setColumnFamily(ColumnFamilyDescriptorBuilder
.of(family
));
96 RegionInfo info
= RegionInfoBuilder
.newBuilder(TABLENAME
).build();
97 Path path
= new Path(DIR
, callingMethod
);
98 return HBaseTestingUtil
.createRegionAndWAL(info
, path
, conf
, builder
.build());
101 // A helper function to create puts.
102 private Put
createPut(int familyNum
, int putNum
) {
103 byte[] qf
= Bytes
.toBytes("q" + familyNum
);
104 byte[] row
= Bytes
.toBytes("row" + familyNum
+ "-" + putNum
);
105 byte[] val
= Bytes
.toBytes("val" + familyNum
+ "-" + putNum
);
106 Put p
= new Put(row
);
107 p
.addColumn(FAMILIES
[familyNum
- 1], qf
, val
);
111 // A helper function to create puts.
112 private Get
createGet(int familyNum
, int putNum
) {
113 byte[] row
= Bytes
.toBytes("row" + familyNum
+ "-" + putNum
);
117 // A helper function to verify edits.
118 void verifyEdit(int familyNum
, int putNum
, Table table
) throws IOException
{
119 Result r
= table
.get(createGet(familyNum
, putNum
));
120 byte[] family
= FAMILIES
[familyNum
- 1];
121 byte[] qf
= Bytes
.toBytes("q" + familyNum
);
122 byte[] val
= Bytes
.toBytes("val" + familyNum
+ "-" + putNum
);
123 assertNotNull(("Missing Put#" + putNum
+ " for CF# " + familyNum
), r
.getFamilyMap(family
));
124 assertNotNull(("Missing Put#" + putNum
+ " for CF# " + familyNum
),
125 r
.getFamilyMap(family
).get(qf
));
126 assertTrue(("Incorrect value for Put#" + putNum
+ " for CF# " + familyNum
),
127 Arrays
.equals(r
.getFamilyMap(family
).get(qf
), val
));
131 public void testSelectiveFlushWhenEnabled() throws IOException
{
132 // Set up the configuration, use new one to not conflict with minicluster in other tests
133 Configuration conf
= new HBaseTestingUtil().getConfiguration();
134 conf
.setLong(HConstants
.HREGION_MEMSTORE_FLUSH_SIZE
, 200 * 1024);
135 conf
.set(FlushPolicyFactory
.HBASE_FLUSH_POLICY_KEY
, FlushAllLargeStoresPolicy
.class.getName());
136 conf
.setLong(FlushLargeStoresPolicy
.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN
,
138 // Intialize the region
139 HRegion region
= initHRegion("testSelectiveFlushWithDataCompaction", conf
);
140 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
141 for (int i
= 1; i
<= 1200; i
++) {
142 region
.put(createPut(1, i
));
145 region
.put(createPut(2, i
));
147 region
.put(createPut(3, i
));
152 long totalMemstoreSize
= region
.getMemStoreDataSize();
154 // Find the smallest LSNs for edits wrt to each CF.
155 long smallestSeqCF1
= region
.getOldestSeqIdOfStore(FAMILY1
);
156 long smallestSeqCF2
= region
.getOldestSeqIdOfStore(FAMILY2
);
157 long smallestSeqCF3
= region
.getOldestSeqIdOfStore(FAMILY3
);
159 // Find the sizes of the memstores of each CF.
160 MemStoreSize cf1MemstoreSize
= region
.getStore(FAMILY1
).getMemStoreSize();
161 MemStoreSize cf2MemstoreSize
= region
.getStore(FAMILY2
).getMemStoreSize();
162 MemStoreSize cf3MemstoreSize
= region
.getStore(FAMILY3
).getMemStoreSize();
164 // Get the overall smallest LSN in the region's memstores.
165 long smallestSeqInRegionCurrentMemstore
= getWAL(region
)
166 .getEarliestMemStoreSeqNum(region
.getRegionInfo().getEncodedNameAsBytes());
168 // The overall smallest LSN in the region's memstores should be the same as
169 // the LSN of the smallest edit in CF1
170 assertEquals(smallestSeqCF1
, smallestSeqInRegionCurrentMemstore
);
172 // Some other sanity checks.
173 assertTrue(smallestSeqCF1
< smallestSeqCF2
);
174 assertTrue(smallestSeqCF2
< smallestSeqCF3
);
175 assertTrue(cf1MemstoreSize
.getDataSize() > 0);
176 assertTrue(cf2MemstoreSize
.getDataSize() > 0);
177 assertTrue(cf3MemstoreSize
.getDataSize() > 0);
179 // The total memstore size should be the same as the sum of the sizes of
180 // memstores of CF1, CF2 and CF3.
181 assertEquals(totalMemstoreSize
, cf1MemstoreSize
.getDataSize() + cf2MemstoreSize
.getDataSize()
182 + cf3MemstoreSize
.getDataSize());
187 // Will use these to check if anything changed.
188 MemStoreSize oldCF2MemstoreSize
= cf2MemstoreSize
;
189 MemStoreSize oldCF3MemstoreSize
= cf3MemstoreSize
;
191 // Recalculate everything
192 cf1MemstoreSize
= region
.getStore(FAMILY1
).getMemStoreSize();
193 cf2MemstoreSize
= region
.getStore(FAMILY2
).getMemStoreSize();
194 cf3MemstoreSize
= region
.getStore(FAMILY3
).getMemStoreSize();
195 totalMemstoreSize
= region
.getMemStoreDataSize();
196 smallestSeqInRegionCurrentMemstore
= getWAL(region
)
197 .getEarliestMemStoreSeqNum(region
.getRegionInfo().getEncodedNameAsBytes());
199 // We should have cleared out only CF1, since we chose the flush thresholds
200 // and number of puts accordingly.
201 assertEquals(0, cf1MemstoreSize
.getDataSize());
202 assertEquals(MutableSegment
.DEEP_OVERHEAD
, cf1MemstoreSize
.getHeapSize());
203 // Nothing should have happened to CF2, ...
204 assertEquals(cf2MemstoreSize
, oldCF2MemstoreSize
);
206 assertEquals(cf3MemstoreSize
, oldCF3MemstoreSize
);
207 // Now the smallest LSN in the region should be the same as the smallest
208 // LSN in the memstore of CF2.
209 assertEquals(smallestSeqInRegionCurrentMemstore
, smallestSeqCF2
);
210 // Of course, this should hold too.
211 assertEquals(totalMemstoreSize
, cf2MemstoreSize
.getDataSize() + cf3MemstoreSize
.getDataSize());
213 // Now add more puts (mostly for CF2), so that we only flush CF2 this time.
214 for (int i
= 1200; i
< 2400; i
++) {
215 region
.put(createPut(2, i
));
217 // Add only 100 puts for CF3
218 if (i
- 1200 < 100) {
219 region
.put(createPut(3, i
));
223 // How much does the CF3 memstore occupy? Will be used later.
224 oldCF3MemstoreSize
= region
.getStore(FAMILY3
).getMemStoreSize();
229 // Recalculate everything
230 cf1MemstoreSize
= region
.getStore(FAMILY1
).getMemStoreSize();
231 cf2MemstoreSize
= region
.getStore(FAMILY2
).getMemStoreSize();
232 cf3MemstoreSize
= region
.getStore(FAMILY3
).getMemStoreSize();
233 totalMemstoreSize
= region
.getMemStoreDataSize();
234 smallestSeqInRegionCurrentMemstore
= getWAL(region
)
235 .getEarliestMemStoreSeqNum(region
.getRegionInfo().getEncodedNameAsBytes());
237 // CF1 and CF2, both should be absent.
238 assertEquals(0, cf1MemstoreSize
.getDataSize());
239 assertEquals(MutableSegment
.DEEP_OVERHEAD
, cf1MemstoreSize
.getHeapSize());
240 assertEquals(0, cf2MemstoreSize
.getDataSize());
241 assertEquals(MutableSegment
.DEEP_OVERHEAD
, cf2MemstoreSize
.getHeapSize());
242 // CF3 shouldn't have been touched.
243 assertEquals(cf3MemstoreSize
, oldCF3MemstoreSize
);
244 assertEquals(totalMemstoreSize
, cf3MemstoreSize
.getDataSize());
246 // What happens when we hit the memstore limit, but we are not able to find
247 // any Column Family above the threshold?
248 // In that case, we should flush all the CFs.
250 // Clearing the existing memstores.
253 // The memstore limit is 200*1024 and the column family flush threshold is
254 // around 50*1024. We try to just hit the memstore limit with each CF's
255 // memstore being below the CF flush threshold.
256 for (int i
= 1; i
<= 300; i
++) {
257 region
.put(createPut(1, i
));
258 region
.put(createPut(2, i
));
259 region
.put(createPut(3, i
));
260 region
.put(createPut(4, i
));
261 region
.put(createPut(5, i
));
266 // Since we won't find any CF above the threshold, and hence no specific
267 // store to flush, we should flush all the memstores.
268 assertEquals(0, region
.getMemStoreDataSize());
269 HBaseTestingUtil
.closeRegionAndWAL(region
);
273 public void testSelectiveFlushWhenNotEnabled() throws IOException
{
274 // Set up the configuration, use new one to not conflict with minicluster in other tests
275 Configuration conf
= new HBaseTestingUtil().getConfiguration();
276 conf
.setLong(HConstants
.HREGION_MEMSTORE_FLUSH_SIZE
, 200 * 1024);
277 conf
.set(FlushPolicyFactory
.HBASE_FLUSH_POLICY_KEY
, FlushAllStoresPolicy
.class.getName());
279 // Intialize the HRegion
280 HRegion region
= initHRegion("testSelectiveFlushWhenNotEnabled", conf
);
281 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
282 for (int i
= 1; i
<= 1200; i
++) {
283 region
.put(createPut(1, i
));
285 region
.put(createPut(2, i
));
287 region
.put(createPut(3, i
));
292 long totalMemstoreSize
= region
.getMemStoreDataSize();
294 // Find the sizes of the memstores of each CF.
295 MemStoreSize cf1MemstoreSize
= region
.getStore(FAMILY1
).getMemStoreSize();
296 MemStoreSize cf2MemstoreSize
= region
.getStore(FAMILY2
).getMemStoreSize();
297 MemStoreSize cf3MemstoreSize
= region
.getStore(FAMILY3
).getMemStoreSize();
299 // Some other sanity checks.
300 assertTrue(cf1MemstoreSize
.getDataSize() > 0);
301 assertTrue(cf2MemstoreSize
.getDataSize() > 0);
302 assertTrue(cf3MemstoreSize
.getDataSize() > 0);
304 // The total memstore size should be the same as the sum of the sizes of
305 // memstores of CF1, CF2 and CF3.
306 assertEquals(totalMemstoreSize
, cf1MemstoreSize
.getDataSize() + cf2MemstoreSize
.getDataSize()
307 + cf3MemstoreSize
.getDataSize());
312 cf1MemstoreSize
= region
.getStore(FAMILY1
).getMemStoreSize();
313 cf2MemstoreSize
= region
.getStore(FAMILY2
).getMemStoreSize();
314 cf3MemstoreSize
= region
.getStore(FAMILY3
).getMemStoreSize();
315 totalMemstoreSize
= region
.getMemStoreDataSize();
316 long smallestSeqInRegionCurrentMemstore
=
317 region
.getWAL().getEarliestMemStoreSeqNum(region
.getRegionInfo().getEncodedNameAsBytes());
319 // Everything should have been cleared
320 assertEquals(0, cf1MemstoreSize
.getDataSize());
321 assertEquals(MutableSegment
.DEEP_OVERHEAD
, cf1MemstoreSize
.getHeapSize());
322 assertEquals(0, cf2MemstoreSize
.getDataSize());
323 assertEquals(MutableSegment
.DEEP_OVERHEAD
, cf2MemstoreSize
.getHeapSize());
324 assertEquals(0, cf3MemstoreSize
.getDataSize());
325 assertEquals(MutableSegment
.DEEP_OVERHEAD
, cf3MemstoreSize
.getHeapSize());
326 assertEquals(0, totalMemstoreSize
);
327 assertEquals(HConstants
.NO_SEQNUM
, smallestSeqInRegionCurrentMemstore
);
328 HBaseTestingUtil
.closeRegionAndWAL(region
);
331 // Find the (first) region which has the specified name.
332 private static Pair
<HRegion
, HRegionServer
> getRegionWithName(TableName tableName
) {
333 SingleProcessHBaseCluster cluster
= TEST_UTIL
.getMiniHBaseCluster();
334 List
<JVMClusterUtil
.RegionServerThread
> rsts
= cluster
.getRegionServerThreads();
335 for (int i
= 0; i
< cluster
.getRegionServerThreads().size(); i
++) {
336 HRegionServer hrs
= rsts
.get(i
).getRegionServer();
337 for (HRegion region
: hrs
.getRegions(tableName
)) {
338 return Pair
.newPair(region
, hrs
);
344 private void doTestLogReplay() throws Exception
{
345 Configuration conf
= TEST_UTIL
.getConfiguration();
346 conf
.setLong(HConstants
.HREGION_MEMSTORE_FLUSH_SIZE
, 10000);
347 // Carefully chosen limits so that the memstore just flushes when we're done
348 conf
.set(FlushPolicyFactory
.HBASE_FLUSH_POLICY_KEY
, FlushAllLargeStoresPolicy
.class.getName());
349 conf
.setLong(FlushLargeStoresPolicy
.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN
, 2500);
350 final int numRegionServers
= 4;
352 TEST_UTIL
.startMiniCluster(numRegionServers
);
353 TEST_UTIL
.getAdmin().createNamespace(
354 NamespaceDescriptor
.create(TABLENAME
.getNamespaceAsString()).build());
355 Table table
= TEST_UTIL
.createTable(TABLENAME
, FAMILIES
);
357 // Add 100 edits for CF1, 20 for CF2, 20 for CF3.
358 // These will all be interleaved in the log.
359 for (int i
= 1; i
<= 80; i
++) {
360 table
.put(createPut(1, i
));
362 table
.put(createPut(2, i
));
363 table
.put(createPut(3, i
));
368 Pair
<HRegion
, HRegionServer
> desiredRegionAndServer
= getRegionWithName(TABLENAME
);
369 HRegion desiredRegion
= desiredRegionAndServer
.getFirst();
370 assertTrue("Could not find a region which hosts the new region.", desiredRegion
!= null);
372 // Flush the region selectively.
373 desiredRegion
.flush(false);
375 long totalMemstoreSize
;
376 long cf1MemstoreSize
, cf2MemstoreSize
, cf3MemstoreSize
;
377 totalMemstoreSize
= desiredRegion
.getMemStoreDataSize();
379 // Find the sizes of the memstores of each CF.
380 cf1MemstoreSize
= desiredRegion
.getStore(FAMILY1
).getMemStoreSize().getDataSize();
381 cf2MemstoreSize
= desiredRegion
.getStore(FAMILY2
).getMemStoreSize().getDataSize();
382 cf3MemstoreSize
= desiredRegion
.getStore(FAMILY3
).getMemStoreSize().getDataSize();
384 // CF1 Should have been flushed
385 assertEquals(0, cf1MemstoreSize
);
386 // CF2 and CF3 shouldn't have been flushed.
387 // TODO: This test doesn't allow for this case:
388 // " Since none of the CFs were above the size, flushing all."
389 // i.e. a flush happens before we get to here and its a flush-all.
390 assertTrue(cf2MemstoreSize
>= 0);
391 assertTrue(cf3MemstoreSize
>= 0);
392 assertEquals(totalMemstoreSize
, cf2MemstoreSize
+ cf3MemstoreSize
);
394 // Wait for the RS report to go across to the master, so that the master
395 // is aware of which sequence ids have been flushed, before we kill the RS.
396 // If in production, the RS dies before the report goes across, we will
397 // safely replay all the edits.
400 // Abort the region server where we have the region hosted.
401 HRegionServer rs
= desiredRegionAndServer
.getSecond();
404 // The aborted region server's regions will be eventually assigned to some
405 // other region server, and the get RPC call (inside verifyEdit()) will
406 // retry for some time till the regions come back up.
408 // Verify that all the edits are safe.
409 for (int i
= 1; i
<= 80; i
++) {
410 verifyEdit(1, i
, table
);
412 verifyEdit(2, i
, table
);
413 verifyEdit(3, i
, table
);
417 TEST_UTIL
.shutdownMiniCluster();
421 // Test Log Replay with Distributed log split on.
423 public void testLogReplayWithDistributedLogSplit() throws Exception
{
427 private WAL
getWAL(Region region
) {
428 return ((HRegion
)region
).getWAL();
431 private int getNumRolledLogFiles(Region region
) {
432 return AbstractFSWALProvider
.getNumRolledLogFiles(getWAL(region
));
436 * When a log roll is about to happen, we do a flush of the regions who will be affected by the
437 * log roll. These flushes cannot be a selective flushes, otherwise we cannot roll the logs. This
438 * test ensures that we do a full-flush in that scenario.
441 public void testFlushingWhenLogRolling() throws Exception
{
442 TableName tableName
= TableName
.valueOf("testFlushingWhenLogRolling");
443 Configuration conf
= TEST_UTIL
.getConfiguration();
444 conf
.setLong(HConstants
.HREGION_MEMSTORE_FLUSH_SIZE
, 128 * 1024 * 1024);
445 conf
.set(FlushPolicyFactory
.HBASE_FLUSH_POLICY_KEY
, FlushAllLargeStoresPolicy
.class.getName());
446 long cfFlushSizeLowerBound
= 2048;
447 conf
.setLong(FlushLargeStoresPolicy
.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN
,
448 cfFlushSizeLowerBound
);
450 // One hour, prevent periodic rolling
451 conf
.setLong("hbase.regionserver.logroll.period", 60L * 60 * 1000);
452 // prevent rolling by size
453 conf
.setLong("hbase.regionserver.hlog.blocksize", 128L * 1024 * 1024);
454 // Make it 10 as max logs before a flush comes on.
455 final int maxLogs
= 10;
456 conf
.setInt("hbase.regionserver.maxlogs", maxLogs
);
458 final int numRegionServers
= 1;
459 TEST_UTIL
.startMiniCluster(numRegionServers
);
461 Table table
= TEST_UTIL
.createTable(tableName
, FAMILIES
);
462 Pair
<HRegion
, HRegionServer
> desiredRegionAndServer
= getRegionWithName(tableName
);
463 final HRegion desiredRegion
= desiredRegionAndServer
.getFirst();
464 assertTrue("Could not find a region which hosts the new region.", desiredRegion
!= null);
465 LOG
.info("Writing to region=" + desiredRegion
);
467 // Add one row for both CFs.
468 for (int i
= 1; i
<= 3; i
++) {
469 table
.put(createPut(i
, 0));
471 // Now only add row to CF1, make sure when we force a flush, CF1 is larger than the lower
472 // bound and CF2 and CF3 are smaller than the lower bound.
473 for (int i
= 0; i
< maxLogs
; i
++) {
474 for (int j
= 0; j
< 100; j
++) {
475 table
.put(createPut(1, i
* 100 + j
));
477 // Roll the WAL. The log file count is less than maxLogs so no flush is triggered.
478 int currentNumRolledLogFiles
= getNumRolledLogFiles(desiredRegion
);
479 assertNull(getWAL(desiredRegion
).rollWriter());
480 while (getNumRolledLogFiles(desiredRegion
) <= currentNumRolledLogFiles
) {
484 assertEquals(maxLogs
, getNumRolledLogFiles(desiredRegion
));
485 assertTrue(desiredRegion
.getStore(FAMILY1
).getMemStoreSize().getHeapSize() > cfFlushSizeLowerBound
);
486 assertTrue(desiredRegion
.getStore(FAMILY2
).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound
);
487 assertTrue(desiredRegion
.getStore(FAMILY3
).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound
);
488 table
.put(createPut(1, 12345678));
489 // Make numRolledLogFiles greater than maxLogs
490 desiredRegionAndServer
.getSecond().getWalRoller().requestRollAll();
491 // Wait for some time till the flush caused by log rolling happens.
492 TEST_UTIL
.waitFor(30000, new Waiter
.ExplainingPredicate
<Exception
>() {
495 public boolean evaluate() throws Exception
{
496 return desiredRegion
.getMemStoreDataSize() == 0;
500 public String
explainFailure() throws Exception
{
501 long memstoreSize
= desiredRegion
.getMemStoreDataSize();
502 if (memstoreSize
> 0) {
503 return "Still have unflushed entries in memstore, memstore size is " + memstoreSize
;
508 LOG
.info("Finished waiting on flush after too many WALs...");
509 // Individual families should have been flushed.
510 assertEquals(MutableSegment
.DEEP_OVERHEAD
,
511 desiredRegion
.getStore(FAMILY1
).getMemStoreSize().getHeapSize());
512 assertEquals(MutableSegment
.DEEP_OVERHEAD
,
513 desiredRegion
.getStore(FAMILY2
).getMemStoreSize().getHeapSize());
514 assertEquals(MutableSegment
.DEEP_OVERHEAD
,
515 desiredRegion
.getStore(FAMILY3
).getMemStoreSize().getHeapSize());
516 // let WAL cleanOldLogs
517 assertNull(getWAL(desiredRegion
).rollWriter(true));
518 assertTrue(getNumRolledLogFiles(desiredRegion
) < maxLogs
);
520 TEST_UTIL
.shutdownMiniCluster();
524 private void doPut(Table table
, long memstoreFlushSize
) throws IOException
, InterruptedException
{
525 Region region
= getRegionWithName(table
.getName()).getFirst();
526 // cf1 4B per row, cf2 40B per row and cf3 400B per row
527 byte[] qf
= Bytes
.toBytes("qf");
528 for (int i
= 0; i
< 10000; i
++) {
529 Put put
= new Put(Bytes
.toBytes("row-" + i
));
530 byte[] value1
= new byte[100];
531 Bytes
.random(value1
);
532 put
.addColumn(FAMILY1
, qf
, value1
);
533 byte[] value2
= new byte[200];
534 Bytes
.random(value2
);
535 put
.addColumn(FAMILY2
, qf
, value2
);
536 byte[] value3
= new byte[400];
537 Bytes
.random(value3
);
538 put
.addColumn(FAMILY3
, qf
, value3
);
540 // slow down to let regionserver flush region.
541 while (region
.getMemStoreHeapSize() > memstoreFlushSize
) {
547 // Under the same write load, small stores should have less store files when
548 // percolumnfamilyflush enabled.
550 public void testCompareStoreFileCount() throws Exception
{
551 long memstoreFlushSize
= 1024L * 1024;
552 Configuration conf
= TEST_UTIL
.getConfiguration();
553 conf
.setLong(HConstants
.HREGION_MEMSTORE_FLUSH_SIZE
, memstoreFlushSize
);
554 conf
.set(FlushPolicyFactory
.HBASE_FLUSH_POLICY_KEY
, FlushAllStoresPolicy
.class.getName());
555 conf
.setInt(HStore
.BLOCKING_STOREFILES_KEY
, 10000);
556 conf
.set(HConstants
.HBASE_REGION_SPLIT_POLICY_KEY
,
557 ConstantSizeRegionSplitPolicy
.class.getName());
559 TableDescriptor tableDescriptor
= TableDescriptorBuilder
.newBuilder(TABLENAME
)
560 .setCompactionEnabled(false).setColumnFamily(ColumnFamilyDescriptorBuilder
.of(FAMILY1
))
561 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(FAMILY2
))
562 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(FAMILY3
)).build();
564 LOG
.info("==============Test with selective flush disabled===============");
565 int cf1StoreFileCount
= -1;
566 int cf2StoreFileCount
= -1;
567 int cf3StoreFileCount
= -1;
568 int cf1StoreFileCount1
= -1;
569 int cf2StoreFileCount1
= -1;
570 int cf3StoreFileCount1
= -1;
572 TEST_UTIL
.startMiniCluster(1);
573 TEST_UTIL
.getAdmin().createNamespace(
574 NamespaceDescriptor
.create(TABLENAME
.getNamespaceAsString()).build());
575 TEST_UTIL
.getAdmin().createTable(tableDescriptor
);
576 TEST_UTIL
.waitTableAvailable(TABLENAME
);
577 Connection conn
= ConnectionFactory
.createConnection(conf
);
578 Table table
= conn
.getTable(TABLENAME
);
579 doPut(table
, memstoreFlushSize
);
583 Region region
= getRegionWithName(TABLENAME
).getFirst();
584 cf1StoreFileCount
= region
.getStore(FAMILY1
).getStorefilesCount();
585 cf2StoreFileCount
= region
.getStore(FAMILY2
).getStorefilesCount();
586 cf3StoreFileCount
= region
.getStore(FAMILY3
).getStorefilesCount();
588 TEST_UTIL
.shutdownMiniCluster();
591 LOG
.info("==============Test with selective flush enabled===============");
592 conf
.set(FlushPolicyFactory
.HBASE_FLUSH_POLICY_KEY
, FlushAllLargeStoresPolicy
.class.getName());
593 // default value of per-cf flush lower bound is too big, set to a small enough value
594 conf
.setLong(FlushLargeStoresPolicy
.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN
, 0);
596 TEST_UTIL
.startMiniCluster(1);
597 TEST_UTIL
.getAdmin().createNamespace(
598 NamespaceDescriptor
.create(TABLENAME
.getNamespaceAsString()).build());
599 TEST_UTIL
.getAdmin().createTable(tableDescriptor
);
600 Connection conn
= ConnectionFactory
.createConnection(conf
);
601 Table table
= conn
.getTable(TABLENAME
);
602 doPut(table
, memstoreFlushSize
);
606 Region region
= getRegionWithName(TABLENAME
).getFirst();
607 cf1StoreFileCount1
= region
.getStore(FAMILY1
).getStorefilesCount();
608 cf2StoreFileCount1
= region
.getStore(FAMILY2
).getStorefilesCount();
609 cf3StoreFileCount1
= region
.getStore(FAMILY3
).getStorefilesCount();
611 TEST_UTIL
.shutdownMiniCluster();
614 LOG
.info("disable selective flush: " + Bytes
.toString(FAMILY1
) + "=>" + cf1StoreFileCount
615 + ", " + Bytes
.toString(FAMILY2
) + "=>" + cf2StoreFileCount
+ ", "
616 + Bytes
.toString(FAMILY3
) + "=>" + cf3StoreFileCount
);
617 LOG
.info("enable selective flush: " + Bytes
.toString(FAMILY1
) + "=>" + cf1StoreFileCount1
618 + ", " + Bytes
.toString(FAMILY2
) + "=>" + cf2StoreFileCount1
+ ", "
619 + Bytes
.toString(FAMILY3
) + "=>" + cf3StoreFileCount1
);
620 // small CF will have less store files.
621 assertTrue(cf1StoreFileCount1
< cf1StoreFileCount
);
622 assertTrue(cf2StoreFileCount1
< cf2StoreFileCount
);
625 public static void main(String
[] args
) throws Exception
{
626 int numRegions
= Integer
.parseInt(args
[0]);
627 long numRows
= Long
.parseLong(args
[1]);
629 TableDescriptor tableDescriptor
= TableDescriptorBuilder
.newBuilder(TABLENAME
)
630 .setMaxFileSize(10L * 1024 * 1024 * 1024)
631 .setValue(TableDescriptorBuilder
.SPLIT_POLICY
, ConstantSizeRegionSplitPolicy
.class.getName())
632 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(FAMILY1
))
633 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(FAMILY2
))
634 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(FAMILY3
)).build();
636 Configuration conf
= HBaseConfiguration
.create();
637 Connection conn
= ConnectionFactory
.createConnection(conf
);
638 Admin admin
= conn
.getAdmin();
639 if (admin
.tableExists(TABLENAME
)) {
640 admin
.disableTable(TABLENAME
);
641 admin
.deleteTable(TABLENAME
);
643 if (numRegions
>= 3) {
644 byte[] startKey
= new byte[16];
645 byte[] endKey
= new byte[16];
646 Arrays
.fill(endKey
, (byte) 0xFF);
647 admin
.createTable(tableDescriptor
, startKey
, endKey
, numRegions
);
649 admin
.createTable(tableDescriptor
);
653 Table table
= conn
.getTable(TABLENAME
);
654 byte[] qf
= Bytes
.toBytes("qf");
655 byte[] value1
= new byte[16];
656 byte[] value2
= new byte[256];
657 byte[] value3
= new byte[4096];
658 for (long i
= 0; i
< numRows
; i
++) {
659 Put put
= new Put(Hashing
.md5().hashLong(i
).asBytes());
660 Bytes
.random(value1
);
661 Bytes
.random(value2
);
662 Bytes
.random(value3
);
663 put
.addColumn(FAMILY1
, qf
, value1
);
664 put
.addColumn(FAMILY2
, qf
, value2
);
665 put
.addColumn(FAMILY3
, qf
, value3
);
667 if (i
% 10000 == 0) {
668 LOG
.info(i
+ " rows put");