HBASE-26921 Rewrite the counting cells part in TestMultiVersions (#4316)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / client / TestConnection.java
blob662790de0adc8cc1136c0ee0f282169b0f32f73d
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertFalse;
22 import static org.junit.Assert.assertTrue;
24 import java.io.IOException;
25 import java.util.List;
26 import java.util.Set;
27 import java.util.concurrent.ThreadLocalRandom;
28 import java.util.concurrent.atomic.AtomicInteger;
29 import java.util.concurrent.atomic.AtomicReference;
30 import java.util.stream.Collectors;
31 import java.util.stream.IntStream;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.hbase.DoNotRetryIOException;
34 import org.apache.hadoop.hbase.HBaseClassTestRule;
35 import org.apache.hadoop.hbase.HBaseTestingUtil;
36 import org.apache.hadoop.hbase.HConstants;
37 import org.apache.hadoop.hbase.HRegionLocation;
38 import org.apache.hadoop.hbase.ServerName;
39 import org.apache.hadoop.hbase.TableName;
40 import org.apache.hadoop.hbase.Waiter;
41 import org.apache.hadoop.hbase.client.coprocessor.Batch;
42 import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
43 import org.apache.hadoop.hbase.ipc.RpcClient;
44 import org.apache.hadoop.hbase.testclassification.LargeTests;
45 import org.apache.hadoop.hbase.util.Bytes;
46 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
47 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
48 import org.junit.After;
49 import org.junit.AfterClass;
50 import org.junit.Assert;
51 import org.junit.BeforeClass;
52 import org.junit.ClassRule;
53 import org.junit.Rule;
54 import org.junit.Test;
55 import org.junit.experimental.categories.Category;
56 import org.junit.rules.TestName;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
60 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
61 import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
62 import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector.Level;
64 import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
65 import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
67 /**
68 * This class is for testing {@link Connection}.
70 @Category({ LargeTests.class })
71 public class TestConnection {
73 @ClassRule
74 public static final HBaseClassTestRule CLASS_RULE =
75 HBaseClassTestRule.forClass(TestConnection.class);
77 private static final Logger LOG = LoggerFactory.getLogger(TestConnection.class);
78 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
80 private static final byte[] FAM_NAM = Bytes.toBytes("f");
81 private static final byte[] ROW = Bytes.toBytes("bbb");
82 private static final int RPC_RETRY = 5;
84 @Rule
85 public TestName name = new TestName();
87 @BeforeClass
88 public static void setUpBeforeClass() throws Exception {
89 ResourceLeakDetector.setLevel(Level.PARANOID);
90 TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
91 // Up the handlers; this test needs more than usual.
92 TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
93 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
94 TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 3);
95 TEST_UTIL.startMiniCluster(2);
99 @AfterClass
100 public static void tearDownAfterClass() throws Exception {
101 TEST_UTIL.shutdownMiniCluster();
104 @After
105 public void tearDown() throws IOException {
106 TEST_UTIL.getAdmin().balancerSwitch(true, true);
110 * Naive test to check that Connection#getAdmin returns a properly constructed HBaseAdmin object
111 * @throws IOException Unable to construct admin
113 @Test
114 public void testAdminFactory() throws IOException {
115 Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
116 Admin admin = con1.getAdmin();
117 assertTrue(admin.getConnection() == con1);
118 assertTrue(admin.getConfiguration() == TEST_UTIL.getConfiguration());
119 con1.close();
123 * Test that we can handle connection close: it will trigger a retry, but the calls will finish.
125 @Test
126 public void testConnectionCloseAllowsInterrupt() throws Exception {
127 testConnectionClose(true);
130 @Test
131 public void testConnectionNotAllowsInterrupt() throws Exception {
132 testConnectionClose(false);
135 private void testConnectionClose(boolean allowsInterrupt) throws Exception {
136 TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt);
137 TEST_UTIL.createTable(tableName, FAM_NAM).close();
139 TEST_UTIL.getAdmin().balancerSwitch(false, true);
141 Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
142 // We want to work on a separate connection.
143 c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
144 c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); // retry a lot
145 c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); // don't wait between retries.
146 c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Server do not really expire
147 c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt);
148 // to avoid the client to be stuck when do the Get
149 c2.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 10000);
150 c2.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 10000);
151 c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 5000);
153 Connection connection = ConnectionFactory.createConnection(c2);
154 final Table table = connection.getTable(tableName);
156 Put put = new Put(ROW);
157 put.addColumn(FAM_NAM, ROW, ROW);
158 table.put(put);
160 // 4 steps: ready=0; doGets=1; mustStop=2; stopped=3
161 final AtomicInteger step = new AtomicInteger(0);
163 final AtomicReference<Throwable> failed = new AtomicReference<>(null);
164 Thread t = new Thread("testConnectionCloseThread") {
165 @Override
166 public void run() {
167 int done = 0;
168 try {
169 step.set(1);
170 while (step.get() == 1) {
171 Get get = new Get(ROW);
172 table.get(get);
173 done++;
174 if (done % 100 == 0) {
175 LOG.info("done=" + done);
177 // without the sleep, will cause the exception for too many files in
178 // org.apache.hadoop.hdfs.server.datanode.DataXceiver
179 Thread.sleep(100);
181 } catch (Throwable t) {
182 failed.set(t);
183 LOG.error(t.toString(), t);
185 step.set(3);
188 t.start();
189 TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() {
190 @Override
191 public boolean evaluate() throws Exception {
192 return step.get() == 1;
196 ServerName sn;
197 try (RegionLocator rl = connection.getRegionLocator(tableName)) {
198 sn = rl.getRegionLocation(ROW).getServerName();
201 RpcClient rpcClient = ((AsyncConnectionImpl) connection.toAsyncConnection()).rpcClient;
203 LOG.info("Going to cancel connections. connection=" + connection.toString() + ", sn=" + sn);
204 for (int i = 0; i < 500; i++) {
205 rpcClient.cancelConnections(sn);
206 Thread.sleep(50);
209 step.compareAndSet(1, 2);
210 // The test may fail here if the thread doing the gets is stuck. The way to find
211 // out what's happening is to look for the thread named 'testConnectionCloseThread'
212 TEST_UTIL.waitFor(40000, new Waiter.Predicate<Exception>() {
213 @Override
214 public boolean evaluate() throws Exception {
215 return step.get() == 3;
218 table.close();
219 connection.close();
220 Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null);
224 * Test that connection can become idle without breaking everything.
226 @Test
227 public void testConnectionIdle() throws Exception {
228 final TableName tableName = TableName.valueOf(name.getMethodName());
229 TEST_UTIL.createTable(tableName, FAM_NAM).close();
230 int idleTime = 20000;
231 boolean previousBalance = TEST_UTIL.getAdmin().balancerSwitch(false, true);
233 Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
234 // We want to work on a separate connection.
235 c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
236 c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Don't retry: retry = test failed
237 c2.setInt(RpcClient.IDLE_TIME, idleTime);
239 Connection connection = ConnectionFactory.createConnection(c2);
240 final Table table = connection.getTable(tableName);
242 Put put = new Put(ROW);
243 put.addColumn(FAM_NAM, ROW, ROW);
244 table.put(put);
246 ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
247 mee.setValue(EnvironmentEdgeManager.currentTime());
248 EnvironmentEdgeManager.injectEdge(mee);
249 LOG.info("first get");
250 table.get(new Get(ROW));
252 LOG.info("first get - changing the time & sleeping");
253 mee.incValue(idleTime + 1000);
254 Thread.sleep(1500); // we need to wait a little for the connection to be seen as idle.
255 // 1500 = sleep time in RpcClient#waitForWork + a margin
257 LOG.info("second get - connection has been marked idle in the middle");
258 // To check that the connection actually became idle would need to read some private
259 // fields of RpcClient.
260 table.get(new Get(ROW));
261 mee.incValue(idleTime + 1000);
263 LOG.info("third get - connection is idle, but the reader doesn't know yet");
264 // We're testing here a special case:
265 // time limit reached BUT connection not yet reclaimed AND a new call.
266 // in this situation, we don't close the connection, instead we use it immediately.
267 // If we're very unlucky we can have a race condition in the test: the connection is already
268 // under closing when we do the get, so we have an exception, and we don't retry as the
269 // retry number is 1. The probability is very very low, and seems acceptable for now. It's
270 // a test issue only.
271 table.get(new Get(ROW));
273 LOG.info("we're done - time will change back");
275 table.close();
277 connection.close();
278 EnvironmentEdgeManager.reset();
279 TEST_UTIL.getAdmin().balancerSwitch(previousBalance, true);
282 @Test
283 public void testClosing() throws Exception {
284 Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
285 configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
286 String.valueOf(ThreadLocalRandom.current().nextInt()));
288 // as connection caching is going away, now we're just testing
289 // that closed connection does actually get closed.
291 Connection c1 = ConnectionFactory.createConnection(configuration);
292 Connection c2 = ConnectionFactory.createConnection(configuration);
293 // no caching, different connections
294 assertTrue(c1 != c2);
296 // closing independently
297 c1.close();
298 assertTrue(c1.isClosed());
299 assertFalse(c2.isClosed());
301 c2.close();
302 assertTrue(c2.isClosed());
306 * Trivial test to verify that nobody messes with
307 * {@link ConnectionFactory#createConnection(Configuration)}
309 @Test
310 public void testCreateConnection() throws Exception {
311 Configuration configuration = TEST_UTIL.getConfiguration();
312 Connection c1 = ConnectionFactory.createConnection(configuration);
313 Connection c2 = ConnectionFactory.createConnection(configuration);
314 // created from the same configuration, yet they are different
315 assertTrue(c1 != c2);
316 assertTrue(c1.getConfiguration() == c2.getConfiguration());
320 ====> With MasterRegistry, connections cannot outlast the masters' lifetime.
321 @Test
322 public void testConnectionRideOverClusterRestart() throws IOException, InterruptedException {
323 Configuration config = new Configuration(TEST_UTIL.getConfiguration());
325 final TableName tableName = TableName.valueOf(name.getMethodName());
326 TEST_UTIL.createTable(tableName, new byte[][] { FAM_NAM }).close();
328 Connection connection = ConnectionFactory.createConnection(config);
329 Table table = connection.getTable(tableName);
331 // this will cache the meta location and table's region location
332 table.get(new Get(Bytes.toBytes("foo")));
334 // restart HBase
335 TEST_UTIL.shutdownMiniHBaseCluster();
336 TEST_UTIL.restartHBaseCluster(2);
337 // this should be able to discover new locations for meta and table's region
338 table.get(new Get(Bytes.toBytes("foo")));
339 TEST_UTIL.deleteTable(tableName);
340 table.close();
341 connection.close();
345 @Test
346 public void testLocateRegionsWithRegionReplicas() throws IOException {
347 int regionReplication = 3;
348 byte[] family = Bytes.toBytes("cf");
349 TableName tableName = TableName.valueOf(name.getMethodName());
351 // Create a table with region replicas
352 TableDescriptorBuilder builder =
353 TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(regionReplication)
354 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
355 TEST_UTIL.getAdmin().createTable(builder.build());
357 try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
358 RegionLocator locator = conn.getRegionLocator(tableName)) {
359 // Get locations of the regions of the table
360 List<HRegionLocation> locations = locator.getAllRegionLocations();
362 // The size of the returned locations should be 3
363 assertEquals(regionReplication, locations.size());
365 // The replicaIds of the returned locations should be 0, 1 and 2
366 Set<Integer> expectedReplicaIds =
367 IntStream.range(0, regionReplication).boxed().collect(Collectors.toSet());
368 for (HRegionLocation location : locations) {
369 assertTrue(expectedReplicaIds.remove(location.getRegion().getReplicaId()));
371 } finally {
372 TEST_UTIL.deleteTable(tableName);
376 @Test(expected = DoNotRetryIOException.class)
377 public void testClosedConnection() throws ServiceException, Throwable {
378 byte[] family = Bytes.toBytes("cf");
379 TableName tableName = TableName.valueOf(name.getMethodName());
380 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
381 .setCoprocessor(MultiRowMutationEndpoint.class.getName())
382 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
383 TEST_UTIL.getAdmin().createTable(builder.build());
385 Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
386 // cache the location
387 try (Table table = conn.getTable(tableName)) {
388 table.get(new Get(Bytes.toBytes(0)));
389 } finally {
390 conn.close();
392 Batch.Call<MultiRowMutationService, MutateRowsResponse> callable = service -> {
393 throw new RuntimeException("Should not arrive here");
395 conn.getTable(tableName).coprocessorService(MultiRowMutationService.class,
396 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, callable);
399 // There is no assertion, but you need to confirm that there is no resource leak output from netty
400 @Test
401 public void testCancelConnectionMemoryLeak() throws IOException, InterruptedException {
402 TableName tableName = TableName.valueOf(name.getMethodName());
403 TEST_UTIL.createTable(tableName, FAM_NAM).close();
404 TEST_UTIL.getAdmin().balancerSwitch(false, true);
405 try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
406 Table table = connection.getTable(tableName)) {
407 table.get(new Get(Bytes.toBytes("1")));
408 ServerName sn = TEST_UTIL.getRSForFirstRegionInTable(tableName).getServerName();
409 RpcClient rpcClient = ((AsyncConnectionImpl) connection.toAsyncConnection()).rpcClient;
410 rpcClient.cancelConnections(sn);
411 Thread.sleep(1000);
412 System.gc();
413 Thread.sleep(1000);