/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
@Category({ RegionServerTests.class, MediumTests.class })
public class TestSecureBulkLoadManager {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSecureBulkLoadManager.class);
  private static final Logger LOG =
    LoggerFactory.getLogger(TestSecureBulkLoadManager.class);

  private static TableName TABLE = TableName.valueOf(Bytes.toBytes("TestSecureBulkLoadManager"));
  private static byte[] FAMILY = Bytes.toBytes("family");
  private static byte[] COLUMN = Bytes.toBytes("column");
  private static byte[] key1 = Bytes.toBytes("row1");
  private static byte[] key2 = Bytes.toBytes("row2");
  private static byte[] key3 = Bytes.toBytes("row3");
  private static byte[] value1 = Bytes.toBytes("t1");
  private static byte[] value3 = Bytes.toBytes("t3");
  private static byte[] SPLIT_ROWKEY = key2;

  private Thread ealierBulkload;
  private Thread laterBulkload;

  protected final static HBaseTestingUtility testUtil = new HBaseTestingUtility();
  private static Configuration conf = testUtil.getConfiguration();
  @BeforeClass
  public static void setUp() throws Exception {
    testUtil.startMiniCluster();
  }
  @AfterClass
  public static void tearDown() throws Exception {
    testUtil.shutdownMiniCluster();
    testUtil.cleanupTestDir();
  }
  /**
   * After a secure bulkload finishes, there is a clean-up of the FileSystems used in the
   * bulkload. Sometimes the FileSystems used in a finished bulkload are still in use by other
   * bulkload calls, or other FileSystems created by the same user exist; all of them can be
   * closed by a single FileSystem.closeAllForUGI call. During the clean-up, FileSystems that are
   * still needed later must not get closed, or else a race condition occurs.
   *
   * testForRaceCondition covers the case where two secure bulkload calls from the same UGI go
   * into two different regions and one bulkload finishes earlier while the other bulkload still
   * needs its FileSystems; it checks that both bulkloads succeed.
   */
  @Test
  public void testForRaceCondition() throws Exception {
    Consumer<HRegion> fsCreatedListener = new Consumer<HRegion>() {
      @Override
      public void accept(HRegion hRegion) {
        if (hRegion.getRegionInfo().containsRow(key3)) {
          Threads.shutdown(ealierBulkload); /// wait until the other bulkload finishes
        }
      }
    };
    testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
      .getSecureBulkLoadManager().setFsCreatedListener(fsCreatedListener);
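    // The listener fires once the later bulkload's region (the one containing key3) has created
    // its FileSystem, and then waits for the earlier bulkload to finish. This forces the earlier
    // clean-up (FileSystem.closeAllForUGI) to run while the later bulkload still needs its
    // FileSystems, which is exactly the race being tested.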
    testUtil.createTable(TABLE, FAMILY, Bytes.toByteArrays(SPLIT_ROWKEY));
    Path rootdir = testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0)
      .getRegionServer().getDataRootDir();
    Path dir1 = new Path(rootdir, "dir1");
    prepareHFile(dir1, key1, value1);
    Path dir2 = new Path(rootdir, "dir2");
    prepareHFile(dir2, key3, value3);
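    // The table is split at SPLIT_ROWKEY (row2), so the HFile keyed by row1 and the one keyed
    // by row3 land in two different regions and go through two independent bulkload calls.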
    final AtomicReference<Throwable> t1Exception = new AtomicReference<>();
    final AtomicReference<Throwable> t2Exception = new AtomicReference<>();
    ealierBulkload = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          doBulkloadWithoutRetry(dir1);
        } catch (Exception e) {
          LOG.error("bulk load failed.", e);
          t1Exception.set(e);
        }
      }
    });
    laterBulkload = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          doBulkloadWithoutRetry(dir2);
        } catch (Exception e) {
          LOG.error("bulk load failed.", e);
          t2Exception.set(e);
        }
      }
    });
    ealierBulkload.start();
    laterBulkload.start();
    Threads.shutdown(ealierBulkload);
    Threads.shutdown(laterBulkload);
    Assert.assertNull(t1Exception.get());
    Assert.assertNull(t2Exception.get());
    /// check bulkload ok
    Get get1 = new Get(key1);
    Get get3 = new Get(key3);
    Table t = testUtil.getConnection().getTable(TABLE);
    Result r = t.get(get1);
    Assert.assertArrayEquals(r.getValue(FAMILY, COLUMN), value1);
    r = t.get(get3);
    Assert.assertArrayEquals(r.getValue(FAMILY, COLUMN), value3);
  }
  /**
   * A trick is used to make sure that server-side failures, if any, are not covered up by a
   * client retry. Since BulkLoadHFilesTool.bulkLoad keeps performing bulkload calls as long as
   * the HFile queue is not empty, while server-side exceptions in the doAs block do not lead to
   * a client exception, a bulkload would always succeed in this case by default, and the client
   * would never become aware that failures had happened. To avoid this kind of retry, a
   * MyExceptionToAvoidRetry exception is thrown after bulkLoadPhase finishes and is caught
   * silently outside the doBulkLoad call, so that bulkLoadPhase is called exactly once and
   * server-side failures, if any, can be checked via the data.
   */
  class MyExceptionToAvoidRetry extends DoNotRetryIOException {

    private static final long serialVersionUID = -6802760664998771151L;
  }
  private void doBulkloadWithoutRetry(Path dir) throws Exception {
    BulkLoadHFilesTool h = new BulkLoadHFilesTool(conf) {

      @Override
      protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
        Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
        boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
        super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap);
        throw new MyExceptionToAvoidRetry(); // throw exception to avoid retry
      }
    };
    try {
      h.bulkLoad(TABLE, dir);
      Assert.fail("MyExceptionToAvoidRetry is expected");
    } catch (MyExceptionToAvoidRetry e) { // expected
    }
  }
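  /**
   * Writes a single-cell HFile for FAMILY under the given directory, mirroring the table's
   * column family settings, so BulkLoadHFilesTool can pick it up.
   */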
  private void prepareHFile(Path dir, byte[] key, byte[] value) throws Exception {
    TableDescriptor desc = testUtil.getAdmin().getDescriptor(TABLE);
    ColumnFamilyDescriptor family = desc.getColumnFamily(FAMILY);
    Compression.Algorithm compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;

    CacheConfig writerCacheConf = new CacheConfig(conf, family, null, ByteBuffAllocator.HEAP);
    writerCacheConf.setCacheDataOnWrite(false);
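    // Build the HFile with format settings (compression, encoding, checksums, block size)
    // taken from the target column family, so the bulk-loaded file matches what a normal
    // flush would have produced.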
    HFileContext hFileContext = new HFileContextBuilder()
      .withIncludesMvcc(false)
      .withIncludesTags(true)
      .withCompression(compression)
      .withCompressTags(family.isCompressTags())
      .withChecksumType(HStore.getChecksumType(conf))
      .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
      .withBlockSize(family.getBlocksize())
      .withHBaseCheckSum(true)
      .withDataBlockEncoding(family.getDataBlockEncoding())
      .withEncryptionContext(Encryption.Context.NONE)
      .withCreateTime(EnvironmentEdgeManager.currentTime())
      .build();
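    // Bulkload input directories contain one subdirectory per column family, so the writer
    // outputs to dir/<family name>.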
    StoreFileWriter.Builder builder =
      new StoreFileWriter.Builder(conf, writerCacheConf, dir.getFileSystem(conf))
        .withOutputDir(new Path(dir, family.getNameAsString()))
        .withBloomType(family.getBloomFilterType())
        .withMaxKeyCount(Integer.MAX_VALUE)
        .withFileContext(hFileContext);
    StoreFileWriter writer = builder.build();

    Put put = new Put(key);
    put.addColumn(FAMILY, COLUMN, value);
    for (Cell c : put.get(FAMILY, COLUMN)) {
      writer.append(c);
    }
    writer.close();
  }
}