2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.regionserver
;
20 import static org
.apache
.hadoop
.hbase
.regionserver
.HStoreFile
.BULKLOAD_TIME_KEY
;
21 import static org
.hamcrest
.core
.Is
.is
;
22 import static org
.junit
.Assert
.assertThat
;
24 import java
.io
.IOException
;
25 import java
.io
.InterruptedIOException
;
26 import java
.util
.ArrayList
;
27 import java
.util
.Collection
;
28 import java
.util
.Collections
;
29 import java
.util
.List
;
31 import java
.util
.Optional
;
32 import java
.util
.TreeMap
;
33 import java
.util
.concurrent
.atomic
.AtomicLong
;
34 import org
.apache
.hadoop
.conf
.Configuration
;
35 import org
.apache
.hadoop
.fs
.FileSystem
;
36 import org
.apache
.hadoop
.fs
.Path
;
37 import org
.apache
.hadoop
.hbase
.Cell
;
38 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
39 import org
.apache
.hadoop
.hbase
.HBaseConfiguration
;
40 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
41 import org
.apache
.hadoop
.hbase
.HRegionLocation
;
42 import org
.apache
.hadoop
.hbase
.KeyValue
;
43 import org
.apache
.hadoop
.hbase
.KeyValueUtil
;
44 import org
.apache
.hadoop
.hbase
.MultithreadedTestUtil
.RepeatingTestThread
;
45 import org
.apache
.hadoop
.hbase
.MultithreadedTestUtil
.TestContext
;
46 import org
.apache
.hadoop
.hbase
.StartMiniClusterOption
;
47 import org
.apache
.hadoop
.hbase
.TableExistsException
;
48 import org
.apache
.hadoop
.hbase
.TableName
;
49 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptor
;
50 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
51 import org
.apache
.hadoop
.hbase
.client
.Connection
;
52 import org
.apache
.hadoop
.hbase
.client
.RegionLocator
;
53 import org
.apache
.hadoop
.hbase
.client
.Result
;
54 import org
.apache
.hadoop
.hbase
.client
.ResultScanner
;
55 import org
.apache
.hadoop
.hbase
.client
.Scan
;
56 import org
.apache
.hadoop
.hbase
.client
.Table
;
57 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
58 import org
.apache
.hadoop
.hbase
.coprocessor
.ObserverContext
;
59 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessor
;
60 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessorEnvironment
;
61 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionObserver
;
62 import org
.apache
.hadoop
.hbase
.io
.compress
.Compression
;
63 import org
.apache
.hadoop
.hbase
.io
.compress
.Compression
.Algorithm
;
64 import org
.apache
.hadoop
.hbase
.io
.hfile
.CacheConfig
;
65 import org
.apache
.hadoop
.hbase
.io
.hfile
.HFile
;
66 import org
.apache
.hadoop
.hbase
.io
.hfile
.HFileContext
;
67 import org
.apache
.hadoop
.hbase
.io
.hfile
.HFileContextBuilder
;
68 import org
.apache
.hadoop
.hbase
.regionserver
.compactions
.CompactionLifeCycleTracker
;
69 import org
.apache
.hadoop
.hbase
.regionserver
.compactions
.CompactionRequest
;
70 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.TestWALActionsListener
;
71 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
72 import org
.apache
.hadoop
.hbase
.testclassification
.RegionServerTests
;
73 import org
.apache
.hadoop
.hbase
.tool
.BulkLoadHFiles
;
74 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
75 import org
.apache
.hadoop
.hbase
.wal
.WAL
;
76 import org
.apache
.hadoop
.hbase
.wal
.WALEdit
;
77 import org
.apache
.hadoop
.hbase
.wal
.WALKey
;
78 import org
.junit
.BeforeClass
;
79 import org
.junit
.ClassRule
;
80 import org
.junit
.Test
;
81 import org
.junit
.experimental
.categories
.Category
;
82 import org
.junit
.runner
.RunWith
;
83 import org
.junit
.runners
.Parameterized
;
84 import org
.junit
.runners
.Parameterized
.Parameters
;
85 import org
.slf4j
.Logger
;
86 import org
.slf4j
.LoggerFactory
;
88 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.collect
.Lists
;
91 * Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of
92 * the region server's bulkLoad functionality.
94 @RunWith(Parameterized
.class)
95 @Category({RegionServerTests
.class, LargeTests
.class})
96 public class TestHRegionServerBulkLoad
{
/** Class-level test rule created for this test class. */
public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHRegionServerBulkLoad.class);

