2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.regionserver
;
20 import static org
.apache
.hadoop
.hbase
.HBaseTestingUtility
.START_KEY_BYTES
;
21 import static org
.apache
.hadoop
.hbase
.HBaseTestingUtility
.fam1
;
22 import static org
.apache
.hadoop
.hbase
.HBaseTestingUtility
.fam2
;
23 import static org
.junit
.Assert
.assertEquals
;
24 import static org
.junit
.Assert
.assertFalse
;
25 import static org
.junit
.Assert
.assertNotNull
;
26 import static org
.junit
.Assert
.assertTrue
;
27 import static org
.junit
.Assert
.fail
;
28 import java
.io
.IOException
;
29 import java
.util
.ArrayList
;
30 import java
.util
.List
;
31 import org
.apache
.hadoop
.hbase
.Cell
;
32 import org
.apache
.hadoop
.hbase
.CellUtil
;
33 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
34 import org
.apache
.hadoop
.hbase
.HBaseTestCase
;
35 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
36 import org
.apache
.hadoop
.hbase
.HConstants
;
37 import org
.apache
.hadoop
.hbase
.HRegionInfo
;
38 import org
.apache
.hadoop
.hbase
.HTableDescriptor
;
39 import org
.apache
.hadoop
.hbase
.TableName
;
40 import org
.apache
.hadoop
.hbase
.UnknownScannerException
;
41 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
42 import org
.apache
.hadoop
.hbase
.client
.Delete
;
43 import org
.apache
.hadoop
.hbase
.client
.Get
;
44 import org
.apache
.hadoop
.hbase
.client
.Put
;
45 import org
.apache
.hadoop
.hbase
.client
.Result
;
46 import org
.apache
.hadoop
.hbase
.client
.ResultScanner
;
47 import org
.apache
.hadoop
.hbase
.client
.Scan
;
48 import org
.apache
.hadoop
.hbase
.client
.Table
;
49 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
50 import org
.apache
.hadoop
.hbase
.filter
.Filter
;
51 import org
.apache
.hadoop
.hbase
.filter
.InclusiveStopFilter
;
52 import org
.apache
.hadoop
.hbase
.filter
.PrefixFilter
;
53 import org
.apache
.hadoop
.hbase
.filter
.WhileMatchFilter
;
54 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
55 import org
.apache
.hadoop
.hbase
.testclassification
.RegionServerTests
;
56 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
57 import org
.junit
.ClassRule
;
58 import org
.junit
.Rule
;
59 import org
.junit
.Test
;
60 import org
.junit
.experimental
.categories
.Category
;
61 import org
.junit
.rules
.TestName
;
62 import org
.slf4j
.Logger
;
63 import org
.slf4j
.LoggerFactory
;
66 * Test of a long-lived scanner validating as we go.
68 @Category({RegionServerTests
.class, MediumTests
.class})
69 public class TestScanner
{
72 public static final HBaseClassTestRule CLASS_RULE
=
73 HBaseClassTestRule
.forClass(TestScanner
.class);
75 @Rule public TestName name
= new TestName();
77 private static final Logger LOG
= LoggerFactory
.getLogger(TestScanner
.class);
78 private final static HBaseTestingUtility TEST_UTIL
= new HBaseTestingUtility();
80 private static final byte [] FIRST_ROW
= HConstants
.EMPTY_START_ROW
;
81 private static final byte [][] COLS
= { HConstants
.CATALOG_FAMILY
};
82 private static final byte [][] EXPLICIT_COLS
= {
83 HConstants
.REGIONINFO_QUALIFIER
, HConstants
.SERVER_QUALIFIER
,
85 //HConstants.STARTCODE_QUALIFIER
88 static final TableDescriptorBuilder
.ModifyableTableDescriptor TESTTABLEDESC
=
89 new TableDescriptorBuilder
.ModifyableTableDescriptor(TableName
.valueOf("testscanner"));
91 TESTTABLEDESC
.setColumnFamily(
92 new ColumnFamilyDescriptorBuilder
.ModifyableColumnFamilyDescriptor(HConstants
.CATALOG_FAMILY
)
93 // Ten is an arbitrary number. Keep versions to help debugging.
95 .setBlockCacheEnabled(false)
96 .setBlocksize(8 * 1024)
99 /** HRegionInfo for root region */
100 public static final HRegionInfo REGION_INFO
=
101 new HRegionInfo(TESTTABLEDESC
.getTableName(), HConstants
.EMPTY_BYTE_ARRAY
,
102 HConstants
.EMPTY_BYTE_ARRAY
);
104 private static final byte [] ROW_KEY
= REGION_INFO
.getRegionName();
106 private static final long START_CODE
= Long
.MAX_VALUE
;
108 private HRegion region
;
110 private byte[] firstRowBytes
, secondRowBytes
, thirdRowBytes
;
111 final private byte[] col1
;
113 public TestScanner() {
116 firstRowBytes
= START_KEY_BYTES
;
117 secondRowBytes
= START_KEY_BYTES
.clone();
118 // Increment the least significant character so we get to next row.
119 secondRowBytes
[START_KEY_BYTES
.length
- 1]++;
120 thirdRowBytes
= START_KEY_BYTES
.clone();
121 thirdRowBytes
[START_KEY_BYTES
.length
- 1] =
122 (byte) (thirdRowBytes
[START_KEY_BYTES
.length
- 1] + 2);
123 col1
= Bytes
.toBytes("column1");
127 * Test basic stop row filter works.
130 public void testStopRow() throws Exception
{
131 byte [] startrow
= Bytes
.toBytes("bbb");
132 byte [] stoprow
= Bytes
.toBytes("ccc");
134 this.region
= TEST_UTIL
.createLocalHRegion(TESTTABLEDESC
, null, null);
135 HBaseTestCase
.addContent(this.region
, HConstants
.CATALOG_FAMILY
);
136 List
<Cell
> results
= new ArrayList
<>();
137 // Do simple test of getting one row only first.
138 Scan scan
= new Scan(Bytes
.toBytes("abc"), Bytes
.toBytes("abd"));
139 scan
.addFamily(HConstants
.CATALOG_FAMILY
);
141 InternalScanner s
= region
.getScanner(scan
);
143 while (s
.next(results
)) {
147 assertEquals(0, count
);
148 // Now do something a bit more involved.
149 scan
= new Scan(startrow
, stoprow
);
150 scan
.addFamily(HConstants
.CATALOG_FAMILY
);
152 s
= region
.getScanner(scan
);
155 results
= new ArrayList
<>();
156 for (boolean first
= true; s
.next(results
);) {
159 assertTrue(CellUtil
.matchingRows(kv
, startrow
));
164 assertTrue(Bytes
.BYTES_COMPARATOR
.compare(stoprow
, CellUtil
.cloneRow(kv
)) > 0);
165 // We got something back.
166 assertTrue(count
> 10);
169 HBaseTestingUtility
.closeRegionAndWAL(this.region
);
173 void rowPrefixFilter(Scan scan
) throws IOException
{
174 List
<Cell
> results
= new ArrayList
<>();
175 scan
.addFamily(HConstants
.CATALOG_FAMILY
);
176 InternalScanner s
= region
.getScanner(scan
);
177 boolean hasMore
= true;
179 hasMore
= s
.next(results
);
180 for (Cell kv
: results
) {
181 assertEquals((byte)'a', CellUtil
.cloneRow(kv
)[0]);
182 assertEquals((byte)'b', CellUtil
.cloneRow(kv
)[1]);
189 void rowInclusiveStopFilter(Scan scan
, byte[] stopRow
) throws IOException
{
190 List
<Cell
> results
= new ArrayList
<>();
191 scan
.addFamily(HConstants
.CATALOG_FAMILY
);
192 InternalScanner s
= region
.getScanner(scan
);
193 boolean hasMore
= true;
195 hasMore
= s
.next(results
);
196 for (Cell kv
: results
) {
197 assertTrue(Bytes
.compareTo(CellUtil
.cloneRow(kv
), stopRow
) <= 0);
205 public void testFilters() throws IOException
{
207 this.region
= TEST_UTIL
.createLocalHRegion(TESTTABLEDESC
, null, null);
208 HBaseTestCase
.addContent(this.region
, HConstants
.CATALOG_FAMILY
);
209 byte [] prefix
= Bytes
.toBytes("ab");
210 Filter newFilter
= new PrefixFilter(prefix
);
211 Scan scan
= new Scan();
212 scan
.setFilter(newFilter
);
213 rowPrefixFilter(scan
);
215 byte[] stopRow
= Bytes
.toBytes("bbc");
216 newFilter
= new WhileMatchFilter(new InclusiveStopFilter(stopRow
));
218 scan
.setFilter(newFilter
);
219 rowInclusiveStopFilter(scan
, stopRow
);
222 HBaseTestingUtility
.closeRegionAndWAL(this.region
);
227 * Test that closing a scanner while a client is using it doesn't throw
228 * NPEs but instead an UnknownScannerException. HBASE-2503
231 public void testRaceBetweenClientAndTimeout() throws Exception
{
233 this.region
= TEST_UTIL
.createLocalHRegion(TESTTABLEDESC
, null, null);
234 HBaseTestCase
.addContent(this.region
, HConstants
.CATALOG_FAMILY
);
235 Scan scan
= new Scan();
236 InternalScanner s
= region
.getScanner(scan
);
237 List
<Cell
> results
= new ArrayList
<>();
242 fail("We don't want anything more, we should be failing");
243 } catch (UnknownScannerException ex
) {
248 HBaseTestingUtility
.closeRegionAndWAL(this.region
);
255 public void testScanner() throws IOException
{
257 region
= TEST_UTIL
.createLocalHRegion(TESTTABLEDESC
, null, null);
258 Table table
= new RegionAsTable(region
);
260 // Write information to the meta table
262 Put put
= new Put(ROW_KEY
, System
.currentTimeMillis());
264 put
.addColumn(HConstants
.CATALOG_FAMILY
, HConstants
.REGIONINFO_QUALIFIER
,
265 REGION_INFO
.toByteArray());
268 // What we just committed is in the memstore. Verify that we can get
269 // it back both with scanning and get
272 getRegionInfo(table
);
276 ((HRegion
)region
).close();
277 region
= HRegion
.openHRegion(region
, null);
278 table
= new RegionAsTable(region
);
280 // Verify we can get the data back now that it is on disk.
283 getRegionInfo(table
);
285 // Store some new information
287 String address
= HConstants
.LOCALHOST_IP
+ ":" + HBaseTestingUtility
.randomFreePort();
289 put
= new Put(ROW_KEY
, System
.currentTimeMillis());
290 put
.addColumn(HConstants
.CATALOG_FAMILY
, HConstants
.SERVER_QUALIFIER
,
291 Bytes
.toBytes(address
));
293 // put.add(HConstants.COL_STARTCODE, Bytes.toBytes(START_CODE));
297 // Validate that we can still get the HRegionInfo, even though it is in
298 // an older row on disk and there is a newer row in the memstore
300 scan(true, address
.toString());
301 getRegionInfo(table
);
304 this.region
.flush(true);
308 scan(true, address
.toString());
309 getRegionInfo(table
);
313 ((HRegion
)region
).close();
314 region
= HRegion
.openHRegion(region
,null);
315 table
= new RegionAsTable(region
);
319 scan(true, address
.toString());
320 getRegionInfo(table
);
322 // Now update the information again
324 address
= "bar.foo.com:4321";
326 put
= new Put(ROW_KEY
, System
.currentTimeMillis());
328 put
.addColumn(HConstants
.CATALOG_FAMILY
, HConstants
.SERVER_QUALIFIER
, Bytes
.toBytes(address
));
333 scan(true, address
.toString());
334 getRegionInfo(table
);
342 scan(true, address
.toString());
343 getRegionInfo(table
);
347 ((HRegion
)this.region
).close();
348 this.region
= HRegion
.openHRegion(region
, null);
349 table
= new RegionAsTable(this.region
);
353 scan(true, address
.toString());
354 getRegionInfo(table
);
358 HBaseTestingUtility
.closeRegionAndWAL(this.region
);
362 /** Compare the HRegionInfo we read from HBase to what we stored */
363 private void validateRegionInfo(byte [] regionBytes
) throws IOException
{
364 HRegionInfo info
= HRegionInfo
.parseFromOrNull(regionBytes
);
366 assertEquals(REGION_INFO
.getRegionId(), info
.getRegionId());
367 assertEquals(0, info
.getStartKey().length
);
368 assertEquals(0, info
.getEndKey().length
);
369 assertEquals(0, Bytes
.compareTo(info
.getRegionName(), REGION_INFO
.getRegionName()));
370 //assertEquals(0, info.getTableDesc().compareTo(REGION_INFO.getTableDesc()));
373 /** Use a scanner to get the region info and then validate the results */
374 private void scan(boolean validateStartcode
, String serverName
)
376 InternalScanner scanner
= null;
378 List
<Cell
> results
= new ArrayList
<>();
379 byte [][][] scanColumns
= {COLS
, EXPLICIT_COLS
};
380 for(int i
= 0; i
< scanColumns
.length
; i
++) {
382 scan
= new Scan(FIRST_ROW
);
383 for (int ii
= 0; ii
< EXPLICIT_COLS
.length
; ii
++) {
384 scan
.addColumn(COLS
[0], EXPLICIT_COLS
[ii
]);
386 scanner
= region
.getScanner(scan
);
387 while (scanner
.next(results
)) {
388 assertTrue(hasColumn(results
, HConstants
.CATALOG_FAMILY
,
389 HConstants
.REGIONINFO_QUALIFIER
));
390 byte [] val
= CellUtil
.cloneValue(getColumn(results
, HConstants
.CATALOG_FAMILY
,
391 HConstants
.REGIONINFO_QUALIFIER
));
392 validateRegionInfo(val
);
393 if(validateStartcode
) {
394 // assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
395 // HConstants.STARTCODE_QUALIFIER));
396 // val = getColumn(results, HConstants.CATALOG_FAMILY,
397 // HConstants.STARTCODE_QUALIFIER).getValue();
399 assertFalse(val
.length
== 0);
400 long startCode
= Bytes
.toLong(val
);
401 assertEquals(START_CODE
, startCode
);
404 if(serverName
!= null) {
405 assertTrue(hasColumn(results
, HConstants
.CATALOG_FAMILY
,
406 HConstants
.SERVER_QUALIFIER
));
407 val
= CellUtil
.cloneValue(getColumn(results
, HConstants
.CATALOG_FAMILY
,
408 HConstants
.SERVER_QUALIFIER
));
410 assertFalse(val
.length
== 0);
411 String server
= Bytes
.toString(val
);
412 assertEquals(0, server
.compareTo(serverName
));
416 InternalScanner s
= scanner
;
425 private boolean hasColumn(final List
<Cell
> kvs
, final byte [] family
,
426 final byte [] qualifier
) {
428 if (CellUtil
.matchingFamily(kv
, family
) && CellUtil
.matchingQualifier(kv
, qualifier
)) {
435 private Cell
getColumn(final List
<Cell
> kvs
, final byte [] family
,
436 final byte [] qualifier
) {
438 if (CellUtil
.matchingFamily(kv
, family
) && CellUtil
.matchingQualifier(kv
, qualifier
)) {
446 /** Use get to retrieve the HRegionInfo and validate it */
447 private void getRegionInfo(Table table
) throws IOException
{
448 Get get
= new Get(ROW_KEY
);
449 get
.addColumn(HConstants
.CATALOG_FAMILY
, HConstants
.REGIONINFO_QUALIFIER
);
450 Result result
= table
.get(get
);
451 byte [] bytes
= result
.value();
452 validateRegionInfo(bytes
);
456 * Tests to do a sync flush during the middle of a scan. This is testing the StoreScanner
457 * update readers code essentially. This is not highly concurrent, since it's all 1 thread.
461 public void testScanAndSyncFlush() throws Exception
{
462 this.region
= TEST_UTIL
.createLocalHRegion(TESTTABLEDESC
, null, null);
463 Table hri
= new RegionAsTable(region
);
466 HBaseTestCase
.addContent(hri
, Bytes
.toString(HConstants
.CATALOG_FAMILY
),
467 Bytes
.toString(HConstants
.REGIONINFO_QUALIFIER
)));
468 int count
= count(hri
, -1, false);
469 assertEquals(count
, count(hri
, 100, false)); // do a sync flush.
470 } catch (Exception e
) {
471 LOG
.error("Failed", e
);
474 HBaseTestingUtility
.closeRegionAndWAL(this.region
);
479 * Tests to do a concurrent flush (using a 2nd thread) while scanning. This tests both
480 * the StoreScanner update readers and the transition from memstore -> snapshot -> store file.
483 public void testScanAndRealConcurrentFlush() throws Exception
{
484 this.region
= TEST_UTIL
.createLocalHRegion(TESTTABLEDESC
, null, null);
485 Table hri
= new RegionAsTable(region
);
488 HBaseTestCase
.addContent(hri
, Bytes
.toString(HConstants
.CATALOG_FAMILY
),
489 Bytes
.toString(HConstants
.REGIONINFO_QUALIFIER
)));
490 int count
= count(hri
, -1, false);
491 assertEquals(count
, count(hri
, 100, true)); // do a true concurrent background thread flush
492 } catch (Exception e
) {
493 LOG
.error("Failed", e
);
496 HBaseTestingUtility
.closeRegionAndWAL(this.region
);
501 * Make sure scanner returns correct result when we run a major compaction
505 @SuppressWarnings("deprecation")
506 public void testScanAndConcurrentMajorCompact() throws Exception
{
507 HTableDescriptor htd
= TEST_UTIL
.createTableDescriptor(TableName
.valueOf(name
.getMethodName()),
508 ColumnFamilyDescriptorBuilder
.DEFAULT_MIN_VERSIONS
, 3, HConstants
.FOREVER
,
509 ColumnFamilyDescriptorBuilder
.DEFAULT_KEEP_DELETED
);
510 this.region
= TEST_UTIL
.createLocalHRegion(htd
, null, null);
511 Table hri
= new RegionAsTable(region
);
514 HBaseTestCase
.addContent(hri
, Bytes
.toString(fam1
), Bytes
.toString(col1
),
515 firstRowBytes
, secondRowBytes
);
516 HBaseTestCase
.addContent(hri
, Bytes
.toString(fam2
), Bytes
.toString(col1
),
517 firstRowBytes
, secondRowBytes
);
519 Delete dc
= new Delete(firstRowBytes
);
520 /* delete column1 of firstRow */
521 dc
.addColumns(fam1
, col1
);
525 HBaseTestCase
.addContent(hri
, Bytes
.toString(fam1
), Bytes
.toString(col1
),
526 secondRowBytes
, thirdRowBytes
);
527 HBaseTestCase
.addContent(hri
, Bytes
.toString(fam2
), Bytes
.toString(col1
),
528 secondRowBytes
, thirdRowBytes
);
531 InternalScanner s
= region
.getScanner(new Scan());
532 // run a major compact, column1 of firstRow will be cleaned.
533 region
.compact(true);
535 List
<Cell
> results
= new ArrayList
<>();
538 // make sure returns column2 of firstRow
539 assertTrue("result is not correct, keyValues : " + results
,
540 results
.size() == 1);
541 assertTrue(CellUtil
.matchingRows(results
.get(0), firstRowBytes
));
542 assertTrue(CellUtil
.matchingFamily(results
.get(0), fam2
));
544 results
= new ArrayList
<>();
548 assertTrue(results
.size() == 2);
549 assertTrue(CellUtil
.matchingRows(results
.get(0), secondRowBytes
));
550 assertTrue(CellUtil
.matchingFamily(results
.get(0), fam1
));
551 assertTrue(CellUtil
.matchingFamily(results
.get(1), fam2
));
553 HBaseTestingUtility
.closeRegionAndWAL(this.region
);
560 * @param flushIndex At what row we start the flush.
561 * @param concurrent if the flush should be concurrent or sync.
562 * @return Count of rows found.
563 * @throws IOException
565 private int count(final Table countTable
, final int flushIndex
, boolean concurrent
)
567 LOG
.info("Taking out counting scan");
568 Scan scan
= new Scan();
569 for (byte [] qualifier
: EXPLICIT_COLS
) {
570 scan
.addColumn(HConstants
.CATALOG_FAMILY
, qualifier
);
572 ResultScanner s
= countTable
.getScanner(scan
);
574 boolean justFlushed
= false;
575 while (s
.next() != null) {
577 LOG
.info("after next() just after next flush");
581 if (flushIndex
== count
) {
582 LOG
.info("Starting flush at flush index " + flushIndex
);
583 Thread t
= new Thread() {
588 LOG
.info("Finishing flush");
589 } catch (IOException e
) {
590 LOG
.info("Failed flush cache");
595 t
.start(); // concurrently flush.
597 t
.run(); // sync flush
599 LOG
.info("Continuing on after kicking off background flush");
604 LOG
.info("Found " + count
+ " items");