2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.regionserver
;
20 import static org
.apache
.hadoop
.hbase
.regionserver
.HStoreFile
.BULKLOAD_TIME_KEY
;
21 import static org
.hamcrest
.MatcherAssert
.assertThat
;
22 import static org
.hamcrest
.core
.Is
.is
;
24 import java
.io
.IOException
;
25 import java
.io
.InterruptedIOException
;
26 import java
.util
.ArrayList
;
27 import java
.util
.Collection
;
28 import java
.util
.Collections
;
29 import java
.util
.List
;
31 import java
.util
.Optional
;
32 import java
.util
.TreeMap
;
33 import java
.util
.concurrent
.atomic
.AtomicLong
;
34 import org
.apache
.hadoop
.conf
.Configuration
;
35 import org
.apache
.hadoop
.fs
.FileSystem
;
36 import org
.apache
.hadoop
.fs
.Path
;
37 import org
.apache
.hadoop
.hbase
.Cell
;
38 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
39 import org
.apache
.hadoop
.hbase
.HBaseConfiguration
;
40 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
41 import org
.apache
.hadoop
.hbase
.HRegionLocation
;
42 import org
.apache
.hadoop
.hbase
.KeyValue
;
43 import org
.apache
.hadoop
.hbase
.KeyValueUtil
;
44 import org
.apache
.hadoop
.hbase
.MultithreadedTestUtil
.RepeatingTestThread
;
45 import org
.apache
.hadoop
.hbase
.MultithreadedTestUtil
.TestContext
;
46 import org
.apache
.hadoop
.hbase
.StartTestingClusterOption
;
47 import org
.apache
.hadoop
.hbase
.TableExistsException
;
48 import org
.apache
.hadoop
.hbase
.TableName
;
49 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptor
;
50 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
51 import org
.apache
.hadoop
.hbase
.client
.Connection
;
52 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
53 import org
.apache
.hadoop
.hbase
.client
.RegionLocator
;
54 import org
.apache
.hadoop
.hbase
.client
.Result
;
55 import org
.apache
.hadoop
.hbase
.client
.ResultScanner
;
56 import org
.apache
.hadoop
.hbase
.client
.Scan
;
57 import org
.apache
.hadoop
.hbase
.client
.Table
;
58 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
59 import org
.apache
.hadoop
.hbase
.coprocessor
.ObserverContext
;
60 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessor
;
61 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessorEnvironment
;
62 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionObserver
;
63 import org
.apache
.hadoop
.hbase
.io
.compress
.Compression
;
64 import org
.apache
.hadoop
.hbase
.io
.compress
.Compression
.Algorithm
;
65 import org
.apache
.hadoop
.hbase
.io
.hfile
.CacheConfig
;
66 import org
.apache
.hadoop
.hbase
.io
.hfile
.HFile
;
67 import org
.apache
.hadoop
.hbase
.io
.hfile
.HFileContext
;
68 import org
.apache
.hadoop
.hbase
.io
.hfile
.HFileContextBuilder
;
69 import org
.apache
.hadoop
.hbase
.regionserver
.compactions
.CompactionLifeCycleTracker
;
70 import org
.apache
.hadoop
.hbase
.regionserver
.compactions
.CompactionRequest
;
71 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.TestWALActionsListener
;
72 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
73 import org
.apache
.hadoop
.hbase
.testclassification
.RegionServerTests
;
74 import org
.apache
.hadoop
.hbase
.tool
.BulkLoadHFiles
;
75 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
76 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
77 import org
.apache
.hadoop
.hbase
.wal
.WAL
;
78 import org
.apache
.hadoop
.hbase
.wal
.WALEdit
;
79 import org
.apache
.hadoop
.hbase
.wal
.WALKey
;
80 import org
.junit
.BeforeClass
;
81 import org
.junit
.ClassRule
;
82 import org
.junit
.Test
;
83 import org
.junit
.experimental
.categories
.Category
;
84 import org
.junit
.runner
.RunWith
;
85 import org
.junit
.runners
.Parameterized
;
86 import org
.junit
.runners
.Parameterized
.Parameters
;
87 import org
.slf4j
.Logger
;
88 import org
.slf4j
.LoggerFactory
;
90 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.collect
.Lists
;
93 * Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of
94 * the region server's bullkLoad functionality.
96 @RunWith(Parameterized
.class)
97 @Category({RegionServerTests
.class, LargeTests
.class})
98 public class TestHRegionServerBulkLoad
{
101 public static final HBaseClassTestRule CLASS_RULE
=
102 HBaseClassTestRule
.forClass(TestHRegionServerBulkLoad
.class);
104 private static final Logger LOG
= LoggerFactory
.getLogger(TestHRegionServerBulkLoad
.class);
105 protected static HBaseTestingUtil UTIL
= new HBaseTestingUtil();
106 protected final static Configuration conf
= UTIL
.getConfiguration();
107 protected final static byte[] QUAL
= Bytes
.toBytes("qual");
108 protected final static int NUM_CFS
= 10;
109 private int sleepDuration
;
110 public static int BLOCKSIZE
= 64 * 1024;
111 public static Algorithm COMPRESSION
= Compression
.Algorithm
.NONE
;
113 protected final static byte[][] families
= new byte[NUM_CFS
][];
115 for (int i
= 0; i
< NUM_CFS
; i
++) {
116 families
[i
] = Bytes
.toBytes(family(i
));
120 public static final Collection
<Object
[]> parameters() {
121 int[] sleepDurations
= new int[] { 0, 30000 };
122 List
<Object
[]> configurations
= new ArrayList
<>();
123 for (int i
: sleepDurations
) {
124 configurations
.add(new Object
[] { i
});
126 return configurations
;
129 public TestHRegionServerBulkLoad(int duration
) {
130 this.sleepDuration
= duration
;
134 public static void setUpBeforeClass() throws Exception
{
135 conf
.setInt("hbase.rpc.timeout", 10 * 1000);
139 * Create a rowkey compatible with
140 * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}.
142 public static byte[] rowkey(int i
) {
143 return Bytes
.toBytes(String
.format("row_%08d", i
));
146 static String
family(int i
) {
147 return String
.format("family_%04d", i
);
151 * Create an HFile with the given number of rows with a specified value.
153 public static void createHFile(FileSystem fs
, Path path
, byte[] family
,
154 byte[] qualifier
, byte[] value
, int numRows
) throws IOException
{
155 HFileContext context
= new HFileContextBuilder().withBlockSize(BLOCKSIZE
)
156 .withCompression(COMPRESSION
)
158 HFile
.Writer writer
= HFile
159 .getWriterFactory(conf
, new CacheConfig(conf
))
161 .withFileContext(context
)
163 long now
= EnvironmentEdgeManager
.currentTime();
165 // subtract 2 since iterateOnSplits doesn't include boundary keys
166 for (int i
= 0; i
< numRows
; i
++) {
167 KeyValue kv
= new KeyValue(rowkey(i
), family
, qualifier
, now
, value
);
170 writer
.appendFileInfo(BULKLOAD_TIME_KEY
, Bytes
.toBytes(now
));
177 * Thread that does full scans of the table looking for any partially
180 * Each iteration of this loads 10 hdfs files, which occupies 5 file open file
181 * handles. So every 10 iterations (500 file handles) it does a region
182 * compaction to reduce the number of open file handles.
184 public static class AtomicHFileLoader
extends RepeatingTestThread
{
185 final AtomicLong numBulkLoads
= new AtomicLong();
186 final AtomicLong numCompactions
= new AtomicLong();
187 private TableName tableName
;
189 public AtomicHFileLoader(TableName tableName
, TestContext ctx
,
190 byte targetFamilies
[][]) throws IOException
{
192 this.tableName
= tableName
;
196 public void doAnAction() throws Exception
{
197 long iteration
= numBulkLoads
.getAndIncrement();
198 Path dir
= UTIL
.getDataTestDirOnTestFS(String
.format("bulkLoad_%08d",
201 // create HFiles for different column families
202 FileSystem fs
= UTIL
.getTestFileSystem();
203 byte[] val
= Bytes
.toBytes(String
.format("%010d", iteration
));
204 Map
<byte[], List
<Path
>> family2Files
= new TreeMap
<>(Bytes
.BYTES_COMPARATOR
);
205 for (int i
= 0; i
< NUM_CFS
; i
++) {
206 Path hfile
= new Path(dir
, family(i
));
207 byte[] fam
= Bytes
.toBytes(family(i
));
208 createHFile(fs
, hfile
, fam
, QUAL
, val
, 1000);
209 family2Files
.put(fam
, Collections
.singletonList(hfile
));
212 BulkLoadHFiles
.create(UTIL
.getConfiguration()).bulkLoad(tableName
, family2Files
);
213 final Connection conn
= UTIL
.getConnection();
214 // Periodically do compaction to reduce the number of open file handles.
215 if (numBulkLoads
.get() % 5 == 0) {
216 // 5 * 50 = 250 open file handles!
217 try (RegionLocator locator
= conn
.getRegionLocator(tableName
)) {
218 HRegionLocation loc
= locator
.getRegionLocation(Bytes
.toBytes("aaa"), true);
219 conn
.getAdmin().compactRegion(loc
.getRegion().getRegionName());
220 numCompactions
.incrementAndGet();
226 public static class MyObserver
implements RegionCoprocessor
, RegionObserver
{
227 static int sleepDuration
;
230 public Optional
<RegionObserver
> getRegionObserver() {
231 return Optional
.of(this);
235 public InternalScanner
preCompact(ObserverContext
<RegionCoprocessorEnvironment
> e
, Store store
,
236 InternalScanner scanner
, ScanType scanType
, CompactionLifeCycleTracker tracker
,
237 CompactionRequest request
)
240 Thread
.sleep(sleepDuration
);
241 } catch (InterruptedException ie
) {
242 IOException ioe
= new InterruptedIOException();
251 * Thread that does full scans of the table looking for any partially
254 public static class AtomicScanReader
extends RepeatingTestThread
{
255 byte targetFamilies
[][];
257 AtomicLong numScans
= new AtomicLong();
258 AtomicLong numRowsScanned
= new AtomicLong();
259 TableName TABLE_NAME
;
261 public AtomicScanReader(TableName TABLE_NAME
, TestContext ctx
,
262 byte targetFamilies
[][]) throws IOException
{
264 this.TABLE_NAME
= TABLE_NAME
;
265 this.targetFamilies
= targetFamilies
;
266 table
= UTIL
.getConnection().getTable(TABLE_NAME
);
270 public void doAnAction() throws Exception
{
272 for (byte[] family
: targetFamilies
) {
275 ResultScanner scanner
= table
.getScanner(s
);
277 for (Result res
: scanner
) {
278 byte[] lastRow
= null, lastFam
= null, lastQual
= null;
279 byte[] gotValue
= null;
280 for (byte[] family
: targetFamilies
) {
281 byte qualifier
[] = QUAL
;
282 byte thisValue
[] = res
.getValue(family
, qualifier
);
283 if (gotValue
!= null && thisValue
!= null
284 && !Bytes
.equals(gotValue
, thisValue
)) {
286 StringBuilder msg
= new StringBuilder();
287 msg
.append("Failed on scan ").append(numScans
)
288 .append(" after scanning ").append(numRowsScanned
)
290 msg
.append("Current was " + Bytes
.toString(res
.getRow()) + "/"
291 + Bytes
.toString(family
) + ":" + Bytes
.toString(qualifier
)
292 + " = " + Bytes
.toString(thisValue
) + "\n");
293 msg
.append("Previous was " + Bytes
.toString(lastRow
) + "/"
294 + Bytes
.toString(lastFam
) + ":" + Bytes
.toString(lastQual
)
295 + " = " + Bytes
.toString(gotValue
));
296 throw new RuntimeException(msg
.toString());
300 lastQual
= qualifier
;
301 lastRow
= res
.getRow();
302 gotValue
= thisValue
;
304 numRowsScanned
.getAndIncrement();
306 numScans
.getAndIncrement();
311 * Creates a table with given table name and specified number of column
312 * families if the table does not already exist.
314 public void setupTable(TableName table
, int cfs
) throws IOException
{
316 LOG
.info("Creating table " + table
);
317 TableDescriptorBuilder tableDescriptorBuilder
=
318 TableDescriptorBuilder
.newBuilder(table
);
320 tableDescriptorBuilder
.setCoprocessor(MyObserver
.class.getName());
321 MyObserver
.sleepDuration
= this.sleepDuration
;
322 for (int i
= 0; i
< 10; i
++) {
323 ColumnFamilyDescriptor columnFamilyDescriptor
=
324 ColumnFamilyDescriptorBuilder
.newBuilder(Bytes
.toBytes(family(i
))).build();
325 tableDescriptorBuilder
.setColumnFamily(columnFamilyDescriptor
);
328 UTIL
.getAdmin().createTable(tableDescriptorBuilder
.build());
329 } catch (TableExistsException tee
) {
330 LOG
.info("Table " + table
+ " already exists");
338 public void testAtomicBulkLoad() throws Exception
{
339 TableName TABLE_NAME
= TableName
.valueOf("atomicBulkLoad");
341 int millisToRun
= 30000;
342 int numScanners
= 50;
344 // Set createWALDir to true and use default values for other options.
345 UTIL
.startMiniCluster(StartTestingClusterOption
.builder().createWALDir(true).build());
347 WAL log
= UTIL
.getHBaseCluster().getRegionServer(0).getWAL(null);
348 FindBulkHBaseListener listener
= new FindBulkHBaseListener();
349 log
.registerWALActionsListener(listener
);
350 runAtomicBulkloadTest(TABLE_NAME
, millisToRun
, numScanners
);
351 assertThat(listener
.isFound(), is(true));
353 UTIL
.shutdownMiniCluster();
357 void runAtomicBulkloadTest(TableName tableName
, int millisToRun
, int numScanners
)
359 setupTable(tableName
, 10);
361 TestContext ctx
= new TestContext(UTIL
.getConfiguration());
363 AtomicHFileLoader loader
= new AtomicHFileLoader(tableName
, ctx
, null);
364 ctx
.addThread(loader
);
366 List
<AtomicScanReader
> scanners
= Lists
.newArrayList();
367 for (int i
= 0; i
< numScanners
; i
++) {
368 AtomicScanReader scanner
= new AtomicScanReader(tableName
, ctx
, families
);
369 scanners
.add(scanner
);
370 ctx
.addThread(scanner
);
374 ctx
.waitFor(millisToRun
);
377 LOG
.info("Loaders:");
378 LOG
.info(" loaded " + loader
.numBulkLoads
.get());
379 LOG
.info(" compations " + loader
.numCompactions
.get());
381 LOG
.info("Scanners:");
382 for (AtomicScanReader scanner
: scanners
) {
383 LOG
.info(" scanned " + scanner
.numScans
.get());
384 LOG
.info(" verified " + scanner
.numRowsScanned
.get() + " rows");
389 * Run test on an HBase instance for 5 minutes. This assumes that the table
390 * under test only has a single region.
392 public static void main(String args
[]) throws Exception
{
394 Configuration c
= HBaseConfiguration
.create();
395 TestHRegionServerBulkLoad test
= new TestHRegionServerBulkLoad(0);
397 test
.runAtomicBulkloadTest(TableName
.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
399 System
.exit(0); // something hangs (believe it is lru threadpool)
403 private void setConf(Configuration c
) {
404 UTIL
= new HBaseTestingUtil(c
);
407 static class FindBulkHBaseListener
extends TestWALActionsListener
.DummyWALActionsListener
{
408 private boolean found
= false;
411 public void visitLogEntryBeforeWrite(RegionInfo info
, WALKey logKey
, WALEdit logEdit
) {
412 for (Cell cell
: logEdit
.getCells()) {
413 KeyValue kv
= KeyValueUtil
.ensureKeyValue(cell
);
414 for (Map
.Entry entry
: kv
.toStringMap().entrySet()) {
415 if (entry
.getValue().equals(Bytes
.toString(WALEdit
.BULK_LOAD
))) {
422 public boolean isFound() {