/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
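
/**
 * Tests scanner behavior when HFiles are bulk loaded into a live table: a plain bulk load, a
 * bulk load racing with an already-open scanner, and a bulk load of a "native" HFile that
 * already carries sequence ids.
 */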
@Category({RegionServerTests.class, MediumTests.class})
public class TestScannerWithBulkload {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestScannerWithBulkload.class);

  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniCluster(1);
  }
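
  /** Creates a table with a single family 'col' that keeps up to three versions of a cell. */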
  private static void createTable(Admin admin, TableName tableName) throws IOException {
    TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);
    ColumnFamilyDescriptor columnFamilyDescriptor =
      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("col")).setMaxVersions(3).build();
    tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
    admin.createTable(tableDescriptorBuilder.build());
  }
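
  /**
   * Bulk loads an HFile holding 'version2' for row1 at the same timestamp as the existing cells
   * and verifies the scanner sees it; then puts 'version3' at that timestamp, flushes, and
   * verifies the scan returns 'version3'.
   */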
  @Test
  public void testBulkLoad() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    long l = EnvironmentEdgeManager.currentTime();
    Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    final Table table = init(admin, l, scan, tableName);
    // use bulkload
    final Path hfilePath =
      writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file", false);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    BulkLoadHFiles.create(conf).bulkLoad(tableName, hfilePath);
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    result = scanAfterBulkLoad(scanner, result, "version2");
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version3")));
    table.put(put0);
    admin.flush(tableName);
    scanner = table.getScanner(scan);
    result = scanner.next();
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()).equals("row1")) {
          System.out
            .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out
            .println(Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals("version3",
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    scanner.close();
    table.close();
  }
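
  /**
   * Consumes the remaining results, asserting that every 'col:q' cell of row1 carries
   * {@code expectedVal}; returns null once the scanner is exhausted.
   */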
  private Result scanAfterBulkLoad(ResultScanner scanner, Result result, String expectedVal)
    throws IOException {
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()).equals("row1")) {
          System.out
            .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out
            .println(Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals(expectedVal,
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    return result;
  }

  // If nativeHFile is true, we will set cell seq id and MAX_SEQ_ID_KEY in the file.
  // Else, we will set BULKLOAD_TIME_KEY.
  private Path writeToHFile(long l, String hFilePath, String pathStr, boolean nativeHFile)
    throws IOException {
    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
    final Path hfilePath = new Path(hFilePath);
    fs.mkdirs(hfilePath);
    Path path = new Path(pathStr);
    HFile.WriterFactory wf = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
    Assert.assertNotNull(wf);
    HFileContext context = new HFileContextBuilder().build();
    HFile.Writer writer = wf.withPath(fs, path).withFileContext(context).create();
    KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version2"));

    // Set cell seq id to test bulk load native hfiles.
    if (nativeHFile) {
      // Set a big seq id. Scan should not look at this seq id in a bulk loaded file.
      // Scan should only look at the seq id appended at the bulk load time, and not skip
      // this kv.
      kv.setSequenceId(9999999);
    }
    writer.append(kv);

    if (nativeHFile) {
      // Set a big MAX_SEQ_ID_KEY. Scan should not look at this seq id in a bulk loaded file.
      // Scan should only look at the seq id appended at the bulk load time, and not skip its
      // kv.
      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(9999999L));
    } else {
      writer.appendFileInfo(BULKLOAD_TIME_KEY,
        Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
    }
    writer.close();
    return hfilePath;
  }
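
  /**
   * Seeds the table: 'version0' for row1, 'version0' for row2, then 'version1' for row1, all at
   * timestamp {@code l}, flushing after each put and compacting at the end. Asserts 'version1'
   * is the single visible value for row1 before any bulk load happens.
   */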
  private Table init(Admin admin, long l, Scan scan, TableName tableName) throws Exception {
    Table table = TEST_UTIL.getConnection().getTable(tableName);
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version0")));
    table.put(put0);
    admin.flush(tableName);
    Put put1 = new Put(Bytes.toBytes("row2"));
    put1.add(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version0")));
    table.put(put1);
    admin.flush(tableName);
    put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version1")));
    table.put(put0);
    admin.flush(tableName);
    admin.compact(tableName);

    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
    Assert.assertEquals(1, cells.size());
    Cell _c = cells.get(0);
    Assert.assertEquals("version1",
      Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
    scanner.close();
    return table;
  }
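
  /**
   * Opens a scanner (caching set to 1) before the bulk load, then runs the bulk load from a
   * separate thread and verifies that the already-open scanner still returns 'version1' for row1.
   */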
  @Test
  public void testBulkLoadWithParallelScan() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final long l = EnvironmentEdgeManager.currentTime();
    final Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    scan.setCaching(1);
    final Table table = init(admin, l, scan, tableName);
    // use bulkload
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadWithParallelScan/",
      "/temp/testBulkLoadWithParallelScan/col/file", false);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    final BulkLoadHFiles bulkload = BulkLoadHFiles.create(conf);
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    // Create a scanner and then do bulk load
    final CountDownLatch latch = new CountDownLatch(1);
    new Thread() {
      @Override
      public void run() {
        try {
          Put put1 = new Put(Bytes.toBytes("row5"));
          put1.add(new KeyValue(Bytes.toBytes("row5"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
            Bytes.toBytes("version0")));
          table.put(put1);
          bulkload.bulkLoad(tableName, hfilePath);
          latch.countDown();
        } catch (TableNotFoundException e) {
        } catch (IOException e) {
        }
      }
    }.start();
    latch.await();
    // By the time we do next() the bulk loaded files are also added to the kv
    // scanner
    scanAfterBulkLoad(scanner, result, "version1");
    scanner.close();
    table.close();
  }
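
  /**
   * Bulk loads a "native" HFile, i.e. one whose cell already carries a sequence id and whose
   * file info contains MAX_SEQ_ID_KEY. The scan should use the sequence id assigned at bulk load
   * time, so 'version2' must still be visible, and a later put of 'version3' wins after a flush.
   */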
  @Test
  public void testBulkLoadNativeHFile() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    long l = EnvironmentEdgeManager.currentTime();
    Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    final Table table = init(admin, l, scan, tableName);
    // use bulkload
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadNativeHFile/",
      "/temp/testBulkLoadNativeHFile/col/file", true);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    BulkLoadHFiles.create(conf).bulkLoad(tableName, hfilePath);
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    // We had 'version0', 'version1' for 'row1,col:q' in the table.
    // Bulk load added 'version2'; the scanner should be able to see 'version2'.
    result = scanAfterBulkLoad(scanner, result, "version2");
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version3")));
    table.put(put0);
    admin.flush(tableName);
    scanner = table.getScanner(scan);
    result = scanner.next();
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()).equals("row1")) {
          System.out
            .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out
            .println(Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals("version3",
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    scanner.close();
    table.close();
  }
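
  /** Scan that reads up to three versions, matching the table's max-versions setting. */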
  private Scan createScan() {
    Scan scan = new Scan();
    scan.readVersions(3);
    return scan;
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
}