/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;

/**
 * Test for the case where a regionserver going down has enough cycles to do damage to regions that
 * have actually been assigned elsewhere.
 * <p>
 * If we happen to assign a region before its old location is fully done with it -- i.e. it is on
 * two servers at the same time -- all can work fine until the region on the dying server decides
 * to compact or otherwise change the region file set. The region in its new location will then get
 * a surprise when it tries to do something with a file removed by the region in its old location
 * on the dying server.
 * <p>
 * Making a test for this case is a little tough in that even if a file is deleted up on the
 * namenode, if the file was opened before the delete, it will continue to let reads happen until
 * something changes the state of cached blocks in the dfsclient that was already open (a block from
 * the deleted file is cleaned from the datanode by the NN).
 * <p>
 * What we will do below is an explicit check for existence on the files listed in the region
 * that has had some files removed because of a compaction. This hurries along and makes certain
 * what is otherwise a chance occurrence.
 */
@Category({MiscTests.class, LargeTests.class})
public class TestIOFencing {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestIOFencing.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestIOFencing.class);
  static {
    // Uncomment the following lines if more verbosity is needed for
    // debugging (see HBASE-12285 for details).
    //((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem"))
    //  .getLogger().setLevel(Level.ALL);
    //((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
  }

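  /**
   * Base class for the test regions below: an HRegion that counts compactions and exposes latches
   * the test uses to block a compaction at a chosen point and later let it proceed.
   */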
  public abstract static class CompactionBlockerRegion extends HRegion {
    AtomicInteger compactCount = new AtomicInteger();
    volatile CountDownLatch compactionsBlocked = new CountDownLatch(0);
    volatile CountDownLatch compactionsWaiting = new CountDownLatch(0);

    @SuppressWarnings("deprecation")
    public CompactionBlockerRegion(Path tableDir, WAL log,
        FileSystem fs, Configuration confParam, RegionInfo info,
        TableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, confParam, info, htd, rsServices);
    }

    public void stopCompactions() {
      compactionsBlocked = new CountDownLatch(1);
      compactionsWaiting = new CountDownLatch(1);
    }

    public void allowCompactions() {
      LOG.debug("allowing compactions");
      compactionsBlocked.countDown();
    }

    public void waitForCompactionToBlock() throws IOException {
      try {
        LOG.debug("waiting for compaction to block");
        compactionsWaiting.await();
        LOG.debug("compaction block reached");
      } catch (InterruptedException ex) {
        throw new IOException(ex);
      }
    }

    @Override
    public boolean compact(CompactionContext compaction, HStore store,
        ThroughputController throughputController) throws IOException {
      try {
        return super.compact(compaction, store, throughputController);
      } finally {
        compactCount.getAndIncrement();
      }
    }

    @Override
    public boolean compact(CompactionContext compaction, HStore store,
        ThroughputController throughputController, User user) throws IOException {
      try {
        return super.compact(compaction, store, throughputController, user);
      } finally {
        compactCount.getAndIncrement();
      }
    }

    public int countStoreFiles() {
      int count = 0;
      for (HStore store : stores.values()) {
        count += store.getStorefilesCount();
      }
      return count;
    }
  }

  /**
   * An override of HRegion that allows us to park compactions in a holding pattern and
   * then, when appropriate for the test, allow them to proceed again.
   */
  public static class BlockCompactionsInPrepRegion extends CompactionBlockerRegion {

    public BlockCompactionsInPrepRegion(Path tableDir, WAL log,
        FileSystem fs, Configuration confParam, RegionInfo info,
        TableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, confParam, info, htd, rsServices);
    }

    @Override
    protected void doRegionCompactionPrep() throws IOException {
      compactionsWaiting.countDown();
      try {
        compactionsBlocked.await();
      } catch (InterruptedException ex) {
        throw new IOException(ex);
      }
      super.doRegionCompactionPrep();
    }
  }

  /**
   * An override of HRegion that allows us to park compactions in a holding pattern and
   * then, when appropriate for the test, allow them to proceed again. This allows the compaction
   * entry to go to the WAL before blocking, but blocks afterwards.
   */
  public static class BlockCompactionsInCompletionRegion extends CompactionBlockerRegion {
    public BlockCompactionsInCompletionRegion(Path tableDir, WAL log,
        FileSystem fs, Configuration confParam, RegionInfo info,
        TableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, confParam, info, htd, rsServices);
    }

    @Override
    protected HStore instantiateHStore(final ColumnFamilyDescriptor family, boolean warmup)
        throws IOException {
      return new BlockCompactionsInCompletionHStore(this, family, this.conf, warmup);
    }
  }

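  /**
   * HStore used by {@link BlockCompactionsInCompletionRegion}: completeCompaction first trips the
   * owning region's "compactions waiting" latch and then parks on its "compactions blocked" latch
   * until the test releases it.
   */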
  public static class BlockCompactionsInCompletionHStore extends HStore {
    CompactionBlockerRegion r;
    protected BlockCompactionsInCompletionHStore(HRegion region, ColumnFamilyDescriptor family,
        Configuration confParam, boolean warmup) throws IOException {
      super(region, family, confParam, warmup);
      r = (CompactionBlockerRegion) region;
    }

    @Override
    protected void completeCompaction(Collection<HStoreFile> compactedFiles) throws IOException {
      try {
        r.compactionsWaiting.countDown();
        r.compactionsBlocked.await();
      } catch (InterruptedException ex) {
        throw new IOException(ex);
      }
      super.completeCompaction(compactedFiles);
    }
  }

  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private final static TableName TABLE_NAME =
      TableName.valueOf("tabletest");
  private final static byte[] FAMILY = Bytes.toBytes("family");
  private static final int FIRST_BATCH_COUNT = 4000;
  private static final int SECOND_BATCH_COUNT = FIRST_BATCH_COUNT;

  /**
   * Test that puts up a regionserver, starts a compaction on a loaded region but holds the
   * compaction until after we have killed the server and the region has come up on
   * a new regionserver altogether. This fakes the double assignment case where the region in one
   * location changes the files out from underneath a region being served elsewhere.
   */
  @Test
  public void testFencingAroundCompaction() throws Exception {
    for (MemoryCompactionPolicy policy : MemoryCompactionPolicy.values()) {
      doTest(BlockCompactionsInPrepRegion.class, policy);
    }
  }

  /**
   * Test that puts up a regionserver, starts a compaction on a loaded region but holds the
   * compaction completion until after we have killed the server and the region has come up on
   * a new regionserver altogether. This fakes the double assignment case where the region in one
   * location changes the files out from underneath a region being served elsewhere.
   */
  @Test
  public void testFencingAroundCompactionAfterWALSync() throws Exception {
    for (MemoryCompactionPolicy policy : MemoryCompactionPolicy.values()) {
      doTest(BlockCompactionsInCompletionRegion.class, policy);
    }
  }

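  /**
   * Shared body of the two tests above: installs the given region implementation, loads a region,
   * blocks its requested compaction, kills the hosting regionserver, and verifies the re-assigned
   * region still sees all of its store files and rows once the compaction is allowed to finish.
   */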
  public void doTest(Class<?> regionClass, MemoryCompactionPolicy policy) throws Exception {
    Configuration c = TEST_UTIL.getConfiguration();
    // Insert our custom region
    c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
    // Encourage plenty of flushes
    c.setLong("hbase.hregion.memstore.flush.size", 25000);
    c.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, ConstantSizeRegionSplitPolicy.class.getName());
    // Only run compaction when we tell it to
    c.setInt("hbase.hstore.compaction.min", 1);
    c.setInt("hbase.hstore.compactionThreshold", 1000);
    c.setLong("hbase.hstore.blockingStoreFiles", 1000);
    // Compact quickly after we tell it to!
    c.setInt("hbase.regionserver.thread.splitcompactcheckfrequency", 1000);
    c.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(policy));
    LOG.info("Starting mini cluster");
    TEST_UTIL.startMiniCluster(1);
    CompactionBlockerRegion compactingRegion = null;
    Admin admin = null;
    try {
      LOG.info("Creating admin");
      admin = TEST_UTIL.getConnection().getAdmin();
      LOG.info("Creating table");
      TEST_UTIL.createTable(TABLE_NAME, FAMILY);
      Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME);
      LOG.info("Loading test table");
      // Find the region
      List<HRegion> testRegions = TEST_UTIL.getMiniHBaseCluster().findRegionsForTable(TABLE_NAME);
      assertEquals(1, testRegions.size());
      compactingRegion = (CompactionBlockerRegion)testRegions.get(0);
      LOG.info("Blocking compactions");
      compactingRegion.stopCompactions();
      long lastFlushTime = compactingRegion.getEarliestFlushTimeForAllStores();
      // Load some rows
      TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);

      // add a compaction from an older (non-existing) region to see whether we successfully skip
      // those entries
      HRegionInfo oldHri = new HRegionInfo(table.getName(),
        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(oldHri,
        FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
        new Path("store_dir"));
      WALUtil.writeCompactionMarker(compactingRegion.getWAL(),
        ((HRegion)compactingRegion).getReplicationScope(),
        oldHri, compactionDescriptor, compactingRegion.getMVCC());

      // Wait till flush has happened, otherwise there won't be multiple store files
      long startWaitTime = System.currentTimeMillis();
      while (compactingRegion.getEarliestFlushTimeForAllStores() <= lastFlushTime ||
          compactingRegion.countStoreFiles() <= 1) {
        LOG.info("Waiting for the region to flush " +
          compactingRegion.getRegionInfo().getRegionNameAsString());
        Thread.sleep(1000);
        admin.flush(table.getName());
        assertTrue("Timed out waiting for the region to flush",
          System.currentTimeMillis() - startWaitTime < 30000);
      }
      assertTrue(compactingRegion.countStoreFiles() > 1);
      final byte[] REGION_NAME = compactingRegion.getRegionInfo().getRegionName();
      LOG.info("Asking for compaction");
      admin.majorCompact(TABLE_NAME);
      LOG.info("Waiting for compaction to be about to start");
      compactingRegion.waitForCompactionToBlock();
      LOG.info("Starting a new server");
      RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer();
      final HRegionServer newServer = newServerThread.getRegionServer();
      LOG.info("Killing region server ZK lease");
      TEST_UTIL.expireRegionServerSession(0);
      CompactionBlockerRegion newRegion = null;
      startWaitTime = System.currentTimeMillis();
      LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));

      // wait for region to be assigned and to go out of log replay if applicable
      Waiter.waitFor(c, 60000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          Region newRegion = newServer.getOnlineRegion(REGION_NAME);
          return newRegion != null;
        }
      });

      newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME);

      // After compaction of old region finishes on the server that was going down, make sure that
      // all the files we expect are still working when region is up in new location.
      FileSystem fs = newRegion.getFilesystem();
      for (String f : newRegion.getStoreFileList(new byte[][] {FAMILY})) {
        assertTrue("After compaction, does not exist: " + f, fs.exists(new Path(f)));
      }
      LOG.info("Allowing compaction to proceed");
      compactingRegion.allowCompactions();
      while (compactingRegion.compactCount.get() == 0) {
        Thread.sleep(1000);
      }
      // The server we killed stays up until the compaction that was started before it was killed
      // completes. In logs you should see the old regionserver now going down.
      LOG.info("Compaction finished");

      // If we survive the split keep going...
      // Now we make sure that the region isn't totally confused. Load up more rows.
      TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT,
        FIRST_BATCH_COUNT + SECOND_BATCH_COUNT);
      admin.majorCompact(TABLE_NAME);
      startWaitTime = System.currentTimeMillis();
      while (newRegion.compactCount.get() == 0) {
        Thread.sleep(1000);
        assertTrue("New region never compacted",
          System.currentTimeMillis() - startWaitTime < 180000);
      }
      int count;
      for (int i = 0;; i++) {
        try {
          count = TEST_UTIL.countRows(table);
          break;
        } catch (DoNotRetryIOException e) {
          // wait up to 30s
          if (i >= 30 || !e.getMessage().contains("File does not exist")) {
            throw e;
          }
          Thread.sleep(1000);
        }
      }
      if (policy == MemoryCompactionPolicy.EAGER || policy == MemoryCompactionPolicy.ADAPTIVE) {
        assertTrue(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT >= count);
      } else {
        assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, count);
      }
    } finally {
      if (compactingRegion != null) {
        compactingRegion.allowCompactions();
      }
      admin.close();
      TEST_UTIL.shutdownMiniCluster();
    }
  }
}