HBASE-24033 Add ut for loading the corrupt recovered hfiles (#1322)
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
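
/**
 * Verifies that scans see data from bulk-loaded HFiles, covering plain bulk
 * loads, bulk loads racing with an open scanner, and "native" HFiles that
 * carry their own cell sequence ids.
 */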
@Category({RegionServerTests.class, MediumTests.class})
public class TestScannerWithBulkload {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestScannerWithBulkload.class);

  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  @Rule
  public TestName name = new TestName();
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniCluster(1);
  }
  private static void createTable(Admin admin, TableName tableName) throws IOException {
    TableDescriptorBuilder tableDescriptorBuilder =
      TableDescriptorBuilder.newBuilder(tableName);
    ColumnFamilyDescriptor columnFamilyDescriptor =
      ColumnFamilyDescriptorBuilder
        .newBuilder(Bytes.toBytes("col"))
        .setMaxVersions(3).build();
    tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
    admin.createTable(tableDescriptorBuilder.build());
  }
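
  /**
   * Bulk loads an HFile carrying 'version2' for 'row1,col:q' into a table that
   * already holds 'version0' and 'version1', then checks that the scan sees
   * 'version2', and 'version3' after a further put and flush.
   */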
  @Test
  public void testBulkLoad() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    long l = System.currentTimeMillis();
    Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    final Table table = init(admin, l, scan, tableName);
    // use bulkload
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file",
      false);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    BulkLoadHFiles.create(conf).bulkLoad(tableName, hfilePath);
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    result = scanAfterBulkLoad(scanner, result, "version2");
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version3")));
    table.put(put0);
    admin.flush(tableName);
    scanner = table.getScanner(scan);
    result = scanner.next();
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength())
            .equals("row1")) {
          System.out
            .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out.println(
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals("version3",
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    scanner.close();
    table.close();
  }
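
  /**
   * Drains the remaining results, printing each 'row1,col:q' cell and
   * asserting that it holds the expected value.
   */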
  private Result scanAfterBulkLoad(ResultScanner scanner, Result result, String expectedVal)
      throws IOException {
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength())
            .equals("row1")) {
          System.out
            .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out.println(
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals(expectedVal,
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    return result;
  }
  // If nativeHFile is true, we will set cell seq id and MAX_SEQ_ID_KEY in the file.
  // Else, we will set BULKLOAD_TIME_KEY.
  private Path writeToHFile(long l, String hFilePath, String pathStr, boolean nativeHFile)
      throws IOException {
    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
    final Path hfilePath = new Path(hFilePath);
    fs.mkdirs(hfilePath);
    Path path = new Path(pathStr);
    HFile.WriterFactory wf = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
    Assert.assertNotNull(wf);
    HFileContext context = new HFileContextBuilder().build();
    HFile.Writer writer = wf.withPath(fs, path).withFileContext(context).create();
    KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version2"));

    // Set cell seq id to test bulk load native hfiles.
    if (nativeHFile) {
      // Set a big seq id. Scan should not look at this seq id in a bulk loaded file.
      // Scan should only look at the seq id appended at the bulk load time, and not skip
      // this kv.
      kv.setSequenceId(9999999);
    }
    writer.append(kv);

    if (nativeHFile) {
      // Set a big MAX_SEQ_ID_KEY. Scan should not look at this seq id in a bulk loaded file.
      // Scan should only look at the seq id appended at the bulk load time, and not skip its
      // kv.
      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(9999999L));
    } else {
      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
    }
    writer.close();
    return hfilePath;
  }
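
  /**
   * Seeds the table with 'version0' for row1 and row2 and 'version1' for row1,
   * flushing after each put so several store files exist, compacts, and then
   * verifies that a scan returns 'version1' for 'row1,col:q'.
   */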
  private Table init(Admin admin, long l, Scan scan, TableName tableName) throws Exception {
    Table table = TEST_UTIL.getConnection().getTable(tableName);
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version0")));
    table.put(put0);
    admin.flush(tableName);
    Put put1 = new Put(Bytes.toBytes("row2"));
    put1.add(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version0")));
    table.put(put1);
    admin.flush(tableName);
    put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version1")));
    table.put(put0);
    admin.flush(tableName);
    admin.compact(tableName);

    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
    Assert.assertEquals(1, cells.size());
    Cell _c = cells.get(0);
    Assert.assertEquals("version1",
      Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
    scanner.close();
    return table;
  }
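
  /**
   * Starts a scan with caching=1, bulk loads from a second thread while the
   * scan is open, and then expects the already-open scanner to still return
   * 'version1' rather than the bulk-loaded 'version2'.
   */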
  @Test
  public void testBulkLoadWithParallelScan() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final long l = System.currentTimeMillis();
    final Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    scan.setCaching(1);
    final Table table = init(admin, l, scan, tableName);
    // use bulkload
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadWithParallelScan/",
      "/temp/testBulkLoadWithParallelScan/col/file", false);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    final BulkLoadHFiles bulkload = BulkLoadHFiles.create(conf);
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    // Create a scanner and then do bulk load
    final CountDownLatch latch = new CountDownLatch(1);
    new Thread() {
      @Override
      public void run() {
        try {
          Put put1 = new Put(Bytes.toBytes("row5"));
          put1.add(new KeyValue(Bytes.toBytes("row5"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
            Bytes.toBytes("version0")));
          table.put(put1);
          bulkload.bulkLoad(tableName, hfilePath);
          latch.countDown();
        } catch (IOException e) {
          // Covers TableNotFoundException too. If the put or bulk load fails,
          // the latch never counts down and the test times out instead.
        }
      }
    }.start();
    latch.await();
    // By the time we do next() the bulk loaded files are also added to the kv
    // scanner
    scanAfterBulkLoad(scanner, result, "version1");
    scanner.close();
    table.close();
  }
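
  /**
   * Bulk loads a "native" HFile whose cells carry their own (large) sequence
   * ids and MAX_SEQ_ID_KEY; the scan must still see the bulk-loaded 'version2',
   * showing that the in-file sequence ids are ignored in favor of the sequence
   * id assigned at bulk load time.
   */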
  @Test
  public void testBulkLoadNativeHFile() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    long l = System.currentTimeMillis();
    Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    final Table table = init(admin, l, scan, tableName);
    // use bulkload
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadNativeHFile/",
      "/temp/testBulkLoadNativeHFile/col/file", true);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    BulkLoadHFiles.create(conf).bulkLoad(tableName, hfilePath);
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    // We had 'version0', 'version1' for 'row1,col:q' in the table.
    // Bulk load added 'version2'; the scanner should be able to see 'version2'.
    result = scanAfterBulkLoad(scanner, result, "version2");
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version3")));
    table.put(put0);
    admin.flush(tableName);
    scanner = table.getScanner(scan);
    result = scanner.next();
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength())
            .equals("row1")) {
          System.out
            .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out.println(
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals("version3",
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    scanner.close();
    table.close();
  }
  private Scan createScan() {
    Scan scan = new Scan();
    scan.setMaxVersions(3);
    return scan;
  }
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
}