/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;

/**
 * Test for the case where a regionserver going down has enough cycles to do damage to regions that
 * have actually been assigned elsewhere.
 * <p>
 * If we happen to assign a region before it is fully done with its old location -- i.e. it is on
 * two servers at the same time -- all can work fine until the region on the dying server decides
 * to compact or otherwise change the region file set. The region in its new location will then get
 * a surprise when it tries to do something with a file removed by the region in its old location
 * on the dying server.
 * <p>
 * Making a test for this case is a little tough in that even if a file is deleted up on the
 * namenode, if the file was opened before the delete, reads will continue to work until something
 * changes the state of cached blocks in the already-open dfsclient (a block from the deleted file
 * is cleaned from the datanode by the NN).
 * <p>
 * What we do below is an explicit check for existence of the files listed in the region that has
 * had some files removed by a compaction. This hurries along and makes certain what would
 * otherwise be a chance occurrence.
 */
@Category({ MiscTests.class, LargeTests.class })
public class TestIOFencing {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestIOFencing.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestIOFencing.class);

  // Uncomment the following lines if more verbosity is needed for
  // debugging (see HBASE-12285 for details).
  //((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
  //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
  //((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
  //((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem"))
  //  .getLogger().setLevel(Level.ALL);
  //((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);

  public abstract static class CompactionBlockerRegion extends HRegion {
    AtomicInteger compactCount = new AtomicInteger();
    volatile CountDownLatch compactionsBlocked = new CountDownLatch(0);
    volatile CountDownLatch compactionsWaiting = new CountDownLatch(0);
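    // The two latches form a handshake with the test driver: stopCompactions() arms them, a
    // compaction counts down compactionsWaiting once it reaches its blocking point, then parks
    // on compactionsBlocked until allowCompactions() releases it.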

    @SuppressWarnings("deprecation")
    public CompactionBlockerRegion(Path tableDir, WAL log,
        FileSystem fs, Configuration confParam, RegionInfo info,
        TableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, confParam, info, htd, rsServices);
    }

    public void stopCompactions() {
      compactionsBlocked = new CountDownLatch(1);
      compactionsWaiting = new CountDownLatch(1);
    }

    public void allowCompactions() {
      LOG.debug("allowing compactions");
      compactionsBlocked.countDown();
    }

    public void waitForCompactionToBlock() throws IOException {
      try {
        LOG.debug("waiting for compaction to block");
        compactionsWaiting.await();
        LOG.debug("compaction block reached");
      } catch (InterruptedException ex) {
        throw new IOException(ex);
      }
    }

    @Override
    public boolean compact(CompactionContext compaction, HStore store,
        ThroughputController throughputController) throws IOException {
      try {
        return super.compact(compaction, store, throughputController);
      } finally {
        compactCount.getAndIncrement();
      }
    }

    @Override
    public boolean compact(CompactionContext compaction, HStore store,
        ThroughputController throughputController, User user) throws IOException {
      try {
        return super.compact(compaction, store, throughputController, user);
      } finally {
        compactCount.getAndIncrement();
      }
    }

    public int countStoreFiles() {
      int count = 0;
      for (HStore store : stores.values()) {
        count += store.getStorefilesCount();
      }
      return count;
    }
  }

  /**
   * An override of HRegion that allows us to park compactions in a holding pattern and then, when
   * appropriate for the test, allow them to proceed again.
   */
  public static class BlockCompactionsInPrepRegion extends CompactionBlockerRegion {
    public BlockCompactionsInPrepRegion(Path tableDir, WAL log,
        FileSystem fs, Configuration confParam, RegionInfo info,
        TableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, confParam, info, htd, rsServices);
    }

    @Override
    protected void doRegionCompactionPrep() throws IOException {
      compactionsWaiting.countDown();
      try {
        compactionsBlocked.await();
      } catch (InterruptedException ex) {
        throw new IOException(ex);
      }
      super.doRegionCompactionPrep();
    }
  }

  /**
   * An override of HRegion that allows us to park compactions in a holding pattern and then, when
   * appropriate for the test, allow them to proceed again. This variant lets the compaction entry
   * go to the WAL before blocking, but blocks afterwards.
   */
  public static class BlockCompactionsInCompletionRegion extends CompactionBlockerRegion {
    public BlockCompactionsInCompletionRegion(Path tableDir, WAL log,
        FileSystem fs, Configuration confParam, RegionInfo info,
        TableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, confParam, info, htd, rsServices);
    }

    @Override
    protected HStore instantiateHStore(final ColumnFamilyDescriptor family, boolean warmup)
        throws IOException {
      return new BlockCompactionsInCompletionHStore(this, family, this.conf, warmup);
    }
  }

  public static class BlockCompactionsInCompletionHStore extends HStore {
    CompactionBlockerRegion r;

    protected BlockCompactionsInCompletionHStore(HRegion region, ColumnFamilyDescriptor family,
        Configuration confParam, boolean warmup) throws IOException {
      super(region, family, confParam, warmup);
      r = (CompactionBlockerRegion) region;
    }

    @Override
    protected void completeCompaction(Collection<HStoreFile> compactedFiles) throws IOException {
      try {
        r.compactionsWaiting.countDown();
        r.compactionsBlocked.await();
      } catch (InterruptedException ex) {
        throw new IOException(ex);
      }
      super.completeCompaction(compactedFiles);
    }
  }
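  // Between them the two region subclasses cover both orderings of failure: the "InPrep" variant
  // blocks before the compaction has written anything, while the "InCompletion" variant blocks
  // only after the compaction marker has already gone to the WAL.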

  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private final static TableName TABLE_NAME = TableName.valueOf("tabletest");
  private final static byte[] FAMILY = Bytes.toBytes("family");
  private static final int FIRST_BATCH_COUNT = 4000;
  private static final int SECOND_BATCH_COUNT = FIRST_BATCH_COUNT;

  /**
   * Test that puts up a regionserver, starts a compaction on a loaded region but holds the
   * compaction until after we have killed the server and the region has come up on a new
   * regionserver altogether. This fakes the double assignment case where a region in one location
   * changes the files out from underneath a region being served elsewhere.
   */
  @Test
  public void testFencingAroundCompaction() throws Exception {
    for (MemoryCompactionPolicy policy : MemoryCompactionPolicy.values()) {
      doTest(BlockCompactionsInPrepRegion.class, policy);
    }
  }

  /**
   * Test that puts up a regionserver, starts a compaction on a loaded region but holds the
   * compaction completion until after we have killed the server and the region has come up on a
   * new regionserver altogether. This fakes the double assignment case where a region in one
   * location changes the files out from underneath a region being served elsewhere.
   */
  @Test
  public void testFencingAroundCompactionAfterWALSync() throws Exception {
    for (MemoryCompactionPolicy policy : MemoryCompactionPolicy.values()) {
      doTest(BlockCompactionsInCompletionRegion.class, policy);
    }
  }

  public void doTest(Class<?> regionClass, MemoryCompactionPolicy policy) throws Exception {
    Configuration c = TEST_UTIL.getConfiguration();
    // Insert our custom region
    c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
    // Encourage plenty of flushes
    c.setLong("hbase.hregion.memstore.flush.size", 25000);
    c.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, ConstantSizeRegionSplitPolicy.class.getName());
    // Only run compaction when we tell it to
    c.setInt("hbase.hstore.compaction.min", 1);
    c.setInt("hbase.hstore.compactionThreshold", 1000);
    c.setLong("hbase.hstore.blockingStoreFiles", 1000);
    // Compact quickly after we tell it to!
    c.setInt("hbase.regionserver.thread.splitcompactcheckfrequency", 1000);
    c.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(policy));
    LOG.info("Starting mini cluster");
    TEST_UTIL.startMiniCluster(1);
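    // Net effect of the settings above: the tiny flush size piles up store files quickly, while
    // the high compactionThreshold/blockingStoreFiles values keep background compactions from
    // firing on their own, so only the explicit majorCompact() calls below trigger compactions.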
    CompactionBlockerRegion compactingRegion = null;
    Admin admin = null;
    try {
      LOG.info("Creating admin");
      admin = TEST_UTIL.getConnection().getAdmin();
      LOG.info("Creating table");
      TEST_UTIL.createTable(TABLE_NAME, FAMILY);
      Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME);
      LOG.info("Loading test table");
      // Find the region
      List<HRegion> testRegions = TEST_UTIL.getMiniHBaseCluster().findRegionsForTable(TABLE_NAME);
      assertEquals(1, testRegions.size());
      compactingRegion = (CompactionBlockerRegion) testRegions.get(0);
      LOG.info("Blocking compactions");
      compactingRegion.stopCompactions();
      long lastFlushTime = compactingRegion.getEarliestFlushTimeForAllStores();
      // Load some rows
      TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);

      // Add a compaction from an older (non-existing) region to see whether we successfully skip
      // those entries
      HRegionInfo oldHri = new HRegionInfo(table.getName(),
          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(oldHri,
          FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
          new Path("store_dir"));
      WALUtil.writeCompactionMarker(compactingRegion.getWAL(),
          ((HRegion) compactingRegion).getReplicationScope(),
          oldHri, compactionDescriptor, compactingRegion.getMVCC());
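      // Note the marker above points at fabricated file paths ("/a", "/b") for a region that was
      // never created; WAL replay is expected to skip it rather than act on those files.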

      // Wait till flush has happened, otherwise there won't be multiple store files
      long startWaitTime = System.currentTimeMillis();
      while (compactingRegion.getEarliestFlushTimeForAllStores() <= lastFlushTime ||
          compactingRegion.countStoreFiles() <= 1) {
        LOG.info("Waiting for the region to flush " +
            compactingRegion.getRegionInfo().getRegionNameAsString());
        Thread.sleep(1000);
        admin.flush(table.getName());
        assertTrue("Timed out waiting for the region to flush",
            System.currentTimeMillis() - startWaitTime < 30000);
      }
      assertTrue(compactingRegion.countStoreFiles() > 1);
      final byte[] REGION_NAME = compactingRegion.getRegionInfo().getRegionName();
      LOG.info("Asking for compaction");
      admin.majorCompact(TABLE_NAME);
      LOG.info("Waiting for compaction to be about to start");
      compactingRegion.waitForCompactionToBlock();
      LOG.info("Starting a new server");
      RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer();
      final HRegionServer newServer = newServerThread.getRegionServer();
      LOG.info("Killing region server ZK lease");
      TEST_UTIL.expireRegionServerSession(0);
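      // Expiring the ZK session makes the master treat the server as dead and reassign its
      // regions, while the old JVM (and its parked compaction) keeps running -- exactly the
      // double-assignment window described in the class comment.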
      CompactionBlockerRegion newRegion = null;
      startWaitTime = System.currentTimeMillis();
      LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));

      // Wait for the region to be assigned and to go out of log replay if applicable
      Waiter.waitFor(c, 60000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          Region newRegion = newServer.getOnlineRegion(REGION_NAME);
          return newRegion != null;
        }
      });

      newRegion = (CompactionBlockerRegion) newServer.getOnlineRegion(REGION_NAME);
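      // At this point the region is effectively online in two places at once: compactingRegion on
      // the dying server (with a compaction still parked) and newRegion on the replacement server.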

      // After compaction of old region finishes on the server that was going down, make sure that
      // all the files we expect are still working when region is up in new location.
      FileSystem fs = newRegion.getFilesystem();
      for (String f : newRegion.getStoreFileList(new byte[][] { FAMILY })) {
        assertTrue("After compaction, does not exist: " + f, fs.exists(new Path(f)));
      }
      LOG.info("Allowing compaction to proceed");
      compactingRegion.allowCompactions();
      while (compactingRegion.compactCount.get() == 0) {
        Thread.sleep(1000);
      }
      // The server we killed stays up until the compaction that was started before it was killed
      // completes. In logs you should see the old regionserver now going down.
      LOG.info("Compaction finished");

      // If we survive the split keep going...
      // Now we make sure that the region isn't totally confused. Load up more rows.
      TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT,
          FIRST_BATCH_COUNT + SECOND_BATCH_COUNT);
      admin.majorCompact(TABLE_NAME);
      startWaitTime = System.currentTimeMillis();
      while (newRegion.compactCount.get() == 0) {
        Thread.sleep(1000);
        assertTrue("New region never compacted",
            System.currentTimeMillis() - startWaitTime < 180000);
      }
      int count;
      for (int i = 0;; i++) {
        try {
          count = TEST_UTIL.countRows(table);
          break;
        } catch (DoNotRetryIOException e) {
          // Wait up to 30s, retrying on "File does not exist" errors only
          if (i >= 30 || !e.getMessage().contains("File does not exist")) {
            throw e;
          }
          Thread.sleep(1000);
        }
      }
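      // Retry rationale (per the catch filter above): right after the old server's compaction
      // finally runs, the new region can transiently reference store files that the compaction
      // removed, so "File does not exist" is treated as recoverable; anything else is rethrown.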
      if (policy == MemoryCompactionPolicy.EAGER || policy == MemoryCompactionPolicy.ADAPTIVE) {
        assertTrue(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT >= count);
      } else {
        assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, count);
      }
    } finally {
      if (compactingRegion != null) {
        compactingRegion.allowCompactions();
      }
      admin.close();
      TEST_UTIL.shutdownMiniCluster();
    }
  }
}