private static final Logger LOG = LoggerFactory.getLogger(TestHRegionServerBulkLoad.class);

/** Shared testing utility; left non-final because {@code setConf} replaces it. */
protected static HBaseTestingUtility UTIL = new HBaseTestingUtility();

/** Configuration taken from {@link #UTIL} when this class is initialized. */
protected static final Configuration conf = UTIL.getConfiguration();

/** Qualifier used for every cell written and read by this test. */
protected static final byte[] QUAL = Bytes.toBytes("qual");

/** Number of column families the loader writes per iteration. */
protected static final int NUM_CFS = 10;

/** Value copied into {@code MyObserver.sleepDuration} by {@code setupTable}. */
private int sleepDuration;

/** Block size handed to the HFile context in {@code createHFile}. */
public static int BLOCKSIZE = 64 * 1024;

/** Compression handed to the HFile context in {@code createHFile}. */
public static Algorithm COMPRESSION = Algorithm.NONE;

/** Column family names as bytes; entry {@code i} holds {@code family(i)}. */
protected static final byte[][] families = new byte[NUM_CFS][];

static {
  int index = 0;
  while (index < NUM_CFS) {
    families[index] = Bytes.toBytes(family(index));
    index++;
  }
}
118 public static final Collection
<Object
[]> parameters() {
119 int[] sleepDurations
= new int[] { 0, 30000 };
120 List
<Object
[]> configurations
= new ArrayList
<>();
121 for (int i
: sleepDurations
) {
122 configurations
.add(new Object
[] { i
});
124 return configurations
;
/**
 * Creates a test instance for one sleep duration.
 *
 * @param duration value stored in this instance's {@code sleepDuration} field
 */
public TestHRegionServerBulkLoad(int duration) {
  sleepDuration = duration;
}
132 public static void setUpBeforeClass() throws Exception
{
133 conf
.setInt("hbase.rpc.timeout", 10 * 1000);
137 * Create a rowkey compatible with
138 * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}.
140 public static byte[] rowkey(int i
) {
141 return Bytes
.toBytes(String
.format("row_%08d", i
));
144 static String
family(int i
) {
145 return String
.format("family_%04d", i
);
149 * Create an HFile with the given number of rows with a specified value.
151 public static void createHFile(FileSystem fs
, Path path
, byte[] family
,
152 byte[] qualifier
, byte[] value
, int numRows
) throws IOException
{
153 HFileContext context
= new HFileContextBuilder().withBlockSize(BLOCKSIZE
)
154 .withCompression(COMPRESSION
)
156 HFile
.Writer writer
= HFile
157 .getWriterFactory(conf
, new CacheConfig(conf
))
159 .withFileContext(context
)
161 long now
= System
.currentTimeMillis();
163 // subtract 2 since iterateOnSplits doesn't include boundary keys
164 for (int i
= 0; i
< numRows
; i
++) {
165 KeyValue kv
= new KeyValue(rowkey(i
), family
, qualifier
, now
, value
);
168 writer
.appendFileInfo(BULKLOAD_TIME_KEY
, Bytes
.toBytes(now
));
175 * Thread that does full scans of the table looking for any partially
178 * Each iteration of this loads 10 hdfs files, which occupies 5 file open file
179 * handles. So every 10 iterations (500 file handles) it does a region
180 * compaction to reduce the number of open file handles.
182 public static class AtomicHFileLoader
extends RepeatingTestThread
{
183 final AtomicLong numBulkLoads
= new AtomicLong();
184 final AtomicLong numCompactions
= new AtomicLong();
185 private TableName tableName
;
187 public AtomicHFileLoader(TableName tableName
, TestContext ctx
,
188 byte targetFamilies
[][]) throws IOException
{
190 this.tableName
= tableName
;
194 public void doAnAction() throws Exception
{
195 long iteration
= numBulkLoads
.getAndIncrement();
196 Path dir
= UTIL
.getDataTestDirOnTestFS(String
.format("bulkLoad_%08d",
199 // create HFiles for different column families
200 FileSystem fs
= UTIL
.getTestFileSystem();
201 byte[] val
= Bytes
.toBytes(String
.format("%010d", iteration
));
202 Map
<byte[], List
<Path
>> family2Files
= new TreeMap
<>(Bytes
.BYTES_COMPARATOR
);
203 for (int i
= 0; i
< NUM_CFS
; i
++) {
204 Path hfile
= new Path(dir
, family(i
));
205 byte[] fam
= Bytes
.toBytes(family(i
));
206 createHFile(fs
, hfile
, fam
, QUAL
, val
, 1000);
207 family2Files
.put(fam
, Collections
.singletonList(hfile
));
210 BulkLoadHFiles
.create(UTIL
.getConfiguration()).bulkLoad(tableName
, family2Files
);
211 final Connection conn
= UTIL
.getConnection();
212 // Periodically do compaction to reduce the number of open file handles.
213 if (numBulkLoads
.get() % 5 == 0) {
214 // 5 * 50 = 250 open file handles!
215 try (RegionLocator locator
= conn
.getRegionLocator(tableName
)) {
216 HRegionLocation loc
= locator
.getRegionLocation(Bytes
.toBytes("aaa"), true);
217 conn
.getAdmin().compactRegion(loc
.getRegion().getRegionName());
218 numCompactions
.incrementAndGet();
224 public static class MyObserver
implements RegionCoprocessor
, RegionObserver
{
225 static int sleepDuration
;
228 public Optional
<RegionObserver
> getRegionObserver() {
229 return Optional
.of(this);
233 public InternalScanner
preCompact(ObserverContext
<RegionCoprocessorEnvironment
> e
, Store store
,
234 InternalScanner scanner
, ScanType scanType
, CompactionLifeCycleTracker tracker
,
235 CompactionRequest request
)
238 Thread
.sleep(sleepDuration
);
239 } catch (InterruptedException ie
) {
240 IOException ioe
= new InterruptedIOException();
249 * Thread that does full scans of the table looking for any partially
252 public static class AtomicScanReader
extends RepeatingTestThread
{
253 byte targetFamilies
[][];
255 AtomicLong numScans
= new AtomicLong();
256 AtomicLong numRowsScanned
= new AtomicLong();
257 TableName TABLE_NAME
;
259 public AtomicScanReader(TableName TABLE_NAME
, TestContext ctx
,
260 byte targetFamilies
[][]) throws IOException
{
262 this.TABLE_NAME
= TABLE_NAME
;
263 this.targetFamilies
= targetFamilies
;
264 table
= UTIL
.getConnection().getTable(TABLE_NAME
);
268 public void doAnAction() throws Exception
{
270 for (byte[] family
: targetFamilies
) {
273 ResultScanner scanner
= table
.getScanner(s
);
275 for (Result res
: scanner
) {
276 byte[] lastRow
= null, lastFam
= null, lastQual
= null;
277 byte[] gotValue
= null;
278 for (byte[] family
: targetFamilies
) {
279 byte qualifier
[] = QUAL
;
280 byte thisValue
[] = res
.getValue(family
, qualifier
);
281 if (gotValue
!= null && thisValue
!= null
282 && !Bytes
.equals(gotValue
, thisValue
)) {
284 StringBuilder msg
= new StringBuilder();
285 msg
.append("Failed on scan ").append(numScans
)
286 .append(" after scanning ").append(numRowsScanned
)
288 msg
.append("Current was " + Bytes
.toString(res
.getRow()) + "/"
289 + Bytes
.toString(family
) + ":" + Bytes
.toString(qualifier
)
290 + " = " + Bytes
.toString(thisValue
) + "\n");
291 msg
.append("Previous was " + Bytes
.toString(lastRow
) + "/"
292 + Bytes
.toString(lastFam
) + ":" + Bytes
.toString(lastQual
)
293 + " = " + Bytes
.toString(gotValue
));
294 throw new RuntimeException(msg
.toString());
298 lastQual
= qualifier
;
299 lastRow
= res
.getRow();
300 gotValue
= thisValue
;
302 numRowsScanned
.getAndIncrement();
304 numScans
.getAndIncrement();
309 * Creates a table with given table name and specified number of column
310 * families if the table does not already exist.
312 public void setupTable(TableName table
, int cfs
) throws IOException
{
314 LOG
.info("Creating table " + table
);
315 TableDescriptorBuilder tableDescriptorBuilder
=
316 TableDescriptorBuilder
.newBuilder(table
);
318 tableDescriptorBuilder
.setCoprocessor(MyObserver
.class.getName());
319 MyObserver
.sleepDuration
= this.sleepDuration
;
320 for (int i
= 0; i
< 10; i
++) {
321 ColumnFamilyDescriptor columnFamilyDescriptor
=
322 ColumnFamilyDescriptorBuilder
.newBuilder(Bytes
.toBytes(family(i
))).build();
323 tableDescriptorBuilder
.setColumnFamily(columnFamilyDescriptor
);
326 UTIL
.getAdmin().createTable(tableDescriptorBuilder
.build());
327 } catch (TableExistsException tee
) {
328 LOG
.info("Table " + table
+ " already exists");
336 public void testAtomicBulkLoad() throws Exception
{
337 TableName TABLE_NAME
= TableName
.valueOf("atomicBulkLoad");
339 int millisToRun
= 30000;
340 int numScanners
= 50;
342 // Set createWALDir to true and use default values for other options.
343 UTIL
.startMiniCluster(StartMiniClusterOption
.builder().createWALDir(true).build());
345 WAL log
= UTIL
.getHBaseCluster().getRegionServer(0).getWAL(null);
346 FindBulkHBaseListener listener
= new FindBulkHBaseListener();
347 log
.registerWALActionsListener(listener
);
348 runAtomicBulkloadTest(TABLE_NAME
, millisToRun
, numScanners
);
349 assertThat(listener
.isFound(), is(true));
351 UTIL
.shutdownMiniCluster();
355 void runAtomicBulkloadTest(TableName tableName
, int millisToRun
, int numScanners
)
357 setupTable(tableName
, 10);
359 TestContext ctx
= new TestContext(UTIL
.getConfiguration());
361 AtomicHFileLoader loader
= new AtomicHFileLoader(tableName
, ctx
, null);
362 ctx
.addThread(loader
);
364 List
<AtomicScanReader
> scanners
= Lists
.newArrayList();
365 for (int i
= 0; i
< numScanners
; i
++) {
366 AtomicScanReader scanner
= new AtomicScanReader(tableName
, ctx
, families
);
367 scanners
.add(scanner
);
368 ctx
.addThread(scanner
);
372 ctx
.waitFor(millisToRun
);
375 LOG
.info("Loaders:");
376 LOG
.info(" loaded " + loader
.numBulkLoads
.get());
377 LOG
.info(" compations " + loader
.numCompactions
.get());
379 LOG
.info("Scanners:");
380 for (AtomicScanReader scanner
: scanners
) {
381 LOG
.info(" scanned " + scanner
.numScans
.get());
382 LOG
.info(" verified " + scanner
.numRowsScanned
.get() + " rows");
387 * Run test on an HBase instance for 5 minutes. This assumes that the table
388 * under test only has a single region.
390 public static void main(String args
[]) throws Exception
{
392 Configuration c
= HBaseConfiguration
.create();
393 TestHRegionServerBulkLoad test
= new TestHRegionServerBulkLoad(0);
395 test
.runAtomicBulkloadTest(TableName
.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
397 System
.exit(0); // something hangs (believe it is lru threadpool)
401 private void setConf(Configuration c
) {
402 UTIL
= new HBaseTestingUtility(c
);
405 static class FindBulkHBaseListener
extends TestWALActionsListener
.DummyWALActionsListener
{
406 private boolean found
= false;
409 public void visitLogEntryBeforeWrite(WALKey logKey
, WALEdit logEdit
) {
410 for (Cell cell
: logEdit
.getCells()) {
411 KeyValue kv
= KeyValueUtil
.ensureKeyValue(cell
);
412 for (Map
.Entry entry
: kv
.toStringMap().entrySet()) {
413 if (entry
.getValue().equals(Bytes
.toString(WALEdit
.BULK_LOAD
))) {
420 public boolean isFound() {