HBASE-24033 Add ut for loading the corrupt recovered hfiles (#1322)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / regionserver / TestPerColumnFamilyFlush.java
blob05e72e6d2e0bc232bdf1b0d96bd96e4aa279f093
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.hash.Hashing;
/**
 * This test verifies the correctness of the Per Column Family flushing strategy.
 */
68 @Category({ RegionServerTests.class, LargeTests.class })
69 public class TestPerColumnFamilyFlush {
71 @ClassRule
72 public static final HBaseClassTestRule CLASS_RULE =
73 HBaseClassTestRule.forClass(TestPerColumnFamilyFlush.class);
75 private static final Logger LOG = LoggerFactory.getLogger(TestPerColumnFamilyFlush.class);
77 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
79 private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");
81 public static final TableName TABLENAME = TableName.valueOf("TestPerColumnFamilyFlush", "t1");
83 public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
84 Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };
86 public static final byte[] FAMILY1 = FAMILIES[0];
88 public static final byte[] FAMILY2 = FAMILIES[1];
90 public static final byte[] FAMILY3 = FAMILIES[2];
92 private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException {
93 TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
94 new TableDescriptorBuilder.ModifyableTableDescriptor(TABLENAME);
95 for (byte[] family : FAMILIES) {
96 tableDescriptor.setColumnFamily(
97 new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(family));
99 HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false);
100 Path path = new Path(DIR, callingMethod);
101 return HBaseTestingUtility.createRegionAndWAL(info, path, conf, tableDescriptor);
104 // A helper function to create puts.
105 private Put createPut(int familyNum, int putNum) {
106 byte[] qf = Bytes.toBytes("q" + familyNum);
107 byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
108 byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
109 Put p = new Put(row);
110 p.addColumn(FAMILIES[familyNum - 1], qf, val);
111 return p;
114 // A helper function to create puts.
115 private Get createGet(int familyNum, int putNum) {
116 byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
117 return new Get(row);
120 // A helper function to verify edits.
121 void verifyEdit(int familyNum, int putNum, Table table) throws IOException {
122 Result r = table.get(createGet(familyNum, putNum));
123 byte[] family = FAMILIES[familyNum - 1];
124 byte[] qf = Bytes.toBytes("q" + familyNum);
125 byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
126 assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family));
127 assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum),
128 r.getFamilyMap(family).get(qf));
129 assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum),
130 Arrays.equals(r.getFamilyMap(family).get(qf), val));
133 @Test
134 public void testSelectiveFlushWhenEnabled() throws IOException {
135 // Set up the configuration, use new one to not conflict with minicluster in other tests
136 Configuration conf = new HBaseTestingUtility().getConfiguration();
137 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
138 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
139 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
140 40 * 1024);
141 // Intialize the region
142 HRegion region = initHRegion("testSelectiveFlushWithDataCompaction", conf);
143 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
144 for (int i = 1; i <= 1200; i++) {
145 region.put(createPut(1, i));
147 if (i <= 100) {
148 region.put(createPut(2, i));
149 if (i <= 50) {
150 region.put(createPut(3, i));
155 long totalMemstoreSize = region.getMemStoreDataSize();
157 // Find the smallest LSNs for edits wrt to each CF.
158 long smallestSeqCF1 = region.getOldestSeqIdOfStore(FAMILY1);
159 long smallestSeqCF2 = region.getOldestSeqIdOfStore(FAMILY2);
160 long smallestSeqCF3 = region.getOldestSeqIdOfStore(FAMILY3);
162 // Find the sizes of the memstores of each CF.
163 MemStoreSize cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
164 MemStoreSize cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
165 MemStoreSize cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
167 // Get the overall smallest LSN in the region's memstores.
168 long smallestSeqInRegionCurrentMemstore = getWAL(region)
169 .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
171 // The overall smallest LSN in the region's memstores should be the same as
172 // the LSN of the smallest edit in CF1
173 assertEquals(smallestSeqCF1, smallestSeqInRegionCurrentMemstore);
175 // Some other sanity checks.
176 assertTrue(smallestSeqCF1 < smallestSeqCF2);
177 assertTrue(smallestSeqCF2 < smallestSeqCF3);
178 assertTrue(cf1MemstoreSize.getDataSize() > 0);
179 assertTrue(cf2MemstoreSize.getDataSize() > 0);
180 assertTrue(cf3MemstoreSize.getDataSize() > 0);
182 // The total memstore size should be the same as the sum of the sizes of
183 // memstores of CF1, CF2 and CF3.
184 assertEquals(totalMemstoreSize, cf1MemstoreSize.getDataSize() + cf2MemstoreSize.getDataSize()
185 + cf3MemstoreSize.getDataSize());
187 // Flush!
188 region.flush(false);
190 // Will use these to check if anything changed.
191 MemStoreSize oldCF2MemstoreSize = cf2MemstoreSize;
192 MemStoreSize oldCF3MemstoreSize = cf3MemstoreSize;
194 // Recalculate everything
195 cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
196 cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
197 cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
198 totalMemstoreSize = region.getMemStoreDataSize();
199 smallestSeqInRegionCurrentMemstore = getWAL(region)
200 .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
202 // We should have cleared out only CF1, since we chose the flush thresholds
203 // and number of puts accordingly.
204 assertEquals(0, cf1MemstoreSize.getDataSize());
205 assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
206 // Nothing should have happened to CF2, ...
207 assertEquals(cf2MemstoreSize, oldCF2MemstoreSize);
208 // ... or CF3
209 assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
210 // Now the smallest LSN in the region should be the same as the smallest
211 // LSN in the memstore of CF2.
212 assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF2);
213 // Of course, this should hold too.
214 assertEquals(totalMemstoreSize, cf2MemstoreSize.getDataSize() + cf3MemstoreSize.getDataSize());
216 // Now add more puts (mostly for CF2), so that we only flush CF2 this time.
217 for (int i = 1200; i < 2400; i++) {
218 region.put(createPut(2, i));
220 // Add only 100 puts for CF3
221 if (i - 1200 < 100) {
222 region.put(createPut(3, i));
226 // How much does the CF3 memstore occupy? Will be used later.
227 oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
229 // Flush again
230 region.flush(false);
232 // Recalculate everything
233 cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
234 cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
235 cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
236 totalMemstoreSize = region.getMemStoreDataSize();
237 smallestSeqInRegionCurrentMemstore = getWAL(region)
238 .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
240 // CF1 and CF2, both should be absent.
241 assertEquals(0, cf1MemstoreSize.getDataSize());
242 assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
243 assertEquals(0, cf2MemstoreSize.getDataSize());
244 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize());
245 // CF3 shouldn't have been touched.
246 assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
247 assertEquals(totalMemstoreSize, cf3MemstoreSize.getDataSize());
249 // What happens when we hit the memstore limit, but we are not able to find
250 // any Column Family above the threshold?
251 // In that case, we should flush all the CFs.
253 // Clearing the existing memstores.
254 region.flush(true);
256 // The memstore limit is 200*1024 and the column family flush threshold is
257 // around 50*1024. We try to just hit the memstore limit with each CF's
258 // memstore being below the CF flush threshold.
259 for (int i = 1; i <= 300; i++) {
260 region.put(createPut(1, i));
261 region.put(createPut(2, i));
262 region.put(createPut(3, i));
263 region.put(createPut(4, i));
264 region.put(createPut(5, i));
267 region.flush(false);
269 // Since we won't find any CF above the threshold, and hence no specific
270 // store to flush, we should flush all the memstores.
271 assertEquals(0, region.getMemStoreDataSize());
272 HBaseTestingUtility.closeRegionAndWAL(region);
275 @Test
276 public void testSelectiveFlushWhenNotEnabled() throws IOException {
277 // Set up the configuration, use new one to not conflict with minicluster in other tests
278 Configuration conf = new HBaseTestingUtility().getConfiguration();
279 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
280 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
282 // Intialize the HRegion
283 HRegion region = initHRegion("testSelectiveFlushWhenNotEnabled", conf);
284 // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
285 for (int i = 1; i <= 1200; i++) {
286 region.put(createPut(1, i));
287 if (i <= 100) {
288 region.put(createPut(2, i));
289 if (i <= 50) {
290 region.put(createPut(3, i));
295 long totalMemstoreSize = region.getMemStoreDataSize();
297 // Find the sizes of the memstores of each CF.
298 MemStoreSize cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
299 MemStoreSize cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
300 MemStoreSize cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
302 // Some other sanity checks.
303 assertTrue(cf1MemstoreSize.getDataSize() > 0);
304 assertTrue(cf2MemstoreSize.getDataSize() > 0);
305 assertTrue(cf3MemstoreSize.getDataSize() > 0);
307 // The total memstore size should be the same as the sum of the sizes of
308 // memstores of CF1, CF2 and CF3.
309 assertEquals(totalMemstoreSize, cf1MemstoreSize.getDataSize() + cf2MemstoreSize.getDataSize()
310 + cf3MemstoreSize.getDataSize());
312 // Flush!
313 region.flush(false);
315 cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
316 cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
317 cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
318 totalMemstoreSize = region.getMemStoreDataSize();
319 long smallestSeqInRegionCurrentMemstore =
320 region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
322 // Everything should have been cleared
323 assertEquals(0, cf1MemstoreSize.getDataSize());
324 assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize.getHeapSize());
325 assertEquals(0, cf2MemstoreSize.getDataSize());
326 assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize.getHeapSize());
327 assertEquals(0, cf3MemstoreSize.getDataSize());
328 assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSize.getHeapSize());
329 assertEquals(0, totalMemstoreSize);
330 assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore);
331 HBaseTestingUtility.closeRegionAndWAL(region);
334 // Find the (first) region which has the specified name.
335 private static Pair<HRegion, HRegionServer> getRegionWithName(TableName tableName) {
336 MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
337 List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
338 for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
339 HRegionServer hrs = rsts.get(i).getRegionServer();
340 for (HRegion region : hrs.getRegions(tableName)) {
341 return Pair.newPair(region, hrs);
344 return null;
347 private void doTestLogReplay() throws Exception {
348 Configuration conf = TEST_UTIL.getConfiguration();
349 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 10000);
350 // Carefully chosen limits so that the memstore just flushes when we're done
351 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
352 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 2500);
353 final int numRegionServers = 4;
354 try {
355 TEST_UTIL.startMiniCluster(numRegionServers);
356 TEST_UTIL.getAdmin().createNamespace(
357 NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
358 Table table = TEST_UTIL.createTable(TABLENAME, FAMILIES);
360 // Add 100 edits for CF1, 20 for CF2, 20 for CF3.
361 // These will all be interleaved in the log.
362 for (int i = 1; i <= 80; i++) {
363 table.put(createPut(1, i));
364 if (i <= 10) {
365 table.put(createPut(2, i));
366 table.put(createPut(3, i));
369 Thread.sleep(1000);
371 Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(TABLENAME);
372 HRegion desiredRegion = desiredRegionAndServer.getFirst();
373 assertTrue("Could not find a region which hosts the new region.", desiredRegion != null);
375 // Flush the region selectively.
376 desiredRegion.flush(false);
378 long totalMemstoreSize;
379 long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize;
380 totalMemstoreSize = desiredRegion.getMemStoreDataSize();
382 // Find the sizes of the memstores of each CF.
383 cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize().getDataSize();
384 cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize().getDataSize();
385 cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize().getDataSize();
387 // CF1 Should have been flushed
388 assertEquals(0, cf1MemstoreSize);
389 // CF2 and CF3 shouldn't have been flushed.
390 // TODO: This test doesn't allow for this case:
391 // " Since none of the CFs were above the size, flushing all."
392 // i.e. a flush happens before we get to here and its a flush-all.
393 assertTrue(cf2MemstoreSize >= 0);
394 assertTrue(cf3MemstoreSize >= 0);
395 assertEquals(totalMemstoreSize, cf2MemstoreSize + cf3MemstoreSize);
397 // Wait for the RS report to go across to the master, so that the master
398 // is aware of which sequence ids have been flushed, before we kill the RS.
399 // If in production, the RS dies before the report goes across, we will
400 // safely replay all the edits.
401 Thread.sleep(2000);
403 // Abort the region server where we have the region hosted.
404 HRegionServer rs = desiredRegionAndServer.getSecond();
405 rs.abort("testing");
407 // The aborted region server's regions will be eventually assigned to some
408 // other region server, and the get RPC call (inside verifyEdit()) will
409 // retry for some time till the regions come back up.
411 // Verify that all the edits are safe.
412 for (int i = 1; i <= 80; i++) {
413 verifyEdit(1, i, table);
414 if (i <= 10) {
415 verifyEdit(2, i, table);
416 verifyEdit(3, i, table);
419 } finally {
420 TEST_UTIL.shutdownMiniCluster();
424 // Test Log Replay with Distributed log split on.
425 @Test
426 public void testLogReplayWithDistributedLogSplit() throws Exception {
427 doTestLogReplay();
430 private WAL getWAL(Region region) {
431 return ((HRegion)region).getWAL();
434 private int getNumRolledLogFiles(Region region) {
435 return AbstractFSWALProvider.getNumRolledLogFiles(getWAL(region));
439 * When a log roll is about to happen, we do a flush of the regions who will be affected by the
440 * log roll. These flushes cannot be a selective flushes, otherwise we cannot roll the logs. This
441 * test ensures that we do a full-flush in that scenario.
443 @Test
444 public void testFlushingWhenLogRolling() throws Exception {
445 TableName tableName = TableName.valueOf("testFlushingWhenLogRolling");
446 Configuration conf = TEST_UTIL.getConfiguration();
447 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);
448 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
449 long cfFlushSizeLowerBound = 2048;
450 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
451 cfFlushSizeLowerBound);
453 // One hour, prevent periodic rolling
454 conf.setLong("hbase.regionserver.logroll.period", 60L * 60 * 1000);
455 // prevent rolling by size
456 conf.setLong("hbase.regionserver.hlog.blocksize", 128L * 1024 * 1024);
457 // Make it 10 as max logs before a flush comes on.
458 final int maxLogs = 10;
459 conf.setInt("hbase.regionserver.maxlogs", maxLogs);
461 final int numRegionServers = 1;
462 TEST_UTIL.startMiniCluster(numRegionServers);
463 try {
464 Table table = TEST_UTIL.createTable(tableName, FAMILIES);
465 Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(tableName);
466 final HRegion desiredRegion = desiredRegionAndServer.getFirst();
467 assertTrue("Could not find a region which hosts the new region.", desiredRegion != null);
468 LOG.info("Writing to region=" + desiredRegion);
470 // Add one row for both CFs.
471 for (int i = 1; i <= 3; i++) {
472 table.put(createPut(i, 0));
474 // Now only add row to CF1, make sure when we force a flush, CF1 is larger than the lower
475 // bound and CF2 and CF3 are smaller than the lower bound.
476 for (int i = 0; i < maxLogs; i++) {
477 for (int j = 0; j < 100; j++) {
478 table.put(createPut(1, i * 100 + j));
480 // Roll the WAL. The log file count is less than maxLogs so no flush is triggered.
481 int currentNumRolledLogFiles = getNumRolledLogFiles(desiredRegion);
482 assertNull(getWAL(desiredRegion).rollWriter());
483 while (getNumRolledLogFiles(desiredRegion) <= currentNumRolledLogFiles) {
484 Thread.sleep(100);
487 assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion));
488 assertTrue(desiredRegion.getStore(FAMILY1).getMemStoreSize().getHeapSize() > cfFlushSizeLowerBound);
489 assertTrue(desiredRegion.getStore(FAMILY2).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound);
490 assertTrue(desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound);
491 table.put(createPut(1, 12345678));
492 // Make numRolledLogFiles greater than maxLogs
493 desiredRegionAndServer.getSecond().getWalRoller().requestRollAll();
494 // Wait for some time till the flush caused by log rolling happens.
495 TEST_UTIL.waitFor(30000, new Waiter.ExplainingPredicate<Exception>() {
497 @Override
498 public boolean evaluate() throws Exception {
499 return desiredRegion.getMemStoreDataSize() == 0;
502 @Override
503 public String explainFailure() throws Exception {
504 long memstoreSize = desiredRegion.getMemStoreDataSize();
505 if (memstoreSize > 0) {
506 return "Still have unflushed entries in memstore, memstore size is " + memstoreSize;
508 return "Unknown";
511 LOG.info("Finished waiting on flush after too many WALs...");
512 // Individual families should have been flushed.
513 assertEquals(MutableSegment.DEEP_OVERHEAD,
514 desiredRegion.getStore(FAMILY1).getMemStoreSize().getHeapSize());
515 assertEquals(MutableSegment.DEEP_OVERHEAD,
516 desiredRegion.getStore(FAMILY2).getMemStoreSize().getHeapSize());
517 assertEquals(MutableSegment.DEEP_OVERHEAD,
518 desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize());
519 // let WAL cleanOldLogs
520 assertNull(getWAL(desiredRegion).rollWriter(true));
521 assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs);
522 } finally {
523 TEST_UTIL.shutdownMiniCluster();
527 private void doPut(Table table, long memstoreFlushSize) throws IOException, InterruptedException {
528 Region region = getRegionWithName(table.getName()).getFirst();
529 // cf1 4B per row, cf2 40B per row and cf3 400B per row
530 byte[] qf = Bytes.toBytes("qf");
531 Random rand = new Random();
532 byte[] value1 = new byte[100];
533 byte[] value2 = new byte[200];
534 byte[] value3 = new byte[400];
535 for (int i = 0; i < 10000; i++) {
536 Put put = new Put(Bytes.toBytes("row-" + i));
537 rand.setSeed(i);
538 rand.nextBytes(value1);
539 rand.nextBytes(value2);
540 rand.nextBytes(value3);
541 put.addColumn(FAMILY1, qf, value1);
542 put.addColumn(FAMILY2, qf, value2);
543 put.addColumn(FAMILY3, qf, value3);
544 table.put(put);
545 // slow down to let regionserver flush region.
546 while (region.getMemStoreHeapSize() > memstoreFlushSize) {
547 Thread.sleep(100);
552 // Under the same write load, small stores should have less store files when
553 // percolumnfamilyflush enabled.
554 @Test
555 public void testCompareStoreFileCount() throws Exception {
556 long memstoreFlushSize = 1024L * 1024;
557 Configuration conf = TEST_UTIL.getConfiguration();
558 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, memstoreFlushSize);
559 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
560 conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
561 conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
562 ConstantSizeRegionSplitPolicy.class.getName());
564 TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
565 new TableDescriptorBuilder.ModifyableTableDescriptor(TABLENAME);
566 tableDescriptor.setCompactionEnabled(false);
567 tableDescriptor.setColumnFamily(
568 new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(FAMILY1));
569 tableDescriptor.setColumnFamily(
570 new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(FAMILY2));
571 tableDescriptor.setColumnFamily(
572 new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(FAMILY3));
574 LOG.info("==============Test with selective flush disabled===============");
575 int cf1StoreFileCount = -1;
576 int cf2StoreFileCount = -1;
577 int cf3StoreFileCount = -1;
578 int cf1StoreFileCount1 = -1;
579 int cf2StoreFileCount1 = -1;
580 int cf3StoreFileCount1 = -1;
581 try {
582 TEST_UTIL.startMiniCluster(1);
583 TEST_UTIL.getAdmin().createNamespace(
584 NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
585 TEST_UTIL.getAdmin().createTable(tableDescriptor);
586 TEST_UTIL.waitTableAvailable(TABLENAME);
587 Connection conn = ConnectionFactory.createConnection(conf);
588 Table table = conn.getTable(TABLENAME);
589 doPut(table, memstoreFlushSize);
590 table.close();
591 conn.close();
593 Region region = getRegionWithName(TABLENAME).getFirst();
594 cf1StoreFileCount = region.getStore(FAMILY1).getStorefilesCount();
595 cf2StoreFileCount = region.getStore(FAMILY2).getStorefilesCount();
596 cf3StoreFileCount = region.getStore(FAMILY3).getStorefilesCount();
597 } finally {
598 TEST_UTIL.shutdownMiniCluster();
601 LOG.info("==============Test with selective flush enabled===============");
602 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
603 // default value of per-cf flush lower bound is too big, set to a small enough value
604 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 0);
605 try {
606 TEST_UTIL.startMiniCluster(1);
607 TEST_UTIL.getAdmin().createNamespace(
608 NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
609 TEST_UTIL.getAdmin().createTable(tableDescriptor);
610 Connection conn = ConnectionFactory.createConnection(conf);
611 Table table = conn.getTable(TABLENAME);
612 doPut(table, memstoreFlushSize);
613 table.close();
614 conn.close();
616 Region region = getRegionWithName(TABLENAME).getFirst();
617 cf1StoreFileCount1 = region.getStore(FAMILY1).getStorefilesCount();
618 cf2StoreFileCount1 = region.getStore(FAMILY2).getStorefilesCount();
619 cf3StoreFileCount1 = region.getStore(FAMILY3).getStorefilesCount();
620 } finally {
621 TEST_UTIL.shutdownMiniCluster();
624 LOG.info("disable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount
625 + ", " + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount + ", "
626 + Bytes.toString(FAMILY3) + "=>" + cf3StoreFileCount);
627 LOG.info("enable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount1
628 + ", " + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount1 + ", "
629 + Bytes.toString(FAMILY3) + "=>" + cf3StoreFileCount1);
630 // small CF will have less store files.
631 assertTrue(cf1StoreFileCount1 < cf1StoreFileCount);
632 assertTrue(cf2StoreFileCount1 < cf2StoreFileCount);
635 public static void main(String[] args) throws Exception {
636 int numRegions = Integer.parseInt(args[0]);
637 long numRows = Long.parseLong(args[1]);
639 TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
640 new TableDescriptorBuilder.ModifyableTableDescriptor(TABLENAME);
641 tableDescriptor.setMaxFileSize(10L * 1024 * 1024 * 1024);
642 tableDescriptor.setValue(HTableDescriptor.SPLIT_POLICY,
643 ConstantSizeRegionSplitPolicy.class.getName());
644 tableDescriptor.setColumnFamily(
645 new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(FAMILY1));
646 tableDescriptor.setColumnFamily(
647 new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(FAMILY2));
648 tableDescriptor.setColumnFamily(
649 new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(FAMILY3));
651 Configuration conf = HBaseConfiguration.create();
652 Connection conn = ConnectionFactory.createConnection(conf);
653 Admin admin = conn.getAdmin();
654 if (admin.tableExists(TABLENAME)) {
655 admin.disableTable(TABLENAME);
656 admin.deleteTable(TABLENAME);
658 if (numRegions >= 3) {
659 byte[] startKey = new byte[16];
660 byte[] endKey = new byte[16];
661 Arrays.fill(endKey, (byte) 0xFF);
662 admin.createTable(tableDescriptor, startKey, endKey, numRegions);
663 } else {
664 admin.createTable(tableDescriptor);
666 admin.close();
668 Table table = conn.getTable(TABLENAME);
669 byte[] qf = Bytes.toBytes("qf");
670 Random rand = new Random();
671 byte[] value1 = new byte[16];
672 byte[] value2 = new byte[256];
673 byte[] value3 = new byte[4096];
674 for (long i = 0; i < numRows; i++) {
675 Put put = new Put(Hashing.md5().hashLong(i).asBytes());
676 rand.setSeed(i);
677 rand.nextBytes(value1);
678 rand.nextBytes(value2);
679 rand.nextBytes(value3);
680 put.addColumn(FAMILY1, qf, value1);
681 put.addColumn(FAMILY2, qf, value2);
682 put.addColumn(FAMILY3, qf, value3);
683 table.put(put);
684 if (i % 10000 == 0) {
685 LOG.info(i + " rows put");
688 table.close();
689 conn.close();