HBASE-20457 Return immediately for a scan rpc call when we want to switch from pread...
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / regionserver / TestSwitchToStreamRead.java
blob815643d441ea31ede2f4aa20d984fd13b969a771
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org.apache.hadoop.hbase.regionserver;
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertFalse;
22 import static org.junit.Assert.assertTrue;
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.concurrent.ThreadLocalRandom;
28 import org.apache.hadoop.hbase.Cell;
29 import org.apache.hadoop.hbase.HBaseClassTestRule;
30 import org.apache.hadoop.hbase.HBaseTestingUtility;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
33 import org.apache.hadoop.hbase.client.Put;
34 import org.apache.hadoop.hbase.client.Result;
35 import org.apache.hadoop.hbase.client.Scan;
36 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
37 import org.apache.hadoop.hbase.filter.Filter;
38 import org.apache.hadoop.hbase.filter.FilterBase;
39 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
40 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
41 import org.apache.hadoop.hbase.testclassification.MediumTests;
42 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
43 import org.apache.hadoop.hbase.util.Bytes;
44 import org.junit.AfterClass;
45 import org.junit.BeforeClass;
46 import org.junit.ClassRule;
47 import org.junit.Ignore;
48 import org.junit.Test;
49 import org.junit.experimental.categories.Category;
51 @Category({ RegionServerTests.class, MediumTests.class })
52 public class TestSwitchToStreamRead {
54 @ClassRule
55 public static final HBaseClassTestRule CLASS_RULE =
56 HBaseClassTestRule.forClass(TestSwitchToStreamRead.class);
58 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
60 private static TableName TABLE_NAME = TableName.valueOf("stream");
62 private static byte[] FAMILY = Bytes.toBytes("cf");
64 private static byte[] QUAL = Bytes.toBytes("cq");
66 private static String VALUE_PREFIX;
68 private static HRegion REGION;
70 @BeforeClass
71 public static void setUp() throws IOException {
72 UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 2048);
73 StringBuilder sb = new StringBuilder(256);
74 for (int i = 0; i < 255; i++) {
75 sb.append((char) ThreadLocalRandom.current().nextInt('A', 'z' + 1));
77 VALUE_PREFIX = sb.append("-").toString();
78 REGION = UTIL.createLocalHRegion(
79 TableDescriptorBuilder.newBuilder(TABLE_NAME)
80 .setColumnFamily(
81 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBlocksize(1024).build())
82 .build(),
83 null, null);
84 for (int i = 0; i < 900; i++) {
85 REGION
86 .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
88 REGION.flush(true);
89 for (int i = 900; i < 1000; i++) {
90 REGION
91 .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
95 @AfterClass
96 public static void tearDown() throws IOException {
97 REGION.close(true);
98 UTIL.cleanupTestDir();
101 @Test
102 public void test() throws IOException {
103 try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) {
104 StoreScanner storeScanner =
105 (StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting();
106 for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
107 if (kvs instanceof StoreFileScanner) {
108 StoreFileScanner sfScanner = (StoreFileScanner) kvs;
109 // starting from pread so we use shared reader here.
110 assertTrue(sfScanner.getReader().shared);
113 List<Cell> cells = new ArrayList<>();
114 for (int i = 0; i < 500; i++) {
115 assertTrue(scanner.next(cells));
116 Result result = Result.create(cells);
117 assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
118 cells.clear();
119 scanner.shipped();
121 for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
122 if (kvs instanceof StoreFileScanner) {
123 StoreFileScanner sfScanner = (StoreFileScanner) kvs;
124 // we should have convert to use stream read now.
125 assertFalse(sfScanner.getReader().shared);
128 for (int i = 500; i < 1000; i++) {
129 assertEquals(i != 999, scanner.next(cells));
130 Result result = Result.create(cells);
131 assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
132 cells.clear();
133 scanner.shipped();
136 // make sure all scanners are closed.
137 for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
138 assertFalse(sf.isReferencedInReads());
142 public static final class MatchLastRowKeyFilter extends FilterBase {
144 @Override
145 public boolean filterRowKey(Cell cell) throws IOException {
146 return Bytes.toInt(cell.getRowArray(), cell.getRowOffset()) != 999;
150 private void testFilter(Filter filter) throws IOException {
151 try (RegionScannerImpl scanner = REGION.getScanner(new Scan().setFilter(filter))) {
152 StoreScanner storeScanner =
153 (StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting();
154 for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
155 if (kvs instanceof StoreFileScanner) {
156 StoreFileScanner sfScanner = (StoreFileScanner) kvs;
157 // starting from pread so we use shared reader here.
158 assertTrue(sfScanner.getReader().shared);
161 List<Cell> cells = new ArrayList<>();
162 // should return before finishing the scan as we want to switch from pread to stream
163 assertTrue(scanner.next(cells,
164 ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
165 assertTrue(cells.isEmpty());
166 scanner.shipped();
168 for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
169 if (kvs instanceof StoreFileScanner) {
170 StoreFileScanner sfScanner = (StoreFileScanner) kvs;
171 // we should have convert to use stream read now.
172 assertFalse(sfScanner.getReader().shared);
175 assertFalse(scanner.next(cells,
176 ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
177 Result result = Result.create(cells);
178 assertEquals(VALUE_PREFIX + 999, Bytes.toString(result.getValue(FAMILY, QUAL)));
179 cells.clear();
180 scanner.shipped();
182 // make sure all scanners are closed.
183 for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
184 assertFalse(sf.isReferencedInReads());
188 // We use a different logic to implement filterRowKey, where we will keep calling kvHeap.next
189 // until the row key is changed. And there we can only use NoLimitScannerContext so we can not
190 // make the upper layer return immediately. Simply do not use NoLimitScannerContext will lead to
191 // an infinite loop. Need to dig more, the code are way too complicated...
192 @Ignore
193 @Test
194 public void testFilterRowKey() throws IOException {
195 testFilter(new MatchLastRowKeyFilter());
198 public static final class MatchLastRowCellNextColFilter extends FilterBase {
200 @Override
201 public ReturnCode filterCell(Cell c) throws IOException {
202 if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
203 return ReturnCode.INCLUDE;
204 } else {
205 return ReturnCode.NEXT_COL;
210 @Test
211 public void testFilterCellNextCol() throws IOException {
212 testFilter(new MatchLastRowCellNextColFilter());
215 public static final class MatchLastRowCellNextRowFilter extends FilterBase {
217 @Override
218 public ReturnCode filterCell(Cell c) throws IOException {
219 if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
220 return ReturnCode.INCLUDE;
221 } else {
222 return ReturnCode.NEXT_ROW;
227 @Test
228 public void testFilterCellNextRow() throws IOException {
229 testFilter(new MatchLastRowCellNextRowFilter());
232 public static final class MatchLastRowFilterRowFilter extends FilterBase {
234 private boolean exclude;
236 @Override
237 public void filterRowCells(List<Cell> kvs) throws IOException {
238 Cell c = kvs.get(0);
239 exclude = Bytes.toInt(c.getRowArray(), c.getRowOffset()) != 999;
242 @Override
243 public void reset() throws IOException {
244 exclude = false;
247 @Override
248 public boolean filterRow() throws IOException {
249 return exclude;
252 @Override
253 public boolean hasFilterRow() {
254 return true;
258 @Test
259 public void testFilterRow() throws IOException {
260 testFilter(new MatchLastRowFilterRowFilter());