/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

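/**
 * Scanner-visibility tests around bulk load: cells from bulk-loaded HFiles must become visible
 * with correct sequence-id handling, whether the file is a "native" HFile carrying a cell seq id
 * and MAX_SEQ_ID_KEY or a file stamped with BULKLOAD_TIME_KEY, including while a scan is in flight.
 */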
@Category({ RegionServerTests.class, MediumTests.class })
public class TestScannerWithBulkload {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestScannerWithBulkload.class);

  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniCluster(1);
  }

  private static void createTable(Admin admin, TableName tableName) throws IOException {
    TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);
    ColumnFamilyDescriptor columnFamilyDescriptor =
      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("col")).setMaxVersions(3).build();
    tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
    admin.createTable(tableDescriptorBuilder.build());
  }
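
  /**
   * Bulk loads an HFile stamped with BULKLOAD_TIME_KEY and verifies a scan sees the bulk-loaded
   * 'version2' cell, then puts and flushes 'version3' and verifies the scan picks that up too.
   */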
  @Test
  public void testBulkLoad() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    long l = EnvironmentEdgeManager.currentTime();
    Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    final Table table = init(admin, l, scan, tableName);
    // Bulk load a file stamped with BULKLOAD_TIME_KEY (nativeHFile = false).
    final Path hfilePath =
      writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file", false);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    BulkLoadHFiles.create(conf).bulkLoad(tableName, hfilePath);
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    result = scanAfterBulkLoad(scanner, result, "version2");
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version3")));
    table.put(put0);
    admin.flush(tableName);
    scanner = table.getScanner(scan);
    result = scanner.next();
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength())
            .equals("row1")) {
          System.out
            .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out.println(
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals("version3",
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    scanner.close();
    table.close();
  }
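
  /**
   * Drains the scanner, asserting that every 'row1' cell under col:q carries the expected value.
   */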
  private Result scanAfterBulkLoad(ResultScanner scanner, Result result, String expectedVal)
    throws IOException {
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength())
            .equals("row1")) {
          System.out
            .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out.println(
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals(expectedVal,
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    return result;
  }

  // If nativeHFile is true, we will set the cell seq id and MAX_SEQ_ID_KEY in the file.
  // Else, we will set BULKLOAD_TIME_KEY.
  private Path writeToHFile(long l, String hFilePath, String pathStr, boolean nativeHFile)
    throws IOException {
    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
    final Path hfilePath = new Path(hFilePath);
    fs.mkdirs(hfilePath);
    Path path = new Path(pathStr);
    HFile.WriterFactory wf = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
    Assert.assertNotNull(wf);
    HFileContext context = new HFileContextBuilder().build();
    HFile.Writer writer = wf.withPath(fs, path).withFileContext(context).create();
    KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version2"));

    // Set the cell seq id to test bulk load native hfiles.
    if (nativeHFile) {
      // Set a big seq id. Scan should not look at this seq id in a bulk loaded file.
      // Scan should only look at the seq id appended at the bulk load time, and not skip
      // this kv.
      kv.setSequenceId(9999999);
    }
    writer.append(kv);

    if (nativeHFile) {
      // Set a big MAX_SEQ_ID_KEY. Scan should not look at this seq id in a bulk loaded file.
      // Scan should only look at the seq id appended at the bulk load time, and not skip its
      // kv.
      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(9999999L));
    } else {
      writer.appendFileInfo(BULKLOAD_TIME_KEY,
        Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
    }
    writer.close();
    return hfilePath;
  }
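
  /**
   * Seeds 'row1' (twice, at the same timestamp) and 'row2', flushing each put, then compacts.
   * Asserts a scan then sees exactly one cell for 'row1,col:q' with value 'version1', since
   * same-timestamp puts collapse to the newest value.
   */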
  private Table init(Admin admin, long l, Scan scan, TableName tableName) throws Exception {
    Table table = TEST_UTIL.getConnection().getTable(tableName);
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version0")));
    table.put(put0);
    admin.flush(tableName);
    Put put1 = new Put(Bytes.toBytes("row2"));
    put1.add(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version0")));
    table.put(put1);
    admin.flush(tableName);
    put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version1")));
    table.put(put0);
    admin.flush(tableName);
    admin.compact(tableName);

    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
    Assert.assertEquals(1, cells.size());
    Cell _c = cells.get(0);
    Assert.assertEquals("version1",
      Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
    scanner.close();
    return table;
  }
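
  /**
   * Opens a scanner first, then bulk loads from a background thread; the already-open scanner
   * keeps its read point and should still return the pre-bulk-load 'version1' value rather than
   * the freshly loaded 'version2'.
   */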
  @Test
  public void testBulkLoadWithParallelScan() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final long l = EnvironmentEdgeManager.currentTime();
    final Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    // Fetch one row per RPC so later next() calls return to the server during the bulk load.
    scan.setCaching(1);
    final Table table = init(admin, l, scan, tableName);
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadWithParallelScan/",
      "/temp/testBulkLoadWithParallelScan/col/file", false);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    final BulkLoadHFiles bulkload = BulkLoadHFiles.create(conf);
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    // Create a scanner and then do bulk load
    final CountDownLatch latch = new CountDownLatch(1);
    new Thread() {
      @Override
      public void run() {
        try {
          Put put1 = new Put(Bytes.toBytes("row5"));
          put1.add(new KeyValue(Bytes.toBytes("row5"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
            Bytes.toBytes("version0")));
          table.put(put1);
          bulkload.bulkLoad(tableName, hfilePath);
          latch.countDown();
        } catch (TableNotFoundException e) {
          // Swallowed; the table is created above, so this is not expected.
        } catch (IOException e) {
          // Swallowed; a failed bulk load surfaces as a hung latch.await() below.
        }
      }
    }.start();
    latch.await();
    // By the time we do next() the bulk loaded files are also added to the kv
    // scanner, but the already-open scanner still sees the pre-bulk-load 'version1'.
    scanAfterBulkLoad(scanner, result, "version1");
    scanner.close();
    table.close();
  }
  @Test
  public void testBulkLoadNativeHFile() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    long l = EnvironmentEdgeManager.currentTime();
    Admin admin = TEST_UTIL.getAdmin();
    createTable(admin, tableName);
    Scan scan = createScan();
    final Table table = init(admin, l, scan, tableName);
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadNativeHFile/",
      "/temp/testBulkLoadNativeHFile/col/file", true);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    BulkLoadHFiles.create(conf).bulkLoad(tableName, hfilePath);
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    // We had 'version0', 'version1' for 'row1,col:q' in the table.
    // Bulk load added 'version2'; the scanner should be able to see 'version2'.
    result = scanAfterBulkLoad(scanner, result, "version2");
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
      Bytes.toBytes("version3")));
    table.put(put0);
    admin.flush(tableName);
    scanner = table.getScanner(scan);
    result = scanner.next();
    while (result != null) {
      List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (Cell _c : cells) {
        if (Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength())
            .equals("row1")) {
          System.out
            .println(Bytes.toString(_c.getRowArray(), _c.getRowOffset(), _c.getRowLength()));
          System.out.println(Bytes.toString(_c.getQualifierArray(), _c.getQualifierOffset(),
            _c.getQualifierLength()));
          System.out.println(
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
          Assert.assertEquals("version3",
            Bytes.toString(_c.getValueArray(), _c.getValueOffset(), _c.getValueLength()));
        }
      }
      result = scanner.next();
    }
    scanner.close();
    table.close();
  }
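
  /** Scans default to a single version; read up to the three versions the 'col' family keeps. */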
  private Scan createScan() {
    Scan scan = new Scan();
    scan.readVersions(3);
    return scan;
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
}