HBASE-26481 Consider rolling upgrading from old region replication framework (#3880)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / client / TestAsyncTableScanRenewLease.java
blob0936bbe4049ae41cb7563528ac0d19f7559516aa
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static org.junit.Assert.assertEquals;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.stream.Collectors;
25 import java.util.stream.IntStream;
26 import org.apache.hadoop.hbase.HBaseClassTestRule;
27 import org.apache.hadoop.hbase.HBaseTestingUtil;
28 import org.apache.hadoop.hbase.HConstants;
29 import org.apache.hadoop.hbase.TableName;
30 import org.apache.hadoop.hbase.testclassification.ClientTests;
31 import org.apache.hadoop.hbase.testclassification.LargeTests;
32 import org.apache.hadoop.hbase.util.Bytes;
33 import org.apache.hadoop.hbase.util.Threads;
34 import org.junit.AfterClass;
35 import org.junit.BeforeClass;
36 import org.junit.ClassRule;
37 import org.junit.Test;
38 import org.junit.experimental.categories.Category;
40 @Category({ LargeTests.class, ClientTests.class })
41 public class TestAsyncTableScanRenewLease {
43 @ClassRule
44 public static final HBaseClassTestRule CLASS_RULE =
45 HBaseClassTestRule.forClass(TestAsyncTableScanRenewLease.class);
47 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
49 private static TableName TABLE_NAME = TableName.valueOf("async");
51 private static byte[] FAMILY = Bytes.toBytes("cf");
53 private static byte[] CQ = Bytes.toBytes("cq");
55 private static AsyncConnection CONN;
57 private static AsyncTable<AdvancedScanResultConsumer> TABLE;
59 private static int SCANNER_LEASE_TIMEOUT_PERIOD_MS = 5000;
61 @BeforeClass
62 public static void setUp() throws Exception {
63 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
64 SCANNER_LEASE_TIMEOUT_PERIOD_MS);
65 TEST_UTIL.startMiniCluster(1);
66 TEST_UTIL.createTable(TABLE_NAME, FAMILY);
67 CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
68 TABLE = CONN.getTable(TABLE_NAME);
69 TABLE.putAll(IntStream.range(0, 10).mapToObj(
70 i -> new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
71 .collect(Collectors.toList())).get();
74 @AfterClass
75 public static void tearDown() throws Exception {
76 CONN.close();
77 TEST_UTIL.shutdownMiniCluster();
80 private static final class RenewLeaseConsumer implements AdvancedScanResultConsumer {
82 private final List<Result> results = new ArrayList<>();
84 private Throwable error;
86 private boolean finished = false;
88 private boolean suspended = false;
90 @Override
91 public synchronized void onNext(Result[] results, ScanController controller) {
92 for (Result result : results) {
93 this.results.add(result);
95 if (!suspended) {
96 ScanResumer resumer = controller.suspend();
97 new Thread(() -> {
98 Threads.sleep(2 * SCANNER_LEASE_TIMEOUT_PERIOD_MS);
99 try {
100 TABLE.put(new Put(Bytes.toBytes(String.format("%02d", 10))).addColumn(FAMILY, CQ,
101 Bytes.toBytes(10))).get();
102 } catch (Exception e) {
103 onError(e);
105 resumer.resume();
106 }).start();
110 @Override
111 public synchronized void onError(Throwable error) {
112 this.finished = true;
113 this.error = error;
114 notifyAll();
117 @Override
118 public synchronized void onComplete() {
119 this.finished = true;
120 notifyAll();
123 public synchronized List<Result> get() throws Throwable {
124 while (!finished) {
125 wait();
127 if (error != null) {
128 throw error;
130 return results;
134 @Test
135 public void test() throws Throwable {
136 RenewLeaseConsumer consumer = new RenewLeaseConsumer();
137 TABLE.scan(new Scan(), consumer);
138 List<Result> results = consumer.get();
139 // should not see the newly added value
140 assertEquals(10, results.size());
141 IntStream.range(0, 10).forEach(i -> {
142 Result result = results.get(i);
143 assertEquals(String.format("%02d", i), Bytes.toString(result.getRow()));
144 assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ)));
146 // now we can see the newly added value
147 List<Result> results2 = TABLE.scanAll(new Scan()).get();
148 assertEquals(11, results2.size());
149 IntStream.range(0, 11).forEach(i -> {
150 Result result = results2.get(i);
151 assertEquals(String.format("%02d", i), Bytes.toString(result.getRow()));
152 assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ)));