/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.regionserver
;
20 import static org
.junit
.Assert
.assertEquals
;
21 import static org
.junit
.Assert
.assertNotNull
;
22 import static org
.junit
.Assert
.assertNull
;
23 import static org
.junit
.Assert
.assertTrue
;
25 import java
.io
.IOException
;
26 import java
.util
.Arrays
;
27 import java
.util
.List
;
28 import java
.util
.Random
;
29 import org
.apache
.hadoop
.conf
.Configuration
;
30 import org
.apache
.hadoop
.fs
.Path
;
31 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
32 import org
.apache
.hadoop
.hbase
.HBaseConfiguration
;
33 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
34 import org
.apache
.hadoop
.hbase
.HConstants
;
35 import org
.apache
.hadoop
.hbase
.HRegionInfo
;
36 import org
.apache
.hadoop
.hbase
.HTableDescriptor
;
37 import org
.apache
.hadoop
.hbase
.MiniHBaseCluster
;
38 import org
.apache
.hadoop
.hbase
.NamespaceDescriptor
;
39 import org
.apache
.hadoop
.hbase
.TableName
;
40 import org
.apache
.hadoop
.hbase
.Waiter
;
41 import org
.apache
.hadoop
.hbase
.client
.Admin
;
42 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
43 import org
.apache
.hadoop
.hbase
.client
.Connection
;
44 import org
.apache
.hadoop
.hbase
.client
.ConnectionFactory
;
45 import org
.apache
.hadoop
.hbase
.client
.Get
;
46 import org
.apache
.hadoop
.hbase
.client
.Put
;
47 import org
.apache
.hadoop
.hbase
.client
.Result
;
48 import org
.apache
.hadoop
.hbase
.client
.Table
;
49 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
50 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
51 import org
.apache
.hadoop
.hbase
.testclassification
.RegionServerTests
;
52 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
53 import org
.apache
.hadoop
.hbase
.util
.JVMClusterUtil
;
54 import org
.apache
.hadoop
.hbase
.util
.Pair
;
55 import org
.apache
.hadoop
.hbase
.wal
.AbstractFSWALProvider
;
56 import org
.apache
.hadoop
.hbase
.wal
.WAL
;
57 import org
.junit
.ClassRule
;
58 import org
.junit
.Test
;
59 import org
.junit
.experimental
.categories
.Category
;
60 import org
.slf4j
.Logger
;
61 import org
.slf4j
.LoggerFactory
;
63 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.hash
.Hashing
;
/**
 * This test verifies the correctness of the per-column-family flushing strategy.
 */
