/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
@Category({ RegionServerTests.class, MediumTests.class })
public class TestScannerWithBulkload {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestScannerWithBulkload.class);

  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  @Rule
  public TestName name = new TestName();
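
  // Spins up a single-node mini cluster shared by every test in this class.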
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniCluster(1);
  }
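
  /**
   * Creates {@code tableName} with a single family, "col", that retains up to three cell
   * versions.
   */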
  private static void createTable(Admin admin, TableName tableName) throws IOException {
    TableDescriptorBuilder tableDescriptorBuilder =
        TableDescriptorBuilder.newBuilder(tableName);
    ColumnFamilyDescriptor columnFamilyDescriptor =
        ColumnFamilyDescriptorBuilder
            .newBuilder(Bytes.toBytes("col"))
            .setMaxVersions(3).build();
    tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
    admin.createTable(tableDescriptorBuilder.build());
  }
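
  /**
   * Writes an HFile holding "version2" for row1 (with no native seq id, so one is assigned at
   * bulk load time), bulk loads it, and verifies scans see "version2" and then, after a further
   * put and flush of "version3", the new value.
   */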
  @Test
  public void testBulkLoad() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    long l = System.currentTimeMillis();
    Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    final Table table = init(admin, l, scan, tableName);
    // use bulkload
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file",
      false);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    BulkLoadHFiles.create(conf).bulkLoad(tableName, hfilePath);
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    result = scanAfterBulkLoad(scanner, result, "version2");
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version3")));
    table.put(put0);
    admin.flush(tableName);
    scanner = table.getScanner(scan);
    result = scanner.next();
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength())
            .equals("row1")) {
          System.out
            .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out.println(
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals("version3",
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    scanner.close();
    table.close();
  }
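
  /**
   * Drains the scanner, asserting that every cell found for row1 under col:q carries
   * {@code expectedVal}. Returns the last Result read, which is null once the scanner is
   * exhausted.
   */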
  private Result scanAfterBulkLoad(ResultScanner scanner, Result result, String expectedVal)
      throws IOException {
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength())
            .equals("row1")) {
          System.out
            .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out.println(
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals(expectedVal,
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    return result;
  }
  // If nativeHFile is true, we will set cell seq id and MAX_SEQ_ID_KEY in the file.
  // Else, we will set BULKLOAD_TIME_KEY.
  private Path writeToHFile(long l, String hFilePath, String pathStr, boolean nativeHFile)
      throws IOException {
    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
    final Path hfilePath = new Path(hFilePath);
    fs.mkdirs(hfilePath);
    Path path = new Path(pathStr);
    HFile.WriterFactory wf = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
    Assert.assertNotNull(wf);
    HFileContext context = new HFileContextBuilder().build();
    HFile.Writer writer = wf.withPath(fs, path).withFileContext(context).create();
    KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version2"));

    // Set cell seq id to test bulk load native hfiles.
    if (nativeHFile) {
      // Set a big seq id. Scan should not look at this seq id in a bulk loaded file.
      // Scan should only look at the seq id appended at the bulk load time, and not skip
      // this kv.
      kv.setSequenceId(9999999);
    }
    writer.append(kv);

    if (nativeHFile) {
      // Set a big MAX_SEQ_ID_KEY. Scan should not look at this seq id in a bulk loaded file.
      // Scan should only look at the seq id appended at the bulk load time, and not skip its
      // kv.
      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(9999999L));
    } else {
      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
    }
    writer.close();
    return hfilePath;
  }
  private Table init(Admin admin, long l, Scan scan, TableName tableName) throws Exception {
    Table table = TEST_UTIL.getConnection().getTable(tableName);
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version0")));
    table.put(put0);
    admin.flush(tableName);
    Put put1 = new Put(Bytes.toBytes("row2"));
    put1.add(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version0")));
    table.put(put1);
    admin.flush(tableName);
    put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version1")));
    table.put(put0);
    admin.flush(tableName);
    admin.compact(tableName);

    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
    Assert.assertEquals(1, cells.size());
    Cell _c = cells.get(0);
    Assert.assertEquals("version1",
      Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
    scanner.close();
    return table;
  }
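
  /**
   * Opens a scanner before triggering the bulk load from a second thread. The already-open
   * scanner must keep returning "version1" for row1; it should not observe the concurrently
   * bulk loaded "version2".
   */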
  @Test
  public void testBulkLoadWithParallelScan() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final long l = System.currentTimeMillis();
    final Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    scan.setCaching(1);
    final Table table = init(admin, l, scan, tableName);
    // use bulkload
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadWithParallelScan/",
      "/temp/testBulkLoadWithParallelScan/col/file", false);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    final BulkLoadHFiles bulkload = BulkLoadHFiles.create(conf);
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    // Create a scanner and then do bulk load
    final CountDownLatch latch = new CountDownLatch(1);
    new Thread(() -> {
      try {
        Put put1 = new Put(Bytes.toBytes("row5"));
        put1.add(new KeyValue(Bytes.toBytes("row5"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
          Bytes.toBytes("version0")));
        table.put(put1);
        bulkload.bulkLoad(tableName, hfilePath);
        latch.countDown();
      } catch (TableNotFoundException e) {
        // Swallowed: the table is expected to exist for the duration of the test.
      } catch (IOException e) {
        // Swallowed: a failed bulk load surfaces as an assertion failure below.
      }
    }).start();
    latch.await();
    // By the time we do next() the bulk loaded files are also added to the kv
    // scanner
    scanAfterBulkLoad(scanner, result, "version1");
    scanner.close();
    table.close();
  }
  @Test
  public void testBulkLoadNativeHFile() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    long l = System.currentTimeMillis();
    Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    final Table table = init(admin, l, scan, tableName);
    // use bulkload
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadNativeHFile/",
      "/temp/testBulkLoadNativeHFile/col/file", true);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    BulkLoadHFiles.create(conf).bulkLoad(tableName, hfilePath);
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    // We had 'version0', 'version1' for 'row1,col:q' in the table.
    // Bulk load added 'version2'; the scanner should be able to see 'version2'.
    result = scanAfterBulkLoad(scanner, result, "version2");
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version3")));
    table.put(put0);
    admin.flush(tableName);
    scanner = table.getScanner(scan);
    result = scanner.next();
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength())
            .equals("row1")) {
          System.out
            .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out.println(
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals("version3",
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    scanner.close();
    table.close();
  }
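
  /** Returns a Scan reading up to three versions per cell, matching the table schema. */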
  private Scan createScan() {
    Scan scan = new Scan();
    scan.setMaxVersions(3);
    return scan;
  }
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
}