HBASE-24033 Add ut for loading the corrupt recovered hfiles (#1322)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / client / TestConnection.java
blobb92a4d752f7f83aa719825ba1bfe413867753f20
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertFalse;
22 import static org.junit.Assert.assertTrue;
24 import com.google.protobuf.ServiceException;
25 import java.io.IOException;
26 import java.util.List;
27 import java.util.Set;
28 import java.util.concurrent.ThreadLocalRandom;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.concurrent.atomic.AtomicReference;
31 import java.util.stream.Collectors;
32 import java.util.stream.IntStream;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.hbase.DoNotRetryIOException;
35 import org.apache.hadoop.hbase.HBaseClassTestRule;
36 import org.apache.hadoop.hbase.HBaseTestingUtility;
37 import org.apache.hadoop.hbase.HConstants;
38 import org.apache.hadoop.hbase.HRegionLocation;
39 import org.apache.hadoop.hbase.ServerName;
40 import org.apache.hadoop.hbase.TableName;
41 import org.apache.hadoop.hbase.Waiter;
42 import org.apache.hadoop.hbase.client.coprocessor.Batch;
43 import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
44 import org.apache.hadoop.hbase.ipc.RpcClient;
45 import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
46 import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
47 import org.apache.hadoop.hbase.testclassification.LargeTests;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
50 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
51 import org.junit.After;
52 import org.junit.AfterClass;
53 import org.junit.Assert;
54 import org.junit.BeforeClass;
55 import org.junit.ClassRule;
56 import org.junit.Rule;
57 import org.junit.Test;
58 import org.junit.experimental.categories.Category;
59 import org.junit.rules.TestName;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
63 import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
64 import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector.Level;
/**
 * This class is for testing {@link Connection}.
 */
69 @Category({ LargeTests.class })
70 public class TestConnection {
72 @ClassRule
73 public static final HBaseClassTestRule CLASS_RULE =
74 HBaseClassTestRule.forClass(TestConnection.class);
76 private static final Logger LOG = LoggerFactory.getLogger(TestConnection.class);
77 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
79 private static final byte[] FAM_NAM = Bytes.toBytes("f");
80 private static final byte[] ROW = Bytes.toBytes("bbb");
81 private static final int RPC_RETRY = 5;
83 @Rule
84 public TestName name = new TestName();
86 @BeforeClass
87 public static void setUpBeforeClass() throws Exception {
88 ResourceLeakDetector.setLevel(Level.PARANOID);
89 TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
90 // Up the handlers; this test needs more than usual.
91 TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
92 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
93 TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 3);
94 TEST_UTIL.startMiniCluster(2);
98 @AfterClass
99 public static void tearDownAfterClass() throws Exception {
100 TEST_UTIL.shutdownMiniCluster();
103 @After
104 public void tearDown() throws IOException {
105 TEST_UTIL.getAdmin().balancerSwitch(true, true);
109 * Naive test to check that Connection#getAdmin returns a properly constructed HBaseAdmin object
110 * @throws IOException Unable to construct admin
112 @Test
113 public void testAdminFactory() throws IOException {
114 Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
115 Admin admin = con1.getAdmin();
116 assertTrue(admin.getConnection() == con1);
117 assertTrue(admin.getConfiguration() == TEST_UTIL.getConfiguration());
118 con1.close();
122 * Test that we can handle connection close: it will trigger a retry, but the calls will finish.
124 @Test
125 public void testConnectionCloseAllowsInterrupt() throws Exception {
126 testConnectionClose(true);
129 @Test
130 public void testConnectionNotAllowsInterrupt() throws Exception {
131 testConnectionClose(false);
134 private void testConnectionClose(boolean allowsInterrupt) throws Exception {
135 TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt);
136 TEST_UTIL.createTable(tableName, FAM_NAM).close();
138 TEST_UTIL.getAdmin().balancerSwitch(false, true);
140 Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
141 // We want to work on a separate connection.
142 c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
143 c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); // retry a lot
144 c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); // don't wait between retries.
145 c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Server do not really expire
146 c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt);
147 // to avoid the client to be stuck when do the Get
148 c2.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 10000);
149 c2.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 10000);
150 c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 5000);
152 Connection connection = ConnectionFactory.createConnection(c2);
153 final Table table = connection.getTable(tableName);
155 Put put = new Put(ROW);
156 put.addColumn(FAM_NAM, ROW, ROW);
157 table.put(put);
159 // 4 steps: ready=0; doGets=1; mustStop=2; stopped=3
160 final AtomicInteger step = new AtomicInteger(0);
162 final AtomicReference<Throwable> failed = new AtomicReference<>(null);
163 Thread t = new Thread("testConnectionCloseThread") {
164 @Override
165 public void run() {
166 int done = 0;
167 try {
168 step.set(1);
169 while (step.get() == 1) {
170 Get get = new Get(ROW);
171 table.get(get);
172 done++;
173 if (done % 100 == 0) {
174 LOG.info("done=" + done);
176 // without the sleep, will cause the exception for too many files in
177 // org.apache.hadoop.hdfs.server.datanode.DataXceiver
178 Thread.sleep(100);
180 } catch (Throwable t) {
181 failed.set(t);
182 LOG.error(t.toString(), t);
184 step.set(3);
187 t.start();
188 TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() {
189 @Override
190 public boolean evaluate() throws Exception {
191 return step.get() == 1;
195 ServerName sn;
196 try (RegionLocator rl = connection.getRegionLocator(tableName)) {
197 sn = rl.getRegionLocation(ROW).getServerName();
200 RpcClient rpcClient = ((AsyncConnectionImpl) connection.toAsyncConnection()).rpcClient;
202 LOG.info("Going to cancel connections. connection=" + connection.toString() + ", sn=" + sn);
203 for (int i = 0; i < 500; i++) {
204 rpcClient.cancelConnections(sn);
205 Thread.sleep(50);
208 step.compareAndSet(1, 2);
209 // The test may fail here if the thread doing the gets is stuck. The way to find
210 // out what's happening is to look for the thread named 'testConnectionCloseThread'
211 TEST_UTIL.waitFor(40000, new Waiter.Predicate<Exception>() {
212 @Override
213 public boolean evaluate() throws Exception {
214 return step.get() == 3;
217 table.close();
218 connection.close();
219 Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null);
223 * Test that connection can become idle without breaking everything.
225 @Test
226 public void testConnectionIdle() throws Exception {
227 final TableName tableName = TableName.valueOf(name.getMethodName());
228 TEST_UTIL.createTable(tableName, FAM_NAM).close();
229 int idleTime = 20000;
230 boolean previousBalance = TEST_UTIL.getAdmin().balancerSwitch(false, true);
232 Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
233 // We want to work on a separate connection.
234 c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
235 c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Don't retry: retry = test failed
236 c2.setInt(RpcClient.IDLE_TIME, idleTime);
238 Connection connection = ConnectionFactory.createConnection(c2);
239 final Table table = connection.getTable(tableName);
241 Put put = new Put(ROW);
242 put.addColumn(FAM_NAM, ROW, ROW);
243 table.put(put);
245 ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
246 mee.setValue(System.currentTimeMillis());
247 EnvironmentEdgeManager.injectEdge(mee);
248 LOG.info("first get");
249 table.get(new Get(ROW));
251 LOG.info("first get - changing the time & sleeping");
252 mee.incValue(idleTime + 1000);
253 Thread.sleep(1500); // we need to wait a little for the connection to be seen as idle.
254 // 1500 = sleep time in RpcClient#waitForWork + a margin
256 LOG.info("second get - connection has been marked idle in the middle");
257 // To check that the connection actually became idle would need to read some private
258 // fields of RpcClient.
259 table.get(new Get(ROW));
260 mee.incValue(idleTime + 1000);
262 LOG.info("third get - connection is idle, but the reader doesn't know yet");
263 // We're testing here a special case:
264 // time limit reached BUT connection not yet reclaimed AND a new call.
265 // in this situation, we don't close the connection, instead we use it immediately.
266 // If we're very unlucky we can have a race condition in the test: the connection is already
267 // under closing when we do the get, so we have an exception, and we don't retry as the
268 // retry number is 1. The probability is very very low, and seems acceptable for now. It's
269 // a test issue only.
270 table.get(new Get(ROW));
272 LOG.info("we're done - time will change back");
274 table.close();
276 connection.close();
277 EnvironmentEdgeManager.reset();
278 TEST_UTIL.getAdmin().balancerSwitch(previousBalance, true);
281 @Test
282 public void testClosing() throws Exception {
283 Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
284 configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
285 String.valueOf(ThreadLocalRandom.current().nextInt()));
287 // as connection caching is going away, now we're just testing
288 // that closed connection does actually get closed.
290 Connection c1 = ConnectionFactory.createConnection(configuration);
291 Connection c2 = ConnectionFactory.createConnection(configuration);
292 // no caching, different connections
293 assertTrue(c1 != c2);
295 // closing independently
296 c1.close();
297 assertTrue(c1.isClosed());
298 assertFalse(c2.isClosed());
300 c2.close();
301 assertTrue(c2.isClosed());
305 * Trivial test to verify that nobody messes with
306 * {@link ConnectionFactory#createConnection(Configuration)}
308 @Test
309 public void testCreateConnection() throws Exception {
310 Configuration configuration = TEST_UTIL.getConfiguration();
311 Connection c1 = ConnectionFactory.createConnection(configuration);
312 Connection c2 = ConnectionFactory.createConnection(configuration);
313 // created from the same configuration, yet they are different
314 assertTrue(c1 != c2);
315 assertTrue(c1.getConfiguration() == c2.getConfiguration());
/*
 * Disabled: with MasterRegistry, connections cannot outlast the masters' lifetime, so a client
 * connection can no longer ride over a full HBase cluster restart. The original test is kept
 * below for reference only; it is not compiled.
 *
@Test
public void testConnectionRideOverClusterRestart() throws IOException, InterruptedException {
  Configuration config = new Configuration(TEST_UTIL.getConfiguration());

  final TableName tableName = TableName.valueOf(name.getMethodName());
  TEST_UTIL.createTable(tableName, new byte[][] { FAM_NAM }).close();

  Connection connection = ConnectionFactory.createConnection(config);
  Table table = connection.getTable(tableName);

  // this will cache the meta location and table's region location
  table.get(new Get(Bytes.toBytes("foo")));

  // restart HBase
  TEST_UTIL.shutdownMiniHBaseCluster();
  TEST_UTIL.restartHBaseCluster(2);
  // this should be able to discover new locations for meta and table's region
  table.get(new Get(Bytes.toBytes("foo")));
  TEST_UTIL.deleteTable(tableName);
  table.close();
  connection.close();
}
 */
344 @Test
345 public void testLocateRegionsWithRegionReplicas() throws IOException {
346 int regionReplication = 3;
347 byte[] family = Bytes.toBytes("cf");
348 TableName tableName = TableName.valueOf(name.getMethodName());
350 // Create a table with region replicas
351 TableDescriptorBuilder builder =
352 TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(regionReplication)
353 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
354 TEST_UTIL.getAdmin().createTable(builder.build());
356 try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
357 RegionLocator locator = conn.getRegionLocator(tableName)) {
358 // Get locations of the regions of the table
359 List<HRegionLocation> locations = locator.getAllRegionLocations();
361 // The size of the returned locations should be 3
362 assertEquals(regionReplication, locations.size());
364 // The replicaIds of the returned locations should be 0, 1 and 2
365 Set<Integer> expectedReplicaIds =
366 IntStream.range(0, regionReplication).boxed().collect(Collectors.toSet());
367 for (HRegionLocation location : locations) {
368 assertTrue(expectedReplicaIds.remove(location.getRegion().getReplicaId()));
370 } finally {
371 TEST_UTIL.deleteTable(tableName);
375 @Test(expected = DoNotRetryIOException.class)
376 public void testClosedConnection() throws ServiceException, Throwable {
377 byte[] family = Bytes.toBytes("cf");
378 TableName tableName = TableName.valueOf(name.getMethodName());
379 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
380 .setCoprocessor(MultiRowMutationEndpoint.class.getName())
381 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
382 TEST_UTIL.getAdmin().createTable(builder.build());
384 Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
385 // cache the location
386 try (Table table = conn.getTable(tableName)) {
387 table.get(new Get(Bytes.toBytes(0)));
388 } finally {
389 conn.close();
391 Batch.Call<MultiRowMutationService, MutateRowsResponse> callable = service -> {
392 throw new RuntimeException("Should not arrive here");
394 conn.getTable(tableName).coprocessorService(MultiRowMutationService.class,
395 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, callable);
398 // There is no assertion, but you need to confirm that there is no resource leak output from netty
399 @Test
400 public void testCancelConnectionMemoryLeak() throws IOException, InterruptedException {
401 TableName tableName = TableName.valueOf(name.getMethodName());
402 TEST_UTIL.createTable(tableName, FAM_NAM).close();
403 TEST_UTIL.getAdmin().balancerSwitch(false, true);
404 try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
405 Table table = connection.getTable(tableName)) {
406 table.get(new Get(Bytes.toBytes("1")));
407 ServerName sn = TEST_UTIL.getRSForFirstRegionInTable(tableName).getServerName();
408 RpcClient rpcClient = ((AsyncConnectionImpl) connection.toAsyncConnection()).rpcClient;
409 rpcClient.cancelConnections(sn);
410 Thread.sleep(1000);
411 System.gc();
412 Thread.sleep(1000);