68 @Category({ RegionServerTests
.class, LargeTests
.class })
69 public class TestPerColumnFamilyFlush
{
72 public static final HBaseClassTestRule CLASS_RULE
=
73 HBaseClassTestRule
.forClass(TestPerColumnFamilyFlush
.class);
75 private static final Logger LOG
= LoggerFactory
.getLogger(TestPerColumnFamilyFlush
.class);
77 private static final HBaseTestingUtility TEST_UTIL
= new HBaseTestingUtility();
79 private static final Path DIR
= TEST_UTIL
.getDataTestDir("TestHRegion");
81 public static final TableName TABLENAME
= TableName
.valueOf("TestPerColumnFamilyFlush", "t1");
83 public static final byte[][] FAMILIES
= { Bytes
.toBytes("f1"), Bytes
.toBytes("f2"),
84 Bytes
.toBytes("f3"), Bytes
.toBytes("f4"), Bytes
.toBytes("f5") };
86 public static final byte[] FAMILY1
= FAMILIES
[0];
88 public static final byte[] FAMILY2
= FAMILIES
[1];
90 public static final byte[] FAMILY3
= FAMILIES
[2];
92 private HRegion
initHRegion(String callingMethod
, Configuration conf
) throws IOException
{
93 TableDescriptorBuilder
.ModifyableTableDescriptor tableDescriptor
=
94 new TableDescriptorBuilder
.ModifyableTableDescriptor(TABLENAME
);
95 for (byte[] family
: FAMILIES
) {
96 tableDescriptor
.setColumnFamily(
97 new ColumnFamilyDescriptorBuilder
.ModifyableColumnFamilyDescriptor(family
));
99 HRegionInfo info
= new HRegionInfo(TABLENAME
, null, null, false);
100 Path path
= new Path(DIR
, callingMethod
);
101 return HBaseTestingUtility
.createRegionAndWAL(info
, path
, conf
, tableDescriptor
);
104 // A helper function to create puts.
105 private Put
createPut(int familyNum
, int putNum
) {
106 byte[] qf
= Bytes
.toBytes("q" + familyNum
);
107 byte[] row
= Bytes
.toBytes("row" + familyNum
+ "-" + putNum
);
108 byte[] val
= Bytes
.toBytes("val" + familyNum
+ "-" + putNum
);
109 Put p
= new Put(row
);
110 p
.addColumn(FAMILIES
[familyNum
- 1], qf
, val
);
114 // A helper function to create puts.
115 private Get
createGet(int familyNum
, int putNum
) {
116 byte[] row
= Bytes
.toBytes("row" + familyNum
+ "-" + putNum
);
120 // A helper function to verify edits.
121 void verifyEdit(int familyNum
, int putNum
, Table table
) throws IOException
{
122 Result r
= table
.get(createGet(familyNum
, putNum
));
123 byte[] family
= FAMILIES
[familyNum
- 1];
124 byte[] qf
= Bytes
.toBytes("q" + familyNum
);
125 byte[] val
= Bytes
.toBytes("val" + familyNum
+ "-" + putNum
);
126 assertNotNull(("Missing Put#" + putNum
+ " for CF# " + familyNum
), r
.getFamilyMap(family
));
127 assertNotNull(("Missing Put#" + putNum
+ " for CF# " + familyNum
),
128 r
.getFamilyMap(family
).get(qf
));
129 assertTrue(("Incorrect value for Put#" + putNum
+ " for CF# " + familyNum
),
130 Arrays
.equals(r
.getFamilyMap(family
).get(qf
), val
));
// Verifies selective (per-column-family) flushing on a standalone HRegion configured with
// FlushAllLargeStoresPolicy. A flush should empty only the large stores, after which the
// region's earliest memstore sequence id should be that of the oldest remaining store. When no
// store is above the per-family threshold, every store should be flushed instead.
// NOTE(review): this extract is lossy. Every line carries a fused original line number, and many
// original lines (including most closing braces) are missing. No @Test annotation is visible
// before this method (original lines 131-133 are absent). Confirm it is present; otherwise
// JUnit will not run this test.
134 public void testSelectiveFlushWhenEnabled() throws IOException
{
135 // Set up the configuration, use new one to not conflict with minicluster in other tests
136 Configuration conf
= new HBaseTestingUtility().getConfiguration();
// Region-level memstore flush size: 200 KB.
137 conf
.setLong(HConstants
.HREGION_MEMSTORE_FLUSH_SIZE
, 200 * 1024);
// Flush only the "large" stores instead of every store of the region.
138 conf
.set(FlushPolicyFactory
.HBASE_FLUSH_POLICY_KEY
, FlushAllLargeStoresPolicy
.class.getName());
// Lower bound for the per-column-family flush threshold.
// NOTE(review): the value argument of this call (original line 140) is missing from this
// extract. A later comment describes the threshold as "around 50*1024".
139 conf
.setLong(FlushLargeStoresPolicy
.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN
,
141 // Initialize the region
// NOTE(review): the directory name passed below does not match this method's name.
142 HRegion region
= initHRegion("testSelectiveFlushWithDataCompaction", conf
);
143 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
// NOTE(review): the guards that limit CF2 to 100 puts and CF3 to 50 puts (original lines
// 146-147 and 149) are missing. As shown, every family would receive 1200 puts.
144 for (int i
= 1; i
<= 1200; i
++) {
145 region
.put(createPut(1, i
));
148 region
.put(createPut(2, i
));
150 region
.put(createPut(3, i
));
155 long totalMemstoreSize
= region
.getMemStoreDataSize();
157 // Find the smallest LSNs for edits with respect to each CF.
158 long smallestSeqCF1
= region
.getOldestSeqIdOfStore(FAMILY1
);
159 long smallestSeqCF2
= region
.getOldestSeqIdOfStore(FAMILY2
);
160 long smallestSeqCF3
= region
.getOldestSeqIdOfStore(FAMILY3
);
162 // Find the sizes of the memstores of each CF.
163 MemStoreSize cf1MemstoreSize
= region
.getStore(FAMILY1
).getMemStoreSize();
164 MemStoreSize cf2MemstoreSize
= region
.getStore(FAMILY2
).getMemStoreSize();
165 MemStoreSize cf3MemstoreSize
= region
.getStore(FAMILY3
).getMemStoreSize();
167 // Get the overall smallest LSN in the region's memstores.
168 long smallestSeqInRegionCurrentMemstore
= getWAL(region
)
169 .getEarliestMemStoreSeqNum(region
.getRegionInfo().getEncodedNameAsBytes());
171 // The overall smallest LSN in the region's memstores should be the same as
172 // the LSN of the smallest edit in CF1
173 assertEquals(smallestSeqCF1
, smallestSeqInRegionCurrentMemstore
);
175 // Some other sanity checks.
176 assertTrue(smallestSeqCF1
< smallestSeqCF2
);
177 assertTrue(smallestSeqCF2
< smallestSeqCF3
);
178 assertTrue(cf1MemstoreSize
.getDataSize() > 0);
179 assertTrue(cf2MemstoreSize
.getDataSize() > 0);
180 assertTrue(cf3MemstoreSize
.getDataSize() > 0);
182 // The total memstore size should be the same as the sum of the sizes of
183 // memstores of CF1, CF2 and CF3.
184 assertEquals(totalMemstoreSize
, cf1MemstoreSize
.getDataSize() + cf2MemstoreSize
.getDataSize()
185 + cf3MemstoreSize
.getDataSize());
// NOTE(review): original lines 186-189 are missing. The assertions below expect CF1 to be empty
// and CF2/CF3 untouched, so a selective flush of the region presumably happened here. Confirm.
190 // Will use these to check if anything changed.
191 MemStoreSize oldCF2MemstoreSize
= cf2MemstoreSize
;
192 MemStoreSize oldCF3MemstoreSize
= cf3MemstoreSize
;
194 // Recalculate everything
195 cf1MemstoreSize
= region
.getStore(FAMILY1
).getMemStoreSize();
196 cf2MemstoreSize
= region
.getStore(FAMILY2
).getMemStoreSize();
197 cf3MemstoreSize
= region
.getStore(FAMILY3
).getMemStoreSize();
198 totalMemstoreSize
= region
.getMemStoreDataSize();
199 smallestSeqInRegionCurrentMemstore
= getWAL(region
)
200 .getEarliestMemStoreSeqNum(region
.getRegionInfo().getEncodedNameAsBytes());
202 // We should have cleared out only CF1, since we chose the flush thresholds
203 // and number of puts accordingly.
204 assertEquals(0, cf1MemstoreSize
.getDataSize());
// An emptied store reports zero data size. The test expects its heap size to be exactly
// MutableSegment.DEEP_OVERHEAD.
205 assertEquals(MutableSegment
.DEEP_OVERHEAD
, cf1MemstoreSize
.getHeapSize());
206 // Nothing should have happened to CF2, ...
207 assertEquals(cf2MemstoreSize
, oldCF2MemstoreSize
);
// ... nor to CF3.
209 assertEquals(cf3MemstoreSize
, oldCF3MemstoreSize
);
210 // Now the smallest LSN in the region should be the same as the smallest
211 // LSN in the memstore of CF2.
212 assertEquals(smallestSeqInRegionCurrentMemstore
, smallestSeqCF2
);
213 // Of course, this should hold too.
214 assertEquals(totalMemstoreSize
, cf2MemstoreSize
.getDataSize() + cf3MemstoreSize
.getDataSize());
216 // Now add more puts (mostly for CF2), so that we only flush CF2 this time.
217 for (int i
= 1200; i
< 2400; i
++) {
218 region
.put(createPut(2, i
));
220 // Add only 100 puts for CF3
221 if (i
- 1200 < 100) {
222 region
.put(createPut(3, i
));
226 // How much does the CF3 memstore occupy? Will be used later.
227 oldCF3MemstoreSize
= region
.getStore(FAMILY3
).getMemStoreSize();
// NOTE(review): original lines 228-231 are missing. The assertions below expect CF2 to be empty
// now, so a second selective flush presumably happened here. Confirm.
232 // Recalculate everything
233 cf1MemstoreSize
= region
.getStore(FAMILY1
).getMemStoreSize();
234 cf2MemstoreSize
= region
.getStore(FAMILY2
).getMemStoreSize();
235 cf3MemstoreSize
= region
.getStore(FAMILY3
).getMemStoreSize();
236 totalMemstoreSize
= region
.getMemStoreDataSize();
237 smallestSeqInRegionCurrentMemstore
= getWAL(region
)
238 .getEarliestMemStoreSeqNum(region
.getRegionInfo().getEncodedNameAsBytes());
240 // CF1 and CF2, both should be absent.
241 assertEquals(0, cf1MemstoreSize
.getDataSize());
242 assertEquals(MutableSegment
.DEEP_OVERHEAD
, cf1MemstoreSize
.getHeapSize());
243 assertEquals(0, cf2MemstoreSize
.getDataSize());
244 assertEquals(MutableSegment
.DEEP_OVERHEAD
, cf2MemstoreSize
.getHeapSize());
245 // CF3 shouldn't have been touched.
246 assertEquals(cf3MemstoreSize
, oldCF3MemstoreSize
);
247 assertEquals(totalMemstoreSize
, cf3MemstoreSize
.getDataSize());
249 // What happens when we hit the memstore limit, but we are not able to find
250 // any Column Family above the threshold?
251 // In that case, we should flush all the CFs.
253 // Clearing the existing memstores.
// NOTE(review): the statement that performs this clearing (original line 254) is missing from
// this extract.
256 // The memstore limit is 200*1024 and the column family flush threshold is
257 // around 50*1024. We try to just hit the memstore limit with each CF's
258 // memstore being below the CF flush threshold.
259 for (int i
= 1; i
<= 300; i
++) {
260 region
.put(createPut(1, i
));
261 region
.put(createPut(2, i
));
262 region
.put(createPut(3, i
));
263 region
.put(createPut(4, i
));
264 region
.put(createPut(5, i
));
// NOTE(review): original lines 265-268 are missing (the loop close and presumably a flush of
// the region). Confirm.
269 // Since we won't find any CF above the threshold, and hence no specific
270 // store to flush, we should flush all the memstores.
271 assertEquals(0, region
.getMemStoreDataSize());
// NOTE(review): the region is closed only on the success path. A failing assertion above leaks
// the region and its WAL; consider try/finally.
272 HBaseTestingUtility
.closeRegionAndWAL(region
);
// Verifies that with FlushAllStoresPolicy a flush empties every store. Afterwards the region
// has zero memstore data and its earliest memstore sequence id is HConstants.NO_SEQNUM.
// NOTE(review): this extract is lossy (fused original line numbers, missing closing braces). No
// @Test annotation is visible before this method (original lines 274-275 are absent). Confirm.
276 public void testSelectiveFlushWhenNotEnabled() throws IOException
{
277 // Set up the configuration, use new one to not conflict with minicluster in other tests
278 Configuration conf
= new HBaseTestingUtility().getConfiguration();
// Region-level memstore flush size: 200 KB.
279 conf
.setLong(HConstants
.HREGION_MEMSTORE_FLUSH_SIZE
, 200 * 1024);
// Always flush every store: selective flushing is disabled.
280 conf
.set(FlushPolicyFactory
.HBASE_FLUSH_POLICY_KEY
, FlushAllStoresPolicy
.class.getName());
282 // Initialize the HRegion
283 HRegion region
= initHRegion("testSelectiveFlushWhenNotEnabled", conf
);
284 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
// NOTE(review): the guards that limit CF2 and CF3 (original lines 287 and 289) are missing.
// As shown, every family would receive 1200 puts.
285 for (int i
= 1; i
<= 1200; i
++) {
286 region
.put(createPut(1, i
));
288 region
.put(createPut(2, i
));
290 region
.put(createPut(3, i
));
295 long totalMemstoreSize
= region
.getMemStoreDataSize();
297 // Find the sizes of the memstores of each CF.
298 MemStoreSize cf1MemstoreSize
= region
.getStore(FAMILY1
).getMemStoreSize();
299 MemStoreSize cf2MemstoreSize
= region
.getStore(FAMILY2
).getMemStoreSize();
300 MemStoreSize cf3MemstoreSize
= region
.getStore(FAMILY3
).getMemStoreSize();
302 // Some other sanity checks.
303 assertTrue(cf1MemstoreSize
.getDataSize() > 0);
304 assertTrue(cf2MemstoreSize
.getDataSize() > 0);
305 assertTrue(cf3MemstoreSize
.getDataSize() > 0);
307 // The total memstore size should be the same as the sum of the sizes of
308 // memstores of CF1, CF2 and CF3.
309 assertEquals(totalMemstoreSize
, cf1MemstoreSize
.getDataSize() + cf2MemstoreSize
.getDataSize()
310 + cf3MemstoreSize
.getDataSize());
// NOTE(review): original lines 311-314 are missing. The assertions below expect every store to
// be empty, so a flush of the region presumably happened here. Confirm.
315 cf1MemstoreSize
= region
.getStore(FAMILY1
).getMemStoreSize();
316 cf2MemstoreSize
= region
.getStore(FAMILY2
).getMemStoreSize();
317 cf3MemstoreSize
= region
.getStore(FAMILY3
).getMemStoreSize();
318 totalMemstoreSize
= region
.getMemStoreDataSize();
// NOTE(review): this reads region.getWAL() directly, whereas the sibling test uses the
// getWAL(region) helper for the same lookup.
319 long smallestSeqInRegionCurrentMemstore
=
320 region
.getWAL().getEarliestMemStoreSeqNum(region
.getRegionInfo().getEncodedNameAsBytes());
322 // Everything should have been cleared
323 assertEquals(0, cf1MemstoreSize
.getDataSize());
324 assertEquals(MutableSegment
.DEEP_OVERHEAD
, cf1MemstoreSize
.getHeapSize());
325 assertEquals(0, cf2MemstoreSize
.getDataSize());
326 assertEquals(MutableSegment
.DEEP_OVERHEAD
, cf2MemstoreSize
.getHeapSize());
327 assertEquals(0, cf3MemstoreSize
.getDataSize());
328 assertEquals(MutableSegment
.DEEP_OVERHEAD
, cf3MemstoreSize
.getHeapSize());
329 assertEquals(0, totalMemstoreSize
);
// With nothing left in any memstore, the WAL reports no earliest sequence id for the region.
330 assertEquals(HConstants
.NO_SEQNUM
, smallestSeqInRegionCurrentMemstore
);
// NOTE(review): closed only on the success path. Consider try/finally so that a failed
// assertion does not leak the region and its WAL.
331 HBaseTestingUtility
.closeRegionAndWAL(region
);
334 // Find the (first) region which has the specified name.
335 private static Pair
<HRegion
, HRegionServer
> getRegionWithName(TableName tableName
) {
336 MiniHBaseCluster cluster
= TEST_UTIL
.getMiniHBaseCluster();
337 List
<JVMClusterUtil
.RegionServerThread
> rsts
= cluster
.getRegionServerThreads();
338 for (int i
= 0; i
< cluster
.getRegionServerThreads().size(); i
++) {
339 HRegionServer hrs
= rsts
.get(i
).getRegionServer();
340 for (HRegion region
: hrs
.getRegions(tableName
)) {
341 return Pair
.newPair(region
, hrs
);
// Shared body for the log-replay test. It does the following:
// - starts a 4-server mini cluster;
// - writes interleaved edits to three families;
// - flushes the hosting region selectively;
// - then, per the comments below, aborts the hosting region server and verifies that every
//   edit is still readable after the region is reassigned.
// NOTE(review): this extract is lossy (fused original line numbers, missing closing braces and
// statements). See the notes below.
347 private void doTestLogReplay() throws Exception
{
348 Configuration conf
= TEST_UTIL
.getConfiguration();
349 conf
.setLong(HConstants
.HREGION_MEMSTORE_FLUSH_SIZE
, 10000);
350 // Carefully chosen limits so that the memstore just flushes when we're done
351 conf
.set(FlushPolicyFactory
.HBASE_FLUSH_POLICY_KEY
, FlushAllLargeStoresPolicy
.class.getName());
352 conf
.setLong(FlushLargeStoresPolicy
.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN
, 2500);
353 final int numRegionServers
= 4;
// NOTE(review): original line 354 is missing. If it opened a try block, the matching finally
// (original line 419) is missing too. As shown, shutdownMiniCluster() at the end is skipped
// whenever an assertion fails.
355 TEST_UTIL
.startMiniCluster(numRegionServers
);
356 TEST_UTIL
.getAdmin().createNamespace(
357 NamespaceDescriptor
.create(TABLENAME
.getNamespaceAsString()).build());
358 Table table
= TEST_UTIL
.createTable(TABLENAME
, FAMILIES
);
360 // Add 100 edits for CF1, 20 for CF2, 20 for CF3.
361 // These will all be interleaved in the log.
// NOTE(review): the loop below runs 80 iterations, not 100 as stated above. The guard that
// limits the CF2/CF3 puts (original line 364) is missing from this extract.
362 for (int i
= 1; i
<= 80; i
++) {
363 table
.put(createPut(1, i
));
365 table
.put(createPut(2, i
));
366 table
.put(createPut(3, i
));
// NOTE(review): original lines 367-370 are missing (closing braces and possibly a pause).
371 Pair
<HRegion
, HRegionServer
> desiredRegionAndServer
= getRegionWithName(TABLENAME
);
372 HRegion desiredRegion
= desiredRegionAndServer
.getFirst();
// NOTE(review): if getRegionWithName() can return null, getFirst() above throws an NPE before
// this assertion gets a chance to report the problem.
373 assertTrue("Could not find a region which hosts the new region.", desiredRegion
!= null);
375 // Flush the region selectively.
376 desiredRegion
.flush(false);
378 long totalMemstoreSize
;
379 long cf1MemstoreSize
, cf2MemstoreSize
, cf3MemstoreSize
;
380 totalMemstoreSize
= desiredRegion
.getMemStoreDataSize();
382 // Find the sizes of the memstores of each CF.
383 cf1MemstoreSize
= desiredRegion
.getStore(FAMILY1
).getMemStoreSize().getDataSize();
384 cf2MemstoreSize
= desiredRegion
.getStore(FAMILY2
).getMemStoreSize().getDataSize();
385 cf3MemstoreSize
= desiredRegion
.getStore(FAMILY3
).getMemStoreSize().getDataSize();
387 // CF1 Should have been flushed
388 assertEquals(0, cf1MemstoreSize
);
389 // CF2 and CF3 shouldn't have been flushed.
390 // TODO: This test doesn't allow for this case:
391 // " Since none of the CFs were above the size, flushing all."
392 // i.e. a flush happens before we get to here and it's a flush-all.
// Data sizes are never negative, so the two ">= 0" checks below always pass. The sum check is
// the meaningful assertion.
393 assertTrue(cf2MemstoreSize
>= 0);
394 assertTrue(cf3MemstoreSize
>= 0);
395 assertEquals(totalMemstoreSize
, cf2MemstoreSize
+ cf3MemstoreSize
);
397 // Wait for the RS report to go across to the master, so that the master
398 // is aware of which sequence ids have been flushed, before we kill the RS.
399 // If in production, the RS dies before the report goes across, we will
400 // safely replay all the edits.
// NOTE(review): the wait described above (original lines 401-402) is missing from this extract.
403 // Abort the region server where we have the region hosted.
404 HRegionServer rs
= desiredRegionAndServer
.getSecond();
// NOTE(review): the abort call itself (original lines 405-406) is missing. As shown, rs is
// never used.
407 // The aborted region server's regions will be eventually assigned to some
408 // other region server, and the get RPC call (inside verifyEdit()) will
409 // retry for some time till the regions come back up.
411 // Verify that all the edits are safe.
// NOTE(review): the guard limiting the CF2/CF3 verification (original line 414) is missing.
412 for (int i
= 1; i
<= 80; i
++) {
413 verifyEdit(1, i
, table
);
415 verifyEdit(2, i
, table
);
416 verifyEdit(3, i
, table
);
// NOTE(review): original lines 417-419 are missing (loop close and possibly "} finally {").
420 TEST_UTIL
.shutdownMiniCluster();
424 // Test Log Replay with Distributed log split on.
426 public void testLogReplayWithDistributedLogSplit() throws Exception
{
430 private WAL
getWAL(Region region
) {
431 return ((HRegion
)region
).getWAL();
434 private int getNumRolledLogFiles(Region region
) {
435 return AbstractFSWALProvider
.getNumRolledLogFiles(getWAL(region
));
/**
 * When a log roll is about to happen, we flush the regions that will be affected by the log
 * roll. These flushes cannot be selective flushes; otherwise we cannot roll the logs. This test
 * ensures that we do a full flush in that scenario.
 */
// NOTE(review): this extract is lossy. Fused original line numbers are present, and closing
// braces of loops, ifs and the anonymous Waiter class are missing throughout. No @Test
// annotation is visible before this method (original lines 442-443 are absent). Confirm.
444 public void testFlushingWhenLogRolling() throws Exception
{
445 TableName tableName
= TableName
.valueOf("testFlushingWhenLogRolling");
446 Configuration conf
= TEST_UTIL
.getConfiguration();
// Very large region flush size (128 MB), presumably so that only the WAL-count pressure below
// triggers a flush.
447 conf
.setLong(HConstants
.HREGION_MEMSTORE_FLUSH_SIZE
, 128 * 1024 * 1024);
448 conf
.set(FlushPolicyFactory
.HBASE_FLUSH_POLICY_KEY
, FlushAllLargeStoresPolicy
.class.getName());
// Per-family lower bound (2 KB). The assertions below put CF1 above it and CF2/CF3 below it.
449 long cfFlushSizeLowerBound
= 2048;
450 conf
.setLong(FlushLargeStoresPolicy
.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN
,
451 cfFlushSizeLowerBound
);
453 // One hour, prevent periodic rolling
454 conf
.setLong("hbase.regionserver.logroll.period", 60L * 60 * 1000);
455 // prevent rolling by size
456 conf
.setLong("hbase.regionserver.hlog.blocksize", 128L * 1024 * 1024);
457 // Cap the number of WAL files at 10. Exceeding it is what should force the flush tested below.
458 final int maxLogs
= 10;
459 conf
.setInt("hbase.regionserver.maxlogs", maxLogs
);
461 final int numRegionServers
= 1;
462 TEST_UTIL
.startMiniCluster(numRegionServers
);
// NOTE(review): original line 463 is missing. If it opened a try block, the matching finally
// (original line 522) is missing too, so shutdownMiniCluster() is skipped on failure.
464 Table table
= TEST_UTIL
.createTable(tableName
, FAMILIES
);
465 Pair
<HRegion
, HRegionServer
> desiredRegionAndServer
= getRegionWithName(tableName
);
466 final HRegion desiredRegion
= desiredRegionAndServer
.getFirst();
467 assertTrue("Could not find a region which hosts the new region.", desiredRegion
!= null);
468 LOG
.info("Writing to region=" + desiredRegion
);
470 // Add one row to each of CF1, CF2 and CF3.
471 for (int i
= 1; i
<= 3; i
++) {
472 table
.put(createPut(i
, 0));
474 // Now only add row to CF1, make sure when we force a flush, CF1 is larger than the lower
475 // bound and CF2 and CF3 are smaller than the lower bound.
476 for (int i
= 0; i
< maxLogs
; i
++) {
477 for (int j
= 0; j
< 100; j
++) {
478 table
.put(createPut(1, i
* 100 + j
));
480 // Roll the WAL. The log file count is less than maxLogs so no flush is triggered.
481 int currentNumRolledLogFiles
= getNumRolledLogFiles(desiredRegion
);
482 assertNull(getWAL(desiredRegion
).rollWriter());
// Wait until the roll shows up in the rolled-file count.
// NOTE(review): the loop body (original lines 484-486) is missing; presumably a short sleep.
483 while (getNumRolledLogFiles(desiredRegion
) <= currentNumRolledLogFiles
) {
// Exactly maxLogs rolled files now exist. CF1 is above the per-family lower bound while CF2/CF3
// are below it, so a selective flush alone would leave CF2 and CF3 in memory.
487 assertEquals(maxLogs
, getNumRolledLogFiles(desiredRegion
));
488 assertTrue(desiredRegion
.getStore(FAMILY1
).getMemStoreSize().getHeapSize() > cfFlushSizeLowerBound
);
489 assertTrue(desiredRegion
.getStore(FAMILY2
).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound
);
490 assertTrue(desiredRegion
.getStore(FAMILY3
).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound
);
491 table
.put(createPut(1, 12345678));
492 // Make numRolledLogFiles greater than maxLogs
493 desiredRegionAndServer
.getSecond().getWalRoller().requestRollAll();
494 // Wait for some time till the flush caused by log rolling happens.
495 TEST_UTIL
.waitFor(30000, new Waiter
.ExplainingPredicate
<Exception
>() {
// Done once the region's memstore holds no data.
498 public boolean evaluate() throws Exception
{
499 return desiredRegion
.getMemStoreDataSize() == 0;
503 public String
explainFailure() throws Exception
{
504 long memstoreSize
= desiredRegion
.getMemStoreDataSize();
// NOTE(review): the fallback return for memstoreSize <= 0 (original lines 507-510) is missing.
505 if (memstoreSize
> 0) {
506 return "Still have unflushed entries in memstore, memstore size is " + memstoreSize
;
511 LOG
.info("Finished waiting on flush after too many WALs...");
512 // All three families should have been flushed: a full flush, not a selective one.
513 assertEquals(MutableSegment
.DEEP_OVERHEAD
,
514 desiredRegion
.getStore(FAMILY1
).getMemStoreSize().getHeapSize());
515 assertEquals(MutableSegment
.DEEP_OVERHEAD
,
516 desiredRegion
.getStore(FAMILY2
).getMemStoreSize().getHeapSize());
517 assertEquals(MutableSegment
.DEEP_OVERHEAD
,
518 desiredRegion
.getStore(FAMILY3
).getMemStoreSize().getHeapSize());
519 // Force another roll so the WAL can clean up old logs; the rolled-file count must then drop
// below maxLogs.
520 assertNull(getWAL(desiredRegion
).rollWriter(true));
521 assertTrue(getNumRolledLogFiles(desiredRegion
) < maxLogs
);
523 TEST_UTIL
.shutdownMiniCluster();
// Writes 10000 rows (row-0 .. row-9999). Each row holds one random-valued cell in each of
// FAMILY1, FAMILY2 and FAMILY3. The writer is throttled while the region's memstore heap size
// exceeds memstoreFlushSize, so the region server has time to flush.
// NOTE(review): this extract is lossy. The statement that sends each Put (original line 544,
// presumably table.put(put)) is missing; as shown, the puts are built but never written. The
// throttle loop body and closing braces (original lines 547-551) are also missing.
527 private void doPut(Table table
, long memstoreFlushSize
) throws IOException
, InterruptedException
{
528 Region region
= getRegionWithName(table
.getName()).getFirst();
529 // cf1 gets a 100-byte value per row, cf2 200 bytes and cf3 400 bytes.
530 byte[] qf
= Bytes
.toBytes("qf");
531 Random rand
= new Random();
532 byte[] value1
= new byte[100];
533 byte[] value2
= new byte[200];
534 byte[] value3
= new byte[400];
535 for (int i
= 0; i
< 10000; i
++) {
536 Put put
= new Put(Bytes
.toBytes("row-" + i
));
// The value buffers are refilled with random bytes for every row.
538 rand
.nextBytes(value1
);
539 rand
.nextBytes(value2
);
540 rand
.nextBytes(value3
);
541 put
.addColumn(FAMILY1
, qf
, value1
);
542 put
.addColumn(FAMILY2
, qf
, value2
);
543 put
.addColumn(FAMILY3
, qf
, value3
);
545 // slow down to let regionserver flush region.
546 while (region
.getMemStoreHeapSize() > memstoreFlushSize
) {
552 // Under the same write load, small stores should end up with fewer store files when
553 // per-column-family flush is enabled.
// Runs the same doPut() load twice on a fresh single-server mini cluster. The first run uses
// FlushAllStoresPolicy and the second FlushAllLargeStoresPolicy. The test then asserts that the
// two smaller families (CF1, CF2) end up with fewer store files in the second run.
// NOTE(review): this extract is lossy (fused original line numbers, missing braces). No @Test
// annotation is visible before this method (original line 554 is absent). Confirm.
555 public void testCompareStoreFileCount() throws Exception
{
// 1 MB region flush size; doPut() also throttles on this value.
556 long memstoreFlushSize
= 1024L * 1024;
557 Configuration conf
= TEST_UTIL
.getConfiguration();
558 conf
.setLong(HConstants
.HREGION_MEMSTORE_FLUSH_SIZE
, memstoreFlushSize
);
559 conf
.set(FlushPolicyFactory
.HBASE_FLUSH_POLICY_KEY
, FlushAllStoresPolicy
.class.getName());
// Very high blocking-store-file limit, presumably so the many flush files do not block writes.
560 conf
.setInt(HStore
.BLOCKING_STOREFILES_KEY
, 10000);
// Constant-size split policy, presumably to keep the table in one region so that
// getRegionWithName() sees all of the data.
561 conf
.set(HConstants
.HBASE_REGION_SPLIT_POLICY_KEY
,
562 ConstantSizeRegionSplitPolicy
.class.getName());
// Compactions are disabled so that store-file counts reflect flushes only.
564 TableDescriptorBuilder
.ModifyableTableDescriptor tableDescriptor
=
565 new TableDescriptorBuilder
.ModifyableTableDescriptor(TABLENAME
);
566 tableDescriptor
.setCompactionEnabled(false);
567 tableDescriptor
.setColumnFamily(
568 new ColumnFamilyDescriptorBuilder
.ModifyableColumnFamilyDescriptor(FAMILY1
));
569 tableDescriptor
.setColumnFamily(
570 new ColumnFamilyDescriptorBuilder
.ModifyableColumnFamilyDescriptor(FAMILY2
));
571 tableDescriptor
.setColumnFamily(
572 new ColumnFamilyDescriptorBuilder
.ModifyableColumnFamilyDescriptor(FAMILY3
));
574 LOG
.info("==============Test with selective flush disabled===============");
// Store-file counts per family: the plain names hold the counts from the run with selective
// flush disabled, and the "...1" names hold the counts from the run with it enabled.
575 int cf1StoreFileCount
= -1;
576 int cf2StoreFileCount
= -1;
577 int cf3StoreFileCount
= -1;
578 int cf1StoreFileCount1
= -1;
579 int cf2StoreFileCount1
= -1;
580 int cf3StoreFileCount1
= -1;
// NOTE(review): the try/finally that would guarantee shutdownMiniCluster() (original lines 581
// and 597) is not visible. conn and table are also never closed in this extract (original
// lines 590-591 are missing). conn, table and region are redeclared in the second phase, which
// only compiles if each phase sits in its own block.
582 TEST_UTIL
.startMiniCluster(1);
583 TEST_UTIL
.getAdmin().createNamespace(
584 NamespaceDescriptor
.create(TABLENAME
.getNamespaceAsString()).build());
585 TEST_UTIL
.getAdmin().createTable(tableDescriptor
);
586 TEST_UTIL
.waitTableAvailable(TABLENAME
);
587 Connection conn
= ConnectionFactory
.createConnection(conf
);
588 Table table
= conn
.getTable(TABLENAME
);
589 doPut(table
, memstoreFlushSize
);
593 Region region
= getRegionWithName(TABLENAME
).getFirst();
594 cf1StoreFileCount
= region
.getStore(FAMILY1
).getStorefilesCount();
595 cf2StoreFileCount
= region
.getStore(FAMILY2
).getStorefilesCount();
596 cf3StoreFileCount
= region
.getStore(FAMILY3
).getStorefilesCount();
598 TEST_UTIL
.shutdownMiniCluster();
601 LOG
.info("==============Test with selective flush enabled===============");
602 conf
.set(FlushPolicyFactory
.HBASE_FLUSH_POLICY_KEY
, FlushAllLargeStoresPolicy
.class.getName());
603 // default value of per-cf flush lower bound is too big, set to a small enough value
604 conf
.setLong(FlushLargeStoresPolicy
.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN
, 0);
// NOTE(review): as in the first phase, the try/finally (original lines 605 and 620) and the
// conn/table close calls (original lines 613-614) are not visible.
606 TEST_UTIL
.startMiniCluster(1);
607 TEST_UTIL
.getAdmin().createNamespace(
608 NamespaceDescriptor
.create(TABLENAME
.getNamespaceAsString()).build());
609 TEST_UTIL
.getAdmin().createTable(tableDescriptor
);
610 Connection conn
= ConnectionFactory
.createConnection(conf
);
611 Table table
= conn
.getTable(TABLENAME
);
612 doPut(table
, memstoreFlushSize
);
616 Region region
= getRegionWithName(TABLENAME
).getFirst();
617 cf1StoreFileCount1
= region
.getStore(FAMILY1
).getStorefilesCount();
618 cf2StoreFileCount1
= region
.getStore(FAMILY2
).getStorefilesCount();
619 cf3StoreFileCount1
= region
.getStore(FAMILY3
).getStorefilesCount();
621 TEST_UTIL
.shutdownMiniCluster();
624 LOG
.info("disable selective flush: " + Bytes
.toString(FAMILY1
) + "=>" + cf1StoreFileCount
625 + ", " + Bytes
.toString(FAMILY2
) + "=>" + cf2StoreFileCount
+ ", "
626 + Bytes
.toString(FAMILY3
) + "=>" + cf3StoreFileCount
);
627 LOG
.info("enable selective flush: " + Bytes
.toString(FAMILY1
) + "=>" + cf1StoreFileCount1
628 + ", " + Bytes
.toString(FAMILY2
) + "=>" + cf2StoreFileCount1
+ ", "
629 + Bytes
.toString(FAMILY3
) + "=>" + cf3StoreFileCount1
);
630 // Smaller CFs should have fewer store files with selective flush enabled.
// NOTE(review): the CF3 counts are only logged above, never asserted.
631 assertTrue(cf1StoreFileCount1
< cf1StoreFileCount
);
632 assertTrue(cf2StoreFileCount1
< cf2StoreFileCount
);
635 public static void main(String
[] args
) throws Exception
{
636 int numRegions
= Integer
.parseInt(args
[0]);
637 long numRows
= Long
.parseLong(args
[1]);
639 TableDescriptorBuilder
.ModifyableTableDescriptor tableDescriptor
=
640 new TableDescriptorBuilder
.ModifyableTableDescriptor(TABLENAME
);
641 tableDescriptor
.setMaxFileSize(10L * 1024 * 1024 * 1024);
642 tableDescriptor
.setValue(HTableDescriptor
.SPLIT_POLICY
,
643 ConstantSizeRegionSplitPolicy
.class.getName());
644 tableDescriptor
.setColumnFamily(
645 new ColumnFamilyDescriptorBuilder
.ModifyableColumnFamilyDescriptor(FAMILY1
));
646 tableDescriptor
.setColumnFamily(
647 new ColumnFamilyDescriptorBuilder
.ModifyableColumnFamilyDescriptor(FAMILY2
));
648 tableDescriptor
.setColumnFamily(
649 new ColumnFamilyDescriptorBuilder
.ModifyableColumnFamilyDescriptor(FAMILY3
));
651 Configuration conf
= HBaseConfiguration
.create();
652 Connection conn
= ConnectionFactory
.createConnection(conf
);
653 Admin admin
= conn
.getAdmin();
654 if (admin
.tableExists(TABLENAME
)) {
655 admin
.disableTable(TABLENAME
);
656 admin
.deleteTable(TABLENAME
);
658 if (numRegions
>= 3) {
659 byte[] startKey
= new byte[16];
660 byte[] endKey
= new byte[16];
661 Arrays
.fill(endKey
, (byte) 0xFF);
662 admin
.createTable(tableDescriptor
, startKey
, endKey
, numRegions
);
664 admin
.createTable(tableDescriptor
);
668 Table table
= conn
.getTable(TABLENAME
);
669 byte[] qf
= Bytes
.toBytes("qf");
670 Random rand
= new Random();
671 byte[] value1
= new byte[16];
672 byte[] value2
= new byte[256];
673 byte[] value3
= new byte[4096];
674 for (long i
= 0; i
< numRows
; i
++) {
675 Put put
= new Put(Hashing
.md5().hashLong(i
).asBytes());
677 rand
.nextBytes(value1
);
678 rand
.nextBytes(value2
);
679 rand
.nextBytes(value3
);
680 put
.addColumn(FAMILY1
, qf
, value1
);
681 put
.addColumn(FAMILY2
, qf
, value2
);
682 put
.addColumn(FAMILY3
, qf
, value3
);
684 if (i
% 10000 == 0) {
685 LOG
.info(i
+ " rows put");