/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.hamcrest
.CoreMatchers
.instanceOf
;
21 import static org
.hamcrest
.MatcherAssert
.assertThat
;
22 import static org
.junit
.Assert
.assertEquals
;
23 import static org
.junit
.Assert
.assertTrue
;
24 import static org
.junit
.Assert
.fail
;
26 import java
.io
.IOException
;
27 import java
.util
.ArrayList
;
28 import java
.util
.List
;
29 import java
.util
.NavigableSet
;
30 import java
.util
.concurrent
.atomic
.AtomicBoolean
;
31 import java
.util
.concurrent
.atomic
.AtomicLong
;
32 import org
.apache
.hadoop
.conf
.Configuration
;
33 import org
.apache
.hadoop
.fs
.FileSystem
;
34 import org
.apache
.hadoop
.fs
.Path
;
35 import org
.apache
.hadoop
.hbase
.Cell
;
36 import org
.apache
.hadoop
.hbase
.DoNotRetryIOException
;
37 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
38 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
39 import org
.apache
.hadoop
.hbase
.HConstants
;
40 import org
.apache
.hadoop
.hbase
.TableName
;
41 import org
.apache
.hadoop
.hbase
.exceptions
.ScannerResetException
;
42 import org
.apache
.hadoop
.hbase
.regionserver
.DelegatingKeyValueScanner
;
43 import org
.apache
.hadoop
.hbase
.regionserver
.HRegion
;
44 import org
.apache
.hadoop
.hbase
.regionserver
.HStore
;
45 import org
.apache
.hadoop
.hbase
.regionserver
.KeyValueScanner
;
46 import org
.apache
.hadoop
.hbase
.regionserver
.RegionServerServices
;
47 import org
.apache
.hadoop
.hbase
.regionserver
.ReversedStoreScanner
;
48 import org
.apache
.hadoop
.hbase
.regionserver
.ScanInfo
;
49 import org
.apache
.hadoop
.hbase
.regionserver
.StoreScanner
;
50 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
51 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
52 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
53 import org
.apache
.hadoop
.hbase
.wal
.WAL
;
54 import org
.junit
.AfterClass
;
55 import org
.junit
.BeforeClass
;
56 import org
.junit
.ClassRule
;
57 import org
.junit
.Rule
;
58 import org
.junit
.Test
;
59 import org
.junit
.experimental
.categories
.Category
;
60 import org
.junit
.rules
.TestName
;
62 @Category({ MediumTests
.class, ClientTests
.class })
63 public class TestFromClientSideScanExcpetion
{
66 public static final HBaseClassTestRule CLASS_RULE
=
67 HBaseClassTestRule
.forClass(TestFromClientSideScanExcpetion
.class);
69 protected final static HBaseTestingUtil TEST_UTIL
= new HBaseTestingUtil();
71 private static byte[] FAMILY
= Bytes
.toBytes("testFamily");
73 private static int SLAVES
= 3;
76 public TestName name
= new TestName();
79 public static void setUpBeforeClass() throws Exception
{
80 Configuration conf
= TEST_UTIL
.getConfiguration();
81 conf
.setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
, 3);
82 conf
.setLong(HConstants
.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD
, 6000000);
83 conf
.setClass(HConstants
.REGION_IMPL
, MyHRegion
.class, HRegion
.class);
84 conf
.setBoolean("hbase.client.log.scanner.activity", true);
85 // We need more than one region server in this test
86 TEST_UTIL
.startMiniCluster(SLAVES
);
90 public static void tearDownAfterClass() throws Exception
{
91 TEST_UTIL
.shutdownMiniCluster();
94 private static AtomicBoolean ON
= new AtomicBoolean(false);
95 private static AtomicLong REQ_COUNT
= new AtomicLong(0);
96 private static AtomicBoolean IS_DO_NOT_RETRY
= new AtomicBoolean(false); // whether to throw
98 private static AtomicBoolean THROW_ONCE
= new AtomicBoolean(true); // whether to only throw once
100 private static void reset() {
103 IS_DO_NOT_RETRY
.set(false);
104 THROW_ONCE
.set(true);
107 private static void inject() {
111 public static final class MyHRegion
extends HRegion
{
113 @SuppressWarnings("deprecation")
114 public MyHRegion(Path tableDir
, WAL wal
, FileSystem fs
, Configuration confParam
,
115 RegionInfo regionInfo
, TableDescriptor htd
, RegionServerServices rsServices
) {
116 super(tableDir
, wal
, fs
, confParam
, regionInfo
, htd
, rsServices
);
120 protected HStore
instantiateHStore(ColumnFamilyDescriptor family
, boolean warmup
)
122 return new MyHStore(this, family
, conf
, warmup
);
126 public static final class MyHStore
extends HStore
{
128 public MyHStore(HRegion region
, ColumnFamilyDescriptor family
, Configuration confParam
,
129 boolean warmup
) throws IOException
{
130 super(region
, family
, confParam
, warmup
);
134 protected KeyValueScanner
createScanner(Scan scan
, ScanInfo scanInfo
,
135 NavigableSet
<byte[]> targetCols
, long readPt
) throws IOException
{
136 return scan
.isReversed() ?
new ReversedStoreScanner(this, scanInfo
, scan
, targetCols
, readPt
)
137 : new MyStoreScanner(this, scanInfo
, scan
, targetCols
, readPt
);
141 public static final class MyStoreScanner
extends StoreScanner
{
142 public MyStoreScanner(HStore store
, ScanInfo scanInfo
, Scan scan
, NavigableSet
<byte[]> columns
,
143 long readPt
) throws IOException
{
144 super(store
, scanInfo
, scan
, columns
, readPt
);
148 protected List
<KeyValueScanner
> selectScannersFrom(HStore store
,
149 List
<?
extends KeyValueScanner
> allScanners
) {
150 List
<KeyValueScanner
> scanners
= super.selectScannersFrom(store
, allScanners
);
151 List
<KeyValueScanner
> newScanners
= new ArrayList
<>(scanners
.size());
152 for (KeyValueScanner scanner
: scanners
) {
153 newScanners
.add(new DelegatingKeyValueScanner(scanner
) {
155 public boolean reseek(Cell key
) throws IOException
{
157 REQ_COUNT
.incrementAndGet();
158 if (!THROW_ONCE
.get() || REQ_COUNT
.get() == 1) {
159 if (IS_DO_NOT_RETRY
.get()) {
160 throw new DoNotRetryIOException("Injected exception");
162 throw new IOException("Injected exception");
166 return super.reseek(key
);
175 * Tests the case where a Scan can throw an IOException in the middle of the seek / reseek leaving
176 * the server side RegionScanner to be in dirty state. The client has to ensure that the
177 * ClientScanner does not get an exception and also sees all the data.
178 * @throws IOException
179 * @throws InterruptedException
182 public void testClientScannerIsResetWhenScanThrowsIOException()
183 throws IOException
, InterruptedException
{
185 THROW_ONCE
.set(true); // throw exceptions only once
186 TableName tableName
= TableName
.valueOf(name
.getMethodName());
187 try (Table t
= TEST_UTIL
.createTable(tableName
, FAMILY
)) {
188 int rowCount
= TEST_UTIL
.loadTable(t
, FAMILY
, false);
189 TEST_UTIL
.getAdmin().flush(tableName
);
191 int actualRowCount
= TEST_UTIL
.countRows(t
, new Scan().addColumn(FAMILY
, FAMILY
));
192 assertEquals(rowCount
, actualRowCount
);
194 assertTrue(REQ_COUNT
.get() > 0);
198 * Tests the case where a coprocessor throws a DoNotRetryIOException in the scan. The expectation
199 * is that the exception will bubble up to the client scanner instead of being retried.
202 public void testScannerThrowsExceptionWhenCoprocessorThrowsDNRIOE()
203 throws IOException
, InterruptedException
{
205 IS_DO_NOT_RETRY
.set(true);
206 TableName tableName
= TableName
.valueOf(name
.getMethodName());
207 try (Table t
= TEST_UTIL
.createTable(tableName
, FAMILY
)) {
208 TEST_UTIL
.loadTable(t
, FAMILY
, false);
209 TEST_UTIL
.getAdmin().flush(tableName
);
211 TEST_UTIL
.countRows(t
, new Scan().addColumn(FAMILY
, FAMILY
));
212 fail("Should have thrown an exception");
213 } catch (DoNotRetryIOException expected
) {
216 assertTrue(REQ_COUNT
.get() > 0);
220 * Tests the case where a coprocessor throws a regular IOException in the scan. The expectation is
221 * that the we will keep on retrying, but fail after the retries are exhausted instead of retrying
225 public void testScannerFailsAfterRetriesWhenCoprocessorThrowsIOE()
226 throws IOException
, InterruptedException
{
227 TableName tableName
= TableName
.valueOf(name
.getMethodName());
229 THROW_ONCE
.set(false); // throw exceptions in every retry
230 try (Table t
= TEST_UTIL
.createTable(tableName
, FAMILY
)) {
231 TEST_UTIL
.loadTable(t
, FAMILY
, false);
232 TEST_UTIL
.getAdmin().flush(tableName
);
234 TEST_UTIL
.countRows(t
, new Scan().addColumn(FAMILY
, FAMILY
));
235 fail("Should have thrown an exception");
236 } catch (ScannerResetException expected
) {
238 } catch (RetriesExhaustedException e
) {
240 assertThat(e
.getCause(), instanceOf(ScannerResetException
.class));
242 assertTrue(REQ_COUNT
.get() >= 3);