HBASE-26921 Rewrite the counting cells part in TestMultiVersions (#4316)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / regionserver / TestPerColumnFamilyFlush.java
blob16d56e670a25f40f414851ae8c63aa5fb53026ce
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.regionserver;
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertNotNull;
22 import static org.junit.Assert.assertNull;
23 import static org.junit.Assert.assertTrue;
25 import java.io.IOException;
26 import java.util.Arrays;
27 import java.util.List;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.fs.Path;
30 import org.apache.hadoop.hbase.HBaseClassTestRule;
31 import org.apache.hadoop.hbase.HBaseConfiguration;
32 import org.apache.hadoop.hbase.HBaseTestingUtil;
33 import org.apache.hadoop.hbase.HConstants;
34 import org.apache.hadoop.hbase.NamespaceDescriptor;
35 import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
36 import org.apache.hadoop.hbase.TableName;
37 import org.apache.hadoop.hbase.Waiter;
38 import org.apache.hadoop.hbase.client.Admin;
39 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
40 import org.apache.hadoop.hbase.client.Connection;
41 import org.apache.hadoop.hbase.client.ConnectionFactory;
42 import org.apache.hadoop.hbase.client.Get;
43 import org.apache.hadoop.hbase.client.Put;
44 import org.apache.hadoop.hbase.client.RegionInfo;
45 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
46 import org.apache.hadoop.hbase.client.Result;
47 import org.apache.hadoop.hbase.client.Table;
48 import org.apache.hadoop.hbase.client.TableDescriptor;
49 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
50 import org.apache.hadoop.hbase.testclassification.LargeTests;
51 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
52 import org.apache.hadoop.hbase.util.Bytes;
53 import org.apache.hadoop.hbase.util.JVMClusterUtil;
54 import org.apache.hadoop.hbase.util.Pair;
55 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
56 import org.apache.hadoop.hbase.wal.WAL;
57 import org.junit.ClassRule;
58 import org.junit.Test;
59 import org.junit.experimental.categories.Category;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62 import org.apache.hbase.thirdparty.com.google.common.hash.Hashing;
/**
 * This test verifies the correctness of the per-column-family flushing strategy.
 */
67 @Category({ RegionServerTests.class, LargeTests.class })
68 public class TestPerColumnFamilyFlush {
70 @ClassRule
71 public static final HBaseClassTestRule CLASS_RULE =
72 HBaseClassTestRule.forClass(TestPerColumnFamilyFlush.class);
74 private static final Logger LOG = LoggerFactory.getLogger(TestPerColumnFamilyFlush.class);
76 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
78 private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");
80 public static final TableName TABLENAME = TableName.valueOf("TestPerColumnFamilyFlush", "t1");
82 public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
83 Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };
85 public static final byte[] FAMILY1 = FAMILIES[0];
87 public static final byte[] FAMILY2 = FAMILIES[1];
89 public static final byte[] FAMILY3 = FAMILIES[2];
91 private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException {
92 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLENAME);
93 for (byte[] family : FAMILIES) {
94 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
96 RegionInfo info = RegionInfoBuilder.newBuilder(TABLENAME).build();
97 Path path = new Path(DIR, callingMethod);
98 return HBaseTestingUtil.createRegionAndWAL(info, path, conf, builder.build());
101 // A helper function to create puts.
102 private Put createPut(int familyNum, int putNum) {
103 byte[] qf = Bytes.toBytes("q" + familyNum);
104 byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
105 byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
106 Put p = new Put(row);
107 p.addColumn(FAMILIES[familyNum - 1], qf, val);
108 return p;
111 // A helper function to create puts.
112 private Get createGet(int familyNum, int putNum) {
113 byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
114 return new Get(row);
117 // A helper function to verify edits.
118 void verifyEdit(int familyNum, int putNum, Table table) throws IOException {
119 Result r = table.get(createGet(familyNum, putNum));
120 byte[] family = FAMILIES[familyNum - 1];
121 byte[] qf = Bytes.toBytes("q" + familyNum);
122 byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
123 assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family));
124 assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum),
125 r.getFamilyMap(family).get(qf));
126 assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum),
127 Arrays.equals(r.getFamilyMap(family).get(qf), val));
130 @Test
131 public void testSelectiveFlushWhenEnabled() throws IOException {
132 // Set up the configuration, use new one to not conflict with minicluster in other tests
133 Configuration conf = new HBaseTestingUtil().getConfiguration();
134 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
135 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
136 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
137 40 * 1024);
138 // Intialize the region
139 HRegion region = initHRegion("testSelectiveFlushWithDataCompaction", conf);
140 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
141 for (int i = 1; i <= 1200; i++) {
142 region.put(createPut(1, i));
144 if (i <= 100) {
145 region.put(createPut(2, i));
146 if (i <= 50) {
147 region.put(createPut(3, i));
152 long totalMemstoreSize = region.getMemStoreDataSize();
154 // Find the smallest LSNs for edits wrt to each CF.
155 long smallestSeqCF1 = region.getOldestSeqIdOfStore(FAMILY1);
156 long smallestSeqCF2 = region.getOldestSeqIdOfStore(FAMILY2);
157 long smallestSeqCF3 = region.getOldestSeqIdOfStore(FAMILY3);
159 // Find the sizes of the memstores of each CF.
160 MemStoreSize cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
161 MemStoreSize cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
162 MemStoreSize cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
164 // Get the overall smallest LSN in the region's memstores.
165 long smallestSeqInRegionCurrentMemstore = getWAL(region)
166 .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
168 // The overall smallest LSN in the region's memstores should be the same as
169 // the LSN of the smallest edit in CF1
170 assertEquals(smallestSeqCF1, smallestSeqInRegionCurrentMemstore);
172 // Some other sanity checks.
173 assertTrue(smallestSeqCF1 < smallestSeqCF2);
174 assertTrue(smallestSeqCF2 < smallestSeqCF3);
175 assertTrue(cf1MemstoreSize.getDataSize() > 0);
176 assertTrue(cf2MemstoreSize.getDataSize() > 0);
177 assertTrue(cf3MemstoreSize.getDataSize() > 0);
179 // The total memstore size should be the same as the sum of the sizes of
180 // memstores of CF1, CF2 and CF3.
181 assertEquals(totalMemstoreSize, cf1MemstoreSize.getDataSize() + cf2MemstoreSize.getDataSize()
182 + cf3MemstoreSize.getDataSize());
184 // Flush!
185 region.flush(false);
187 // Will use these to check if anything changed.
188 MemStoreSize oldCF2MemstoreSize = cf2MemstoreSize;
189 MemStoreSize oldCF3MemstoreSize = cf3MemstoreSize;
191 // Recalculate everything
192 cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
193 cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
194 cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
195 totalMemstoreSize = region.getMemStoreDataSize();
196 smallestSeqInRegionCurrentMemstore = getWAL(region)
197 .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
199 // We should have cleared out only CF1, since we chose the flush thresholds
200 // and number of puts accordingly.
201 assertEquals(0, cf1MemstoreSize.getDataSize());
202 assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
203 // Nothing should have happened to CF2, ...
204 assertEquals(cf2MemstoreSize, oldCF2MemstoreSize);
205 // ... or CF3
206 assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
207 // Now the smallest LSN in the region should be the same as the smallest
208 // LSN in the memstore of CF2.
209 assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF2);
210 // Of course, this should hold too.
211 assertEquals(totalMemstoreSize, cf2MemstoreSize.getDataSize() + cf3MemstoreSize.getDataSize());
213 // Now add more puts (mostly for CF2), so that we only flush CF2 this time.
214 for (int i = 1200; i < 2400; i++) {
215 region.put(createPut(2, i));
217 // Add only 100 puts for CF3
218 if (i - 1200 < 100) {
219 region.put(createPut(3, i));
223 // How much does the CF3 memstore occupy? Will be used later.
224 oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
226 // Flush again
227 region.flush(false);
229 // Recalculate everything
230 cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
231 cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
232 cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
233 totalMemstoreSize = region.getMemStoreDataSize();
234 smallestSeqInRegionCurrentMemstore = getWAL(region)
235 .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
237 // CF1 and CF2, both should be absent.
238 assertEquals(0, cf1MemstoreSize.getDataSize());
239 assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
240 assertEquals(0, cf2MemstoreSize.getDataSize());
241 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize());
242 // CF3 shouldn't have been touched.
243 assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
244 assertEquals(totalMemstoreSize, cf3MemstoreSize.getDataSize());
246 // What happens when we hit the memstore limit, but we are not able to find
247 // any Column Family above the threshold?
248 // In that case, we should flush all the CFs.
250 // Clearing the existing memstores.
251 region.flush(true);
253 // The memstore limit is 200*1024 and the column family flush threshold is
254 // around 50*1024. We try to just hit the memstore limit with each CF's
255 // memstore being below the CF flush threshold.
256 for (int i = 1; i <= 300; i++) {
257 region.put(createPut(1, i));
258 region.put(createPut(2, i));
259 region.put(createPut(3, i));
260 region.put(createPut(4, i));
261 region.put(createPut(5, i));
264 region.flush(false);
266 // Since we won't find any CF above the threshold, and hence no specific
267 // store to flush, we should flush all the memstores.
268 assertEquals(0, region.getMemStoreDataSize());
269 HBaseTestingUtil.closeRegionAndWAL(region);
272 @Test
273 public void testSelectiveFlushWhenNotEnabled() throws IOException {
274 // Set up the configuration, use new one to not conflict with minicluster in other tests
275 Configuration conf = new HBaseTestingUtil().getConfiguration();
276 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
277 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
279 // Intialize the HRegion
280 HRegion region = initHRegion("testSelectiveFlushWhenNotEnabled", conf);
281 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
282 for (int i = 1; i <= 1200; i++) {
283 region.put(createPut(1, i));
284 if (i <= 100) {
285 region.put(createPut(2, i));
286 if (i <= 50) {
287 region.put(createPut(3, i));
292 long totalMemstoreSize = region.getMemStoreDataSize();
294 // Find the sizes of the memstores of each CF.
295 MemStoreSize cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
296 MemStoreSize cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
297 MemStoreSize cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
299 // Some other sanity checks.
300 assertTrue(cf1MemstoreSize.getDataSize() > 0);
301 assertTrue(cf2MemstoreSize.getDataSize() > 0);
302 assertTrue(cf3MemstoreSize.getDataSize() > 0);
304 // The total memstore size should be the same as the sum of the sizes of
305 // memstores of CF1, CF2 and CF3.
306 assertEquals(totalMemstoreSize, cf1MemstoreSize.getDataSize() + cf2MemstoreSize.getDataSize()
307 + cf3MemstoreSize.getDataSize());
309 // Flush!
310 region.flush(false);
312 cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
313 cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
314 cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
315 totalMemstoreSize = region.getMemStoreDataSize();
316 long smallestSeqInRegionCurrentMemstore =
317 region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
319 // Everything should have been cleared
320 assertEquals(0, cf1MemstoreSize.getDataSize());
321 assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
322 assertEquals(0, cf2MemstoreSize.getDataSize());
323 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize());
324 assertEquals(0, cf3MemstoreSize.getDataSize());
325 assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSize.getHeapSize());
326 assertEquals(0, totalMemstoreSize);
327 assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore);
328 HBaseTestingUtil.closeRegionAndWAL(region);
331 // Find the (first) region which has the specified name.
332 private static Pair<HRegion, HRegionServer> getRegionWithName(TableName tableName) {
333 SingleProcessHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
334 List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
335 for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
336 HRegionServer hrs = rsts.get(i).getRegionServer();
337 for (HRegion region : hrs.getRegions(tableName)) {
338 return Pair.newPair(region, hrs);
341 return null;
344 private void doTestLogReplay() throws Exception {
345 Configuration conf = TEST_UTIL.getConfiguration();
346 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 10000);
347 // Carefully chosen limits so that the memstore just flushes when we're done
348 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
349 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 2500);
350 final int numRegionServers = 4;
351 try {
352 TEST_UTIL.startMiniCluster(numRegionServers);
353 TEST_UTIL.getAdmin().createNamespace(
354 NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
355 Table table = TEST_UTIL.createTable(TABLENAME, FAMILIES);
357 // Add 100 edits for CF1, 20 for CF2, 20 for CF3.
358 // These will all be interleaved in the log.
359 for (int i = 1; i <= 80; i++) {
360 table.put(createPut(1, i));
361 if (i <= 10) {
362 table.put(createPut(2, i));
363 table.put(createPut(3, i));
366 Thread.sleep(1000);
368 Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(TABLENAME);
369 HRegion desiredRegion = desiredRegionAndServer.getFirst();
370 assertTrue("Could not find a region which hosts the new region.", desiredRegion != null);
372 // Flush the region selectively.
373 desiredRegion.flush(false);
375 long totalMemstoreSize;
376 long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize;
377 totalMemstoreSize = desiredRegion.getMemStoreDataSize();
379 // Find the sizes of the memstores of each CF.
380 cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize().getDataSize();
381 cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize().getDataSize();
382 cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize().getDataSize();
384 // CF1 Should have been flushed
385 assertEquals(0, cf1MemstoreSize);
386 // CF2 and CF3 shouldn't have been flushed.
387 // TODO: This test doesn't allow for this case:
388 // " Since none of the CFs were above the size, flushing all."
389 // i.e. a flush happens before we get to here and its a flush-all.
390 assertTrue(cf2MemstoreSize >= 0);
391 assertTrue(cf3MemstoreSize >= 0);
392 assertEquals(totalMemstoreSize, cf2MemstoreSize + cf3MemstoreSize);
394 // Wait for the RS report to go across to the master, so that the master
395 // is aware of which sequence ids have been flushed, before we kill the RS.
396 // If in production, the RS dies before the report goes across, we will
397 // safely replay all the edits.
398 Thread.sleep(2000);
400 // Abort the region server where we have the region hosted.
401 HRegionServer rs = desiredRegionAndServer.getSecond();
402 rs.abort("testing");
404 // The aborted region server's regions will be eventually assigned to some
405 // other region server, and the get RPC call (inside verifyEdit()) will
406 // retry for some time till the regions come back up.
408 // Verify that all the edits are safe.
409 for (int i = 1; i <= 80; i++) {
410 verifyEdit(1, i, table);
411 if (i <= 10) {
412 verifyEdit(2, i, table);
413 verifyEdit(3, i, table);
416 } finally {
417 TEST_UTIL.shutdownMiniCluster();
421 // Test Log Replay with Distributed log split on.
422 @Test
423 public void testLogReplayWithDistributedLogSplit() throws Exception {
424 doTestLogReplay();
427 private WAL getWAL(Region region) {
428 return ((HRegion)region).getWAL();
431 private int getNumRolledLogFiles(Region region) {
432 return AbstractFSWALProvider.getNumRolledLogFiles(getWAL(region));
436 * When a log roll is about to happen, we do a flush of the regions who will be affected by the
437 * log roll. These flushes cannot be a selective flushes, otherwise we cannot roll the logs. This
438 * test ensures that we do a full-flush in that scenario.
440 @Test
441 public void testFlushingWhenLogRolling() throws Exception {
442 TableName tableName = TableName.valueOf("testFlushingWhenLogRolling");
443 Configuration conf = TEST_UTIL.getConfiguration();
444 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);
445 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
446 long cfFlushSizeLowerBound = 2048;
447 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
448 cfFlushSizeLowerBound);
450 // One hour, prevent periodic rolling
451 conf.setLong("hbase.regionserver.logroll.period", 60L * 60 * 1000);
452 // prevent rolling by size
453 conf.setLong("hbase.regionserver.hlog.blocksize", 128L * 1024 * 1024);
454 // Make it 10 as max logs before a flush comes on.
455 final int maxLogs = 10;
456 conf.setInt("hbase.regionserver.maxlogs", maxLogs);
458 final int numRegionServers = 1;
459 TEST_UTIL.startMiniCluster(numRegionServers);
460 try {
461 Table table = TEST_UTIL.createTable(tableName, FAMILIES);
462 Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(tableName);
463 final HRegion desiredRegion = desiredRegionAndServer.getFirst();
464 assertTrue("Could not find a region which hosts the new region.", desiredRegion != null);
465 LOG.info("Writing to region=" + desiredRegion);
467 // Add one row for both CFs.
468 for (int i = 1; i <= 3; i++) {
469 table.put(createPut(i, 0));
471 // Now only add row to CF1, make sure when we force a flush, CF1 is larger than the lower
472 // bound and CF2 and CF3 are smaller than the lower bound.
473 for (int i = 0; i < maxLogs; i++) {
474 for (int j = 0; j < 100; j++) {
475 table.put(createPut(1, i * 100 + j));
477 // Roll the WAL. The log file count is less than maxLogs so no flush is triggered.
478 int currentNumRolledLogFiles = getNumRolledLogFiles(desiredRegion);
479 assertNull(getWAL(desiredRegion).rollWriter());
480 while (getNumRolledLogFiles(desiredRegion) <= currentNumRolledLogFiles) {
481 Thread.sleep(100);
484 assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion));
485 assertTrue(desiredRegion.getStore(FAMILY1).getMemStoreSize().getHeapSize() > cfFlushSizeLowerBound);
486 assertTrue(desiredRegion.getStore(FAMILY2).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound);
487 assertTrue(desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound);
488 table.put(createPut(1, 12345678));
489 // Make numRolledLogFiles greater than maxLogs
490 desiredRegionAndServer.getSecond().getWalRoller().requestRollAll();
491 // Wait for some time till the flush caused by log rolling happens.
492 TEST_UTIL.waitFor(30000, new Waiter.ExplainingPredicate<Exception>() {
494 @Override
495 public boolean evaluate() throws Exception {
496 return desiredRegion.getMemStoreDataSize() == 0;
499 @Override
500 public String explainFailure() throws Exception {
501 long memstoreSize = desiredRegion.getMemStoreDataSize();
502 if (memstoreSize > 0) {
503 return "Still have unflushed entries in memstore, memstore size is " + memstoreSize;
505 return "Unknown";
508 LOG.info("Finished waiting on flush after too many WALs...");
509 // Individual families should have been flushed.
510 assertEquals(MutableSegment.DEEP_OVERHEAD,
511 desiredRegion.getStore(FAMILY1).getMemStoreSize().getHeapSize());
512 assertEquals(MutableSegment.DEEP_OVERHEAD,
513 desiredRegion.getStore(FAMILY2).getMemStoreSize().getHeapSize());
514 assertEquals(MutableSegment.DEEP_OVERHEAD,
515 desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize());
516 // let WAL cleanOldLogs
517 assertNull(getWAL(desiredRegion).rollWriter(true));
518 assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs);
519 } finally {
520 TEST_UTIL.shutdownMiniCluster();
524 private void doPut(Table table, long memstoreFlushSize) throws IOException, InterruptedException {
525 Region region = getRegionWithName(table.getName()).getFirst();
526 // cf1 4B per row, cf2 40B per row and cf3 400B per row
527 byte[] qf = Bytes.toBytes("qf");
528 for (int i = 0; i < 10000; i++) {
529 Put put = new Put(Bytes.toBytes("row-" + i));
530 byte[] value1 = new byte[100];
531 Bytes.random(value1);
532 put.addColumn(FAMILY1, qf, value1);
533 byte[] value2 = new byte[200];
534 Bytes.random(value2);
535 put.addColumn(FAMILY2, qf, value2);
536 byte[] value3 = new byte[400];
537 Bytes.random(value3);
538 put.addColumn(FAMILY3, qf, value3);
539 table.put(put);
540 // slow down to let regionserver flush region.
541 while (region.getMemStoreHeapSize() > memstoreFlushSize) {
542 Thread.sleep(100);
547 // Under the same write load, small stores should have less store files when
548 // percolumnfamilyflush enabled.
549 @Test
550 public void testCompareStoreFileCount() throws Exception {
551 long memstoreFlushSize = 1024L * 1024;
552 Configuration conf = TEST_UTIL.getConfiguration();
553 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, memstoreFlushSize);
554 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
555 conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
556 conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
557 ConstantSizeRegionSplitPolicy.class.getName());
559 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLENAME)
560 .setCompactionEnabled(false).setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY1))
561 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY2))
562 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY3)).build();
564 LOG.info("==============Test with selective flush disabled===============");
565 int cf1StoreFileCount = -1;
566 int cf2StoreFileCount = -1;
567 int cf3StoreFileCount = -1;
568 int cf1StoreFileCount1 = -1;
569 int cf2StoreFileCount1 = -1;
570 int cf3StoreFileCount1 = -1;
571 try {
572 TEST_UTIL.startMiniCluster(1);
573 TEST_UTIL.getAdmin().createNamespace(
574 NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
575 TEST_UTIL.getAdmin().createTable(tableDescriptor);
576 TEST_UTIL.waitTableAvailable(TABLENAME);
577 Connection conn = ConnectionFactory.createConnection(conf);
578 Table table = conn.getTable(TABLENAME);
579 doPut(table, memstoreFlushSize);
580 table.close();
581 conn.close();
583 Region region = getRegionWithName(TABLENAME).getFirst();
584 cf1StoreFileCount = region.getStore(FAMILY1).getStorefilesCount();
585 cf2StoreFileCount = region.getStore(FAMILY2).getStorefilesCount();
586 cf3StoreFileCount = region.getStore(FAMILY3).getStorefilesCount();
587 } finally {
588 TEST_UTIL.shutdownMiniCluster();
591 LOG.info("==============Test with selective flush enabled===============");
592 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
593 // default value of per-cf flush lower bound is too big, set to a small enough value
594 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 0);
595 try {
596 TEST_UTIL.startMiniCluster(1);
597 TEST_UTIL.getAdmin().createNamespace(
598 NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
599 TEST_UTIL.getAdmin().createTable(tableDescriptor);
600 Connection conn = ConnectionFactory.createConnection(conf);
601 Table table = conn.getTable(TABLENAME);
602 doPut(table, memstoreFlushSize);
603 table.close();
604 conn.close();
606 Region region = getRegionWithName(TABLENAME).getFirst();
607 cf1StoreFileCount1 = region.getStore(FAMILY1).getStorefilesCount();
608 cf2StoreFileCount1 = region.getStore(FAMILY2).getStorefilesCount();
609 cf3StoreFileCount1 = region.getStore(FAMILY3).getStorefilesCount();
610 } finally {
611 TEST_UTIL.shutdownMiniCluster();
614 LOG.info("disable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount
615 + ", " + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount + ", "
616 + Bytes.toString(FAMILY3) + "=>" + cf3StoreFileCount);
617 LOG.info("enable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount1
618 + ", " + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount1 + ", "
619 + Bytes.toString(FAMILY3) + "=>" + cf3StoreFileCount1);
620 // small CF will have less store files.
621 assertTrue(cf1StoreFileCount1 < cf1StoreFileCount);
622 assertTrue(cf2StoreFileCount1 < cf2StoreFileCount);
625 public static void main(String[] args) throws Exception {
626 int numRegions = Integer.parseInt(args[0]);
627 long numRows = Long.parseLong(args[1]);
629 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLENAME)
630 .setMaxFileSize(10L * 1024 * 1024 * 1024)
631 .setValue(TableDescriptorBuilder.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName())
632 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY1))
633 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY2))
634 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY3)).build();
636 Configuration conf = HBaseConfiguration.create();
637 Connection conn = ConnectionFactory.createConnection(conf);
638 Admin admin = conn.getAdmin();
639 if (admin.tableExists(TABLENAME)) {
640 admin.disableTable(TABLENAME);
641 admin.deleteTable(TABLENAME);
643 if (numRegions >= 3) {
644 byte[] startKey = new byte[16];
645 byte[] endKey = new byte[16];
646 Arrays.fill(endKey, (byte) 0xFF);
647 admin.createTable(tableDescriptor, startKey, endKey, numRegions);
648 } else {
649 admin.createTable(tableDescriptor);
651 admin.close();
653 Table table = conn.getTable(TABLENAME);
654 byte[] qf = Bytes.toBytes("qf");
655 byte[] value1 = new byte[16];
656 byte[] value2 = new byte[256];
657 byte[] value3 = new byte[4096];
658 for (long i = 0; i < numRows; i++) {
659 Put put = new Put(Hashing.md5().hashLong(i).asBytes());
660 Bytes.random(value1);
661 Bytes.random(value2);
662 Bytes.random(value3);
663 put.addColumn(FAMILY1, qf, value1);
664 put.addColumn(FAMILY2, qf, value2);
665 put.addColumn(FAMILY3, qf, value3);
666 table.put(put);
667 if (i % 10000 == 0) {
668 LOG.info(i + " rows put");
671 table.close();
672 conn.close();