/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Tests bulk loading of HFiles and shows the atomicity, or lack of atomicity, of
 * the region server's bulkLoad functionality.
 */
@RunWith(Parameterized.class)
@Category({RegionServerTests.class, LargeTests.class})
public class TestHRegionServerBulkLoad {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHRegionServerBulkLoad.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionServerBulkLoad.class);
  protected static HBaseTestingUtil UTIL = new HBaseTestingUtil();
  protected final static Configuration conf = UTIL.getConfiguration();
  protected final static byte[] QUAL = Bytes.toBytes("qual");
  protected final static int NUM_CFS = 10;
  private int sleepDuration;
  public static int BLOCKSIZE = 64 * 1024;
  public static Algorithm COMPRESSION = Compression.Algorithm.NONE;

  protected final static byte[][] families = new byte[NUM_CFS][];
  static {
    for (int i = 0; i < NUM_CFS; i++) {
      families[i] = Bytes.toBytes(family(i));
    }
  }

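  // Presumably, running each variant both with no delay in MyObserver.preCompact and
  // with a 30s delay widens the window in which scans can race with bulk loads while a
  // compaction is in flight.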
  @Parameters
  public static final Collection<Object[]> parameters() {
    int[] sleepDurations = new int[] { 0, 30000 };
    List<Object[]> configurations = new ArrayList<>();
    for (int i : sleepDurations) {
      configurations.add(new Object[] { i });
    }
    return configurations;
  }

  public TestHRegionServerBulkLoad(int duration) {
    this.sleepDuration = duration;
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf.setInt("hbase.rpc.timeout", 10 * 1000);
  }

  /**
   * Create a rowkey compatible with
   * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}.
   */
  public static byte[] rowkey(int i) {
    return Bytes.toBytes(String.format("row_%08d", i));
  }

  static String family(int i) {
    return String.format("family_%04d", i);
  }

  /**
   * Create an HFile with the given number of rows with a specified value.
   */
  public static void createHFile(FileSystem fs, Path path, byte[] family,
      byte[] qualifier, byte[] value, int numRows) throws IOException {
    HFileContext context = new HFileContextBuilder().withBlockSize(BLOCKSIZE)
        .withCompression(COMPRESSION)
        .build();
    HFile.Writer writer = HFile
        .getWriterFactory(conf, new CacheConfig(conf))
        .withPath(fs, path)
        .withFileContext(context)
        .create();
    long now = EnvironmentEdgeManager.currentTime();
    try {
      // every cell in the file gets the same timestamp
      for (int i = 0; i < numRows; i++) {
        KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value);
        writer.append(kv);
      }
      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(now));
    } finally {
      writer.close();
    }
  }

  /**
   * Thread that repeatedly bulk loads new HFiles into the table.
   *
   * Each iteration bulk loads one 1000-row HFile per column family (NUM_CFS = 10 files),
   * which accumulates open file handles, so every 5 iterations it requests a region
   * compaction to reduce the number of open store files.
   */
  public static class AtomicHFileLoader extends RepeatingTestThread {
    final AtomicLong numBulkLoads = new AtomicLong();
    final AtomicLong numCompactions = new AtomicLong();
    private TableName tableName;

    public AtomicHFileLoader(TableName tableName, TestContext ctx,
        byte targetFamilies[][]) throws IOException {
      super(ctx);
      this.tableName = tableName;
    }

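    // Each iteration writes the same value (the zero-padded iteration number) into every
    // column family of every row, so a concurrent scanner can detect a torn bulk load by
    // comparing values across families within a single row.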
    @Override
    public void doAnAction() throws Exception {
      long iteration = numBulkLoads.getAndIncrement();
      Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d",
          iteration));

      // create HFiles for different column families
      FileSystem fs = UTIL.getTestFileSystem();
      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
      Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (int i = 0; i < NUM_CFS; i++) {
        Path hfile = new Path(dir, family(i));
        byte[] fam = Bytes.toBytes(family(i));
        createHFile(fs, hfile, fam, QUAL, val, 1000);
        family2Files.put(fam, Collections.singletonList(hfile));
      }
      // bulk load HFiles
      BulkLoadHFiles.create(UTIL.getConfiguration()).bulkLoad(tableName, family2Files);
      final Connection conn = UTIL.getConnection();
      // Periodically do compaction to reduce the number of open file handles.
      if (numBulkLoads.get() % 5 == 0) {
        // every 5 bulk loads adds another 5 * 10 = 50 store files
        try (RegionLocator locator = conn.getRegionLocator(tableName)) {
          HRegionLocation loc = locator.getRegionLocation(Bytes.toBytes("aaa"), true);
          conn.getAdmin().compactRegion(loc.getRegion().getRegionName());
          numCompactions.incrementAndGet();
        }
      }
    }
  }

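  /**
   * Coprocessor that slows compactions down by sleeping in preCompact, stretching the
   * period during which bulk loads and scans run concurrently with a compaction.
   */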
  public static class MyObserver implements RegionCoprocessor, RegionObserver {
    static int sleepDuration;

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
        InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
        CompactionRequest request) throws IOException {
      try {
        Thread.sleep(sleepDuration);
      } catch (InterruptedException ie) {
        IOException ioe = new InterruptedIOException();
        ioe.initCause(ie);
        throw ioe;
      }
      return scanner;
    }
  }

  /**
   * Thread that does full scans of the table looking for any partially
   * completed rows.
   */
  public static class AtomicScanReader extends RepeatingTestThread {
    byte targetFamilies[][];
    Table table;
    AtomicLong numScans = new AtomicLong();
    AtomicLong numRowsScanned = new AtomicLong();
    TableName TABLE_NAME;

    public AtomicScanReader(TableName TABLE_NAME, TestContext ctx,
        byte targetFamilies[][]) throws IOException {
      super(ctx);
      this.TABLE_NAME = TABLE_NAME;
      this.targetFamilies = targetFamilies;
      table = UTIL.getConnection().getTable(TABLE_NAME);
    }

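    // Every bulk load writes one distinct value to all families of a row, so seeing two
    // different non-null values within the same row means a bulk load became visible
    // non-atomically.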
    @Override
    public void doAnAction() throws Exception {
      Scan s = new Scan();
      for (byte[] family : targetFamilies) {
        s.addFamily(family);
      }
      ResultScanner scanner = table.getScanner(s);

      for (Result res : scanner) {
        byte[] lastRow = null, lastFam = null, lastQual = null;
        byte[] gotValue = null;
        for (byte[] family : targetFamilies) {
          byte qualifier[] = QUAL;
          byte thisValue[] = res.getValue(family, qualifier);
          if (gotValue != null && thisValue != null
              && !Bytes.equals(gotValue, thisValue)) {

            StringBuilder msg = new StringBuilder();
            msg.append("Failed on scan ").append(numScans)
                .append(" after scanning ").append(numRowsScanned)
                .append(" rows!\n");
            msg.append("Current was " + Bytes.toString(res.getRow()) + "/"
                + Bytes.toString(family) + ":" + Bytes.toString(qualifier)
                + " = " + Bytes.toString(thisValue) + "\n");
            msg.append("Previous was " + Bytes.toString(lastRow) + "/"
                + Bytes.toString(lastFam) + ":" + Bytes.toString(lastQual)
                + " = " + Bytes.toString(gotValue));
            throw new RuntimeException(msg.toString());
          }

          lastFam = family;
          lastQual = qualifier;
          lastRow = res.getRow();
          gotValue = thisValue;
        }
        numRowsScanned.getAndIncrement();
      }
      numScans.getAndIncrement();
    }
  }

  /**
   * Creates a table with the given table name and specified number of column
   * families if the table does not already exist.
   */
  public void setupTable(TableName table, int cfs) throws IOException {
    try {
      LOG.info("Creating table " + table);
      TableDescriptorBuilder tableDescriptorBuilder =
          TableDescriptorBuilder.newBuilder(table);

      tableDescriptorBuilder.setCoprocessor(MyObserver.class.getName());
      MyObserver.sleepDuration = this.sleepDuration;
      for (int i = 0; i < cfs; i++) {
        ColumnFamilyDescriptor columnFamilyDescriptor =
            ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family(i))).build();
        tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
      }

      UTIL.getAdmin().createTable(tableDescriptorBuilder.build());
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }

  /**
   * Atomic bulk load.
   */
  @Test
  public void testAtomicBulkLoad() throws Exception {
    TableName TABLE_NAME = TableName.valueOf("atomicBulkLoad");

    int millisToRun = 30000;
    int numScanners = 50;

    // Set createWALDir to true and use default values for other options.
    UTIL.startMiniCluster(StartTestingClusterOption.builder().createWALDir(true).build());
    try {
      WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null);
      FindBulkHBaseListener listener = new FindBulkHBaseListener();
      log.registerWALActionsListener(listener);
      runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
      assertThat(listener.isFound(), is(true));
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

  void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
      throws Exception {
    setupTable(tableName, 10);

    TestContext ctx = new TestContext(UTIL.getConfiguration());

    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
    ctx.addThread(loader);

    List<AtomicScanReader> scanners = Lists.newArrayList();
    for (int i = 0; i < numScanners; i++) {
      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
      scanners.add(scanner);
      ctx.addThread(scanner);
    }

    ctx.startThreads();
    ctx.waitFor(millisToRun);
    ctx.stop();

    LOG.info("Loaders:");
    LOG.info("  loaded " + loader.numBulkLoads.get());
    LOG.info("  compactions " + loader.numCompactions.get());

    LOG.info("Scanners:");
    for (AtomicScanReader scanner : scanners) {
      LOG.info("  scanned " + scanner.numScans.get());
      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
    }
  }

  /**
   * Run test on an HBase instance for 5 minutes. This assumes that the table
   * under test only has a single region.
   */
  public static void main(String args[]) throws Exception {
    try {
      Configuration c = HBaseConfiguration.create();
      TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad(0);
      test.setConf(c);
      test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
    } finally {
      System.exit(0); // something hangs (believe it is lru threadpool)
    }
  }

  private void setConf(Configuration c) {
    UTIL = new HBaseTestingUtil(c);
  }

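  /**
   * WAL listener that records whether a bulk-load marker edit was written to the WAL;
   * testAtomicBulkLoad asserts that one was seen after the run completes.
   */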
  static class FindBulkHBaseListener extends TestWALActionsListener.DummyWALActionsListener {
    private boolean found = false;

    @Override
    public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
      for (Cell cell : logEdit.getCells()) {
        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
        for (Map.Entry<String, Object> entry : kv.toStringMap().entrySet()) {
          if (entry.getValue().equals(Bytes.toString(WALEdit.BULK_LOAD))) {
            found = true;
          }
        }
      }
    }

    public boolean isFound() {
      return found;
    }
  }
}