HBASE-24033 Add ut for loading the corrupt recovered hfiles (#1322)
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
@Category({RegionServerTests.class, MediumTests.class})
public class TestSecureBulkLoadManager {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestSecureBulkLoadManager.class);

  private static final Logger LOG =
      LoggerFactory.getLogger(TestSecureBulkLoadManager.class);
  private static TableName TABLE = TableName.valueOf(Bytes.toBytes("TestSecureBulkLoadManager"));
  private static byte[] FAMILY = Bytes.toBytes("family");
  private static byte[] COLUMN = Bytes.toBytes("column");
  private static byte[] key1 = Bytes.toBytes("row1");
  private static byte[] key2 = Bytes.toBytes("row2");
  private static byte[] key3 = Bytes.toBytes("row3");
  private static byte[] value1 = Bytes.toBytes("t1");
  private static byte[] value3 = Bytes.toBytes("t3");
  private static byte[] SPLIT_ROWKEY = key2;
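  // Splitting the table at key2 ("row2") yields two regions, one containing key1 ("row1") and
  // one containing key3 ("row3"), so the two bulkloads in this test target different regions.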
  private Thread ealierBulkload;
  private Thread laterBulkload;

  protected final static HBaseTestingUtility testUtil = new HBaseTestingUtility();
  private static Configuration conf = testUtil.getConfiguration();
  @BeforeClass
  public static void setUp() throws Exception {
    testUtil.startMiniCluster();
  }

  @AfterClass
  public static void tearDown() throws Exception {
    testUtil.shutdownMiniCluster();
    testUtil.cleanupTestDir();
  }
  /**
   * After a secure bulkload finishes, there is a clean-up of the FileSystems used in the
   * bulkload. FileSystems used in the finished bulkload might also be in use by other bulkload
   * calls, or there may be other FileSystems created by the same user, and all of them would be
   * closed by one FileSystem.closeAllForUGI call. So during the clean-up, the FileSystems that
   * are still needed later must not get closed, or else a race condition occurs.
   *
   * testForRaceCondition tests the case where two secure bulkload calls from the same UGI go
   * into two different regions and one bulkload finishes earlier while the other bulkload still
   * needs its FileSystems; it checks that both bulkloads succeed.
   */
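  // A minimal sketch (not executed by this test; fs1, fs2 and somePath are hypothetical names)
  // of the hazard described above: both handles come from the same per-UGI cache entry, so
  // closing all FileSystems for the UGI after one bulkload invalidates the handle that another
  // bulkload is still using.
  //
  //   FileSystem fs1 = FileSystem.get(conf); // used by the bulkload that finishes first
  //   FileSystem fs2 = FileSystem.get(conf); // same cached instance, still in use elsewhere
  //   FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser()); // clean-up closes both
  //   fs2.exists(somePath); // now fails: the filesystem has been closed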
  @Test
  public void testForRaceCondition() throws Exception {
    Consumer<HRegion> fsCreatedListener = new Consumer<HRegion>() {
      @Override
      public void accept(HRegion hRegion) {
        if (hRegion.getRegionInfo().containsRow(key3)) {
          Threads.shutdown(ealierBulkload); /// wait until the other bulkload finishes
        }
      }
    };
    testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
        .getSecureBulkLoadManager().setFsCreatedListener(fsCreatedListener);
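    // The listener fires when the bulkload into key3's region has just created its FileSystem;
    // blocking there until the earlier bulkload thread exits guarantees the earlier bulkload's
    // clean-up runs while this FileSystem is still needed, which is exactly the race under test.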
    /// create table
    testUtil.createTable(TABLE, FAMILY, Bytes.toByteArrays(SPLIT_ROWKEY));

    /// prepare files
    Path rootdir = testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0)
        .getRegionServer().getDataRootDir();
    Path dir1 = new Path(rootdir, "dir1");
    prepareHFile(dir1, key1, value1);
    Path dir2 = new Path(rootdir, "dir2");
    prepareHFile(dir2, key3, value3);
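    // dir1 holds an HFile for key1 and dir2 one for key3; with the table split at SPLIT_ROWKEY
    // (key2), the two directories are guaranteed to be bulkloaded into different regions.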
    /// do bulkload
    final AtomicReference<Throwable> t1Exception = new AtomicReference<>();
    final AtomicReference<Throwable> t2Exception = new AtomicReference<>();
    ealierBulkload = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          doBulkloadWithoutRetry(dir1);
        } catch (Exception e) {
          LOG.error("bulk load failed.", e);
          t1Exception.set(e);
        }
      }
    });
    laterBulkload = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          doBulkloadWithoutRetry(dir2);
        } catch (Exception e) {
          LOG.error("bulk load failed.", e);
          t2Exception.set(e);
        }
      }
    });
    ealierBulkload.start();
    laterBulkload.start();
    Threads.shutdown(ealierBulkload);
    Threads.shutdown(laterBulkload);
    Assert.assertNull(t1Exception.get());
    Assert.assertNull(t2Exception.get());
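    // Threads.shutdown waits for each thread to finish, so at this point both bulkloads (and
    // the earlier one's FileSystem clean-up) have completed without recording an exception.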
    /// check bulkload ok
    Get get1 = new Get(key1);
    Get get3 = new Get(key3);
    Table t = testUtil.getConnection().getTable(TABLE);
    Result r = t.get(get1);
    Assert.assertArrayEquals(r.getValue(FAMILY, COLUMN), value1);
    r = t.get(get3);
    Assert.assertArrayEquals(r.getValue(FAMILY, COLUMN), value3);
  }
  /**
   * A trick is used to make sure server-side failures, if any, are not covered up by a client
   * retry. Since BulkLoadHFilesTool.bulkLoad keeps performing bulkload calls as long as the
   * HFile queue is not empty, while server-side exceptions in the doAs block do not lead to a
   * client exception, a bulkload will always succeed in this case by default; thus the client
   * would never be aware that failures have ever happened. To avoid this kind of retry, a
   * MyExceptionToAvoidRetry exception is thrown after bulkLoadPhase finishes and caught
   * silently outside the doBulkLoad call, so that bulkLoadPhase is called exactly once and
   * server-side failures, if any, can be checked via the data.
   */
  class MyExceptionToAvoidRetry extends DoNotRetryIOException {

    private static final long serialVersionUID = -6802760664998771151L;
  }
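  // For context, BulkLoadHFilesTool.bulkLoad retries roughly like the following paraphrased
  // sketch (not the exact implementation):
  //
  //   while (!queue.isEmpty()) {
  //     groupOrSplit(...);   // re-group remaining HFiles against current region boundaries
  //     bulkLoadPhase(...);  // attempt the load; failed items are re-queued
  //   }
  //
  // Throwing from the overridden bulkLoadPhase below exits this loop after the first pass, so
  // a server-side failure cannot be papered over by a re-queue and retry.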
  private void doBulkloadWithoutRetry(Path dir) throws Exception {
    BulkLoadHFilesTool h = new BulkLoadHFilesTool(conf) {

      @Override
      protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
          Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
          boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
        super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap);
        throw new MyExceptionToAvoidRetry(); // throw exception to avoid retry
      }
    };
    try {
      h.bulkLoad(TABLE, dir);
      Assert.fail("MyExceptionToAvoidRetry is expected");
    } catch (MyExceptionToAvoidRetry e) { // expected
    }
  }
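  // After doBulkloadWithoutRetry returns, whether the single bulkLoadPhase call actually
  // succeeded on the server side is verified by reading the loaded rows back in
  // testForRaceCondition.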
  private void prepareHFile(Path dir, byte[] key, byte[] value) throws Exception {
    TableDescriptor desc = testUtil.getAdmin().getDescriptor(TABLE);
    ColumnFamilyDescriptor family = desc.getColumnFamily(FAMILY);
    Compression.Algorithm compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;

    CacheConfig writerCacheConf = new CacheConfig(conf, family, null, ByteBuffAllocator.HEAP);
    writerCacheConf.setCacheDataOnWrite(false);
    HFileContext hFileContext = new HFileContextBuilder()
        .withIncludesMvcc(false)
        .withIncludesTags(true)
        .withCompression(compression)
        .withCompressTags(family.isCompressTags())
        .withChecksumType(HStore.getChecksumType(conf))
        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
        .withBlockSize(family.getBlocksize())
        .withHBaseCheckSum(true)
        .withDataBlockEncoding(family.getDataBlockEncoding())
        .withEncryptionContext(Encryption.Context.NONE)
        .withCreateTime(EnvironmentEdgeManager.currentTime())
        .build();
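    // The HFileContext above mirrors the column family's storage settings (compression, block
    // size, data block encoding, checksums), so the generated file looks like one written by a
    // normal flush or compaction.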
    StoreFileWriter.Builder builder =
        new StoreFileWriter.Builder(conf, writerCacheConf, dir.getFileSystem(conf))
            .withOutputDir(new Path(dir, family.getNameAsString()))
            .withBloomType(family.getBloomFilterType())
            .withMaxKeyCount(Integer.MAX_VALUE)
            .withFileContext(hFileContext);
    StoreFileWriter writer = builder.build();
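    // The writer's output directory, dir/<family name>, gives the directory layout
    // BulkLoadHFilesTool expects (one subdirectory per column family), so dir can be passed
    // to bulkLoad directly.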
    Put put = new Put(key);
    put.addColumn(FAMILY, COLUMN, value);
    for (Cell c : put.get(FAMILY, COLUMN)) {
      writer.append(c);
    }
    writer.close();
  }
}