2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.regionserver
;
20 import static org
.junit
.Assert
.assertEquals
;
21 import static org
.junit
.Assert
.assertFalse
;
22 import static org
.junit
.Assert
.assertTrue
;
24 import java
.io
.IOException
;
25 import java
.util
.ArrayList
;
26 import java
.util
.List
;
27 import java
.util
.concurrent
.ThreadLocalRandom
;
28 import org
.apache
.hadoop
.hbase
.Cell
;
29 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
30 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
31 import org
.apache
.hadoop
.hbase
.TableName
;
32 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
33 import org
.apache
.hadoop
.hbase
.client
.Put
;
34 import org
.apache
.hadoop
.hbase
.client
.Result
;
35 import org
.apache
.hadoop
.hbase
.client
.Scan
;
36 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
37 import org
.apache
.hadoop
.hbase
.filter
.Filter
;
38 import org
.apache
.hadoop
.hbase
.filter
.FilterBase
;
39 import org
.apache
.hadoop
.hbase
.regionserver
.HRegion
.RegionScannerImpl
;
40 import org
.apache
.hadoop
.hbase
.regionserver
.ScannerContext
.LimitScope
;
41 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
42 import org
.apache
.hadoop
.hbase
.testclassification
.RegionServerTests
;
43 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
44 import org
.junit
.AfterClass
;
45 import org
.junit
.BeforeClass
;
46 import org
.junit
.ClassRule
;
47 import org
.junit
.Ignore
;
48 import org
.junit
.Test
;
49 import org
.junit
.experimental
.categories
.Category
;
51 @Category({ RegionServerTests
.class, MediumTests
.class })
52 public class TestSwitchToStreamRead
{
55 public static final HBaseClassTestRule CLASS_RULE
=
56 HBaseClassTestRule
.forClass(TestSwitchToStreamRead
.class);
58 private static final HBaseTestingUtility UTIL
= new HBaseTestingUtility();
60 private static TableName TABLE_NAME
= TableName
.valueOf("stream");
62 private static byte[] FAMILY
= Bytes
.toBytes("cf");
64 private static byte[] QUAL
= Bytes
.toBytes("cq");
66 private static String VALUE_PREFIX
;
68 private static HRegion REGION
;
71 public static void setUp() throws IOException
{
72 UTIL
.getConfiguration().setLong(StoreScanner
.STORESCANNER_PREAD_MAX_BYTES
, 2048);
73 StringBuilder sb
= new StringBuilder(256);
74 for (int i
= 0; i
< 255; i
++) {
75 sb
.append((char) ThreadLocalRandom
.current().nextInt('A', 'z' + 1));
77 VALUE_PREFIX
= sb
.append("-").toString();
78 REGION
= UTIL
.createLocalHRegion(
79 TableDescriptorBuilder
.newBuilder(TABLE_NAME
)
81 ColumnFamilyDescriptorBuilder
.newBuilder(FAMILY
).setBlocksize(1024).build())
84 for (int i
= 0; i
< 900; i
++) {
86 .put(new Put(Bytes
.toBytes(i
)).addColumn(FAMILY
, QUAL
, Bytes
.toBytes(VALUE_PREFIX
+ i
)));
89 for (int i
= 900; i
< 1000; i
++) {
91 .put(new Put(Bytes
.toBytes(i
)).addColumn(FAMILY
, QUAL
, Bytes
.toBytes(VALUE_PREFIX
+ i
)));
96 public static void tearDown() throws IOException
{
98 UTIL
.cleanupTestDir();
102 public void test() throws IOException
{
103 try (RegionScannerImpl scanner
= REGION
.getScanner(new Scan())) {
104 StoreScanner storeScanner
=
105 (StoreScanner
) (scanner
).getStoreHeapForTesting().getCurrentForTesting();
106 for (KeyValueScanner kvs
: storeScanner
.getAllScannersForTesting()) {
107 if (kvs
instanceof StoreFileScanner
) {
108 StoreFileScanner sfScanner
= (StoreFileScanner
) kvs
;
109 // starting from pread so we use shared reader here.
110 assertTrue(sfScanner
.getReader().shared
);
113 List
<Cell
> cells
= new ArrayList
<>();
114 for (int i
= 0; i
< 500; i
++) {
115 assertTrue(scanner
.next(cells
));
116 Result result
= Result
.create(cells
);
117 assertEquals(VALUE_PREFIX
+ i
, Bytes
.toString(result
.getValue(FAMILY
, QUAL
)));
121 for (KeyValueScanner kvs
: storeScanner
.getAllScannersForTesting()) {
122 if (kvs
instanceof StoreFileScanner
) {
123 StoreFileScanner sfScanner
= (StoreFileScanner
) kvs
;
124 // we should have convert to use stream read now.
125 assertFalse(sfScanner
.getReader().shared
);
128 for (int i
= 500; i
< 1000; i
++) {
129 assertEquals(i
!= 999, scanner
.next(cells
));
130 Result result
= Result
.create(cells
);
131 assertEquals(VALUE_PREFIX
+ i
, Bytes
.toString(result
.getValue(FAMILY
, QUAL
)));
136 // make sure all scanners are closed.
137 for (HStoreFile sf
: REGION
.getStore(FAMILY
).getStorefiles()) {
138 assertFalse(sf
.isReferencedInReads());
142 public static final class MatchLastRowKeyFilter
extends FilterBase
{
145 public boolean filterRowKey(Cell cell
) throws IOException
{
146 return Bytes
.toInt(cell
.getRowArray(), cell
.getRowOffset()) != 999;
150 private void testFilter(Filter filter
) throws IOException
{
151 try (RegionScannerImpl scanner
= REGION
.getScanner(new Scan().setFilter(filter
))) {
152 StoreScanner storeScanner
=
153 (StoreScanner
) (scanner
).getStoreHeapForTesting().getCurrentForTesting();
154 for (KeyValueScanner kvs
: storeScanner
.getAllScannersForTesting()) {
155 if (kvs
instanceof StoreFileScanner
) {
156 StoreFileScanner sfScanner
= (StoreFileScanner
) kvs
;
157 // starting from pread so we use shared reader here.
158 assertTrue(sfScanner
.getReader().shared
);
161 List
<Cell
> cells
= new ArrayList
<>();
162 // should return before finishing the scan as we want to switch from pread to stream
163 assertTrue(scanner
.next(cells
,
164 ScannerContext
.newBuilder().setTimeLimit(LimitScope
.BETWEEN_CELLS
, -1).build()));
165 assertTrue(cells
.isEmpty());
168 for (KeyValueScanner kvs
: storeScanner
.getAllScannersForTesting()) {
169 if (kvs
instanceof StoreFileScanner
) {
170 StoreFileScanner sfScanner
= (StoreFileScanner
) kvs
;
171 // we should have convert to use stream read now.
172 assertFalse(sfScanner
.getReader().shared
);
175 assertFalse(scanner
.next(cells
,
176 ScannerContext
.newBuilder().setTimeLimit(LimitScope
.BETWEEN_CELLS
, -1).build()));
177 Result result
= Result
.create(cells
);
178 assertEquals(VALUE_PREFIX
+ 999, Bytes
.toString(result
.getValue(FAMILY
, QUAL
)));
182 // make sure all scanners are closed.
183 for (HStoreFile sf
: REGION
.getStore(FAMILY
).getStorefiles()) {
184 assertFalse(sf
.isReferencedInReads());
188 // We use a different logic to implement filterRowKey, where we will keep calling kvHeap.next
189 // until the row key is changed. And there we can only use NoLimitScannerContext so we can not
190 // make the upper layer return immediately. Simply do not use NoLimitScannerContext will lead to
191 // an infinite loop. Need to dig more, the code are way too complicated...
194 public void testFilterRowKey() throws IOException
{
195 testFilter(new MatchLastRowKeyFilter());
198 public static final class MatchLastRowCellNextColFilter
extends FilterBase
{
201 public ReturnCode
filterCell(Cell c
) throws IOException
{
202 if (Bytes
.toInt(c
.getRowArray(), c
.getRowOffset()) == 999) {
203 return ReturnCode
.INCLUDE
;
205 return ReturnCode
.NEXT_COL
;
211 public void testFilterCellNextCol() throws IOException
{
212 testFilter(new MatchLastRowCellNextColFilter());
215 public static final class MatchLastRowCellNextRowFilter
extends FilterBase
{
218 public ReturnCode
filterCell(Cell c
) throws IOException
{
219 if (Bytes
.toInt(c
.getRowArray(), c
.getRowOffset()) == 999) {
220 return ReturnCode
.INCLUDE
;
222 return ReturnCode
.NEXT_ROW
;
228 public void testFilterCellNextRow() throws IOException
{
229 testFilter(new MatchLastRowCellNextRowFilter());
232 public static final class MatchLastRowFilterRowFilter
extends FilterBase
{
234 private boolean exclude
;
237 public void filterRowCells(List
<Cell
> kvs
) throws IOException
{
239 exclude
= Bytes
.toInt(c
.getRowArray(), c
.getRowOffset()) != 999;
243 public void reset() throws IOException
{
248 public boolean filterRow() throws IOException
{
253 public boolean hasFilterRow() {
259 public void testFilterRow() throws IOException
{
260 testFilter(new MatchLastRowFilterRowFilter());