HBASE-19498 Fix findbugs and error-prone warnings in hbase-client (branch-2)
[hbase.git] / hbase-client / src / test / java / org / apache / hadoop / hbase / client / TestSimpleRequestController.java
blob3107aa705ea277bab75c441a0aa095d9969c0b5f
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertNotEquals;
22 import static org.junit.Assert.assertTrue;
23 import static org.junit.Assert.fail;
25 import java.io.IOException;
26 import java.io.InterruptedIOException;
27 import java.nio.charset.StandardCharsets;
28 import java.util.Arrays;
29 import java.util.HashMap;
30 import java.util.Map;
31 import java.util.concurrent.BrokenBarrierException;
32 import java.util.concurrent.CyclicBarrier;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicLong;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.hbase.HBaseConfiguration;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.HRegionInfo;
40 import org.apache.hadoop.hbase.HRegionLocation;
41 import org.apache.hadoop.hbase.ServerName;
42 import org.apache.hadoop.hbase.TableName;
43 import org.apache.hadoop.hbase.client.RequestController.ReturnCode;
44 import org.apache.hadoop.hbase.testclassification.ClientTests;
45 import org.apache.hadoop.hbase.testclassification.SmallTests;
46 import org.apache.hadoop.hbase.util.Bytes;
47 import org.junit.Assert;
48 import org.junit.Test;
49 import org.junit.experimental.categories.Category;
51 @Category({ClientTests.class, SmallTests.class})
52 public class TestSimpleRequestController {
54 private static final TableName DUMMY_TABLE
55 = TableName.valueOf("DUMMY_TABLE");
56 private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes(StandardCharsets.UTF_8);
57 private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes(StandardCharsets.UTF_8);
58 private static final byte[] DUMMY_BYTES_3 = "DUMMY_BYTES_3".getBytes(StandardCharsets.UTF_8);
59 private static final ServerName SN = ServerName.valueOf("s1,1,1");
60 private static final ServerName SN2 = ServerName.valueOf("s2,2,2");
61 private static final HRegionInfo HRI1
62 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
63 private static final HRegionInfo HRI2
64 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2);
65 private static final HRegionInfo HRI3
66 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3);
67 private static final HRegionLocation LOC1 = new HRegionLocation(HRI1, SN);
68 private static final HRegionLocation LOC2 = new HRegionLocation(HRI2, SN);
69 private static final HRegionLocation LOC3 = new HRegionLocation(HRI3, SN2);
71 @Test
72 public void testIllegalRequestHeapSize() {
73 testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, -1);
76 @Test
77 public void testIllegalRsTasks() {
78 testIllegalArgument(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, -1);
81 @Test
82 public void testIllegalRegionTasks() {
83 testIllegalArgument(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, -1);
86 @Test
87 public void testIllegalSubmittedSize() {
88 testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, -1);
91 @Test
92 public void testIllegalRequestRows() {
93 testIllegalArgument(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_ROWS, -1);
96 private void testIllegalArgument(String key, long value) {
97 Configuration conf = HBaseConfiguration.create();
98 conf.setLong(key, value);
99 try {
100 SimpleRequestController controller = new SimpleRequestController(conf);
101 fail("The " + key + " must be bigger than zero");
102 } catch (IllegalArgumentException e) {
106 private static Put createPut(long maxHeapSizePerRequest) {
107 return new Put(Bytes.toBytes("row")) {
108 @Override
109 public long heapSize() {
110 return maxHeapSizePerRequest;
115 @Test
116 public void testTaskCheckerHost() throws IOException {
117 final int maxTotalConcurrentTasks = 100;
118 final int maxConcurrentTasksPerServer = 2;
119 final int maxConcurrentTasksPerRegion = 1;
120 final AtomicLong tasksInProgress = new AtomicLong(0);
121 final Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>();
122 final Map<byte[], AtomicInteger> taskCounterPerRegion = new HashMap<>();
123 SimpleRequestController.TaskCountChecker countChecker =
124 new SimpleRequestController.TaskCountChecker(
125 maxTotalConcurrentTasks,
126 maxConcurrentTasksPerServer,
127 maxConcurrentTasksPerRegion,
128 tasksInProgress, taskCounterPerServer, taskCounterPerRegion);
129 final long maxHeapSizePerRequest = 2 * 1024 * 1024;
130 // unlimiited
131 SimpleRequestController.RequestHeapSizeChecker sizeChecker =
132 new SimpleRequestController.RequestHeapSizeChecker(maxHeapSizePerRequest);
133 RequestController.Checker checker =
134 SimpleRequestController.newChecker(Arrays.asList(countChecker, sizeChecker));
135 ReturnCode loc1Code = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest));
136 assertEquals(ReturnCode.INCLUDE, loc1Code);
138 ReturnCode loc1Code_2 = checker.canTakeRow(LOC1, createPut(maxHeapSizePerRequest));
139 // rejected for size
140 assertNotEquals(ReturnCode.INCLUDE, loc1Code_2);
142 ReturnCode loc2Code = checker.canTakeRow(LOC2, createPut(maxHeapSizePerRequest));
143 // rejected for size
144 assertNotEquals(ReturnCode.INCLUDE, loc2Code);
146 // fill the task slots for LOC3.
147 taskCounterPerRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(100));
148 taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(100));
150 ReturnCode loc3Code = checker.canTakeRow(LOC3, createPut(1L));
151 // rejected for count
152 assertNotEquals(ReturnCode.INCLUDE, loc3Code);
154 // release the task slots for LOC3.
155 taskCounterPerRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(0));
156 taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0));
158 ReturnCode loc3Code_2 = checker.canTakeRow(LOC3, createPut(1L));
159 assertEquals(ReturnCode.INCLUDE, loc3Code_2);
162 @Test
163 public void testRequestHeapSizeChecker() throws IOException {
164 final long maxHeapSizePerRequest = 2 * 1024 * 1024;
165 SimpleRequestController.RequestHeapSizeChecker checker
166 = new SimpleRequestController.RequestHeapSizeChecker(maxHeapSizePerRequest);
168 // inner state is unchanged.
169 for (int i = 0; i != 10; ++i) {
170 ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
171 assertEquals(ReturnCode.INCLUDE, code);
172 code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
173 assertEquals(ReturnCode.INCLUDE, code);
176 // accept the data located on LOC1 region.
177 ReturnCode acceptCode = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
178 assertEquals(ReturnCode.INCLUDE, acceptCode);
179 checker.notifyFinal(acceptCode, LOC1, maxHeapSizePerRequest);
181 // the sn server reachs the limit.
182 for (int i = 0; i != 10; ++i) {
183 ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
184 assertNotEquals(ReturnCode.INCLUDE, code);
185 code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
186 assertNotEquals(ReturnCode.INCLUDE, code);
189 // the request to sn2 server should be accepted.
190 for (int i = 0; i != 10; ++i) {
191 ReturnCode code = checker.canTakeOperation(LOC3, maxHeapSizePerRequest);
192 assertEquals(ReturnCode.INCLUDE, code);
195 checker.reset();
196 for (int i = 0; i != 10; ++i) {
197 ReturnCode code = checker.canTakeOperation(LOC1, maxHeapSizePerRequest);
198 assertEquals(ReturnCode.INCLUDE, code);
199 code = checker.canTakeOperation(LOC2, maxHeapSizePerRequest);
200 assertEquals(ReturnCode.INCLUDE, code);
204 @Test
205 public void testRequestRowsChecker() throws IOException {
206 final long maxRowCount = 100;
207 SimpleRequestController.RequestRowsChecker checker
208 = new SimpleRequestController.RequestRowsChecker(maxRowCount);
210 final long heapSizeOfRow = 100; //unused
211 // inner state is unchanged.
212 for (int i = 0; i != 10; ++i) {
213 ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
214 assertEquals(ReturnCode.INCLUDE, code);
215 code = checker.canTakeOperation(LOC2, heapSizeOfRow);
216 assertEquals(ReturnCode.INCLUDE, code);
219 // accept the data located on LOC1 region.
220 for (int i = 0; i != maxRowCount; ++i) {
221 ReturnCode acceptCode = checker.canTakeOperation(LOC1, heapSizeOfRow);
222 assertEquals(ReturnCode.INCLUDE, acceptCode);
223 checker.notifyFinal(acceptCode, LOC1, heapSizeOfRow);
226 // the sn server reachs the limit.
227 for (int i = 0; i != 10; ++i) {
228 ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
229 assertNotEquals(ReturnCode.INCLUDE, code);
230 code = checker.canTakeOperation(LOC2, heapSizeOfRow);
231 assertNotEquals(ReturnCode.INCLUDE, code);
234 // the request to sn2 server should be accepted.
235 for (int i = 0; i != 10; ++i) {
236 ReturnCode code = checker.canTakeOperation(LOC3, heapSizeOfRow);
237 assertEquals(ReturnCode.INCLUDE, code);
240 checker.reset();
241 for (int i = 0; i != 10; ++i) {
242 ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
243 assertEquals(ReturnCode.INCLUDE, code);
244 code = checker.canTakeOperation(LOC2, heapSizeOfRow);
245 assertEquals(ReturnCode.INCLUDE, code);
249 @Test
250 public void testSubmittedSizeChecker() {
251 final long maxHeapSizeSubmit = 2 * 1024 * 1024;
252 SimpleRequestController.SubmittedSizeChecker checker
253 = new SimpleRequestController.SubmittedSizeChecker(maxHeapSizeSubmit);
255 for (int i = 0; i != 10; ++i) {
256 ReturnCode include = checker.canTakeOperation(LOC1, 100000);
257 assertEquals(ReturnCode.INCLUDE, include);
260 for (int i = 0; i != 10; ++i) {
261 checker.notifyFinal(ReturnCode.INCLUDE, LOC1, maxHeapSizeSubmit);
264 for (int i = 0; i != 10; ++i) {
265 ReturnCode include = checker.canTakeOperation(LOC1, 100000);
266 assertEquals(ReturnCode.END, include);
268 for (int i = 0; i != 10; ++i) {
269 ReturnCode include = checker.canTakeOperation(LOC2, 100000);
270 assertEquals(ReturnCode.END, include);
272 checker.reset();
273 for (int i = 0; i != 10; ++i) {
274 ReturnCode include = checker.canTakeOperation(LOC1, 100000);
275 assertEquals(ReturnCode.INCLUDE, include);
279 @Test
280 public void testTaskCountChecker() throws InterruptedIOException {
281 long heapSizeOfRow = 12345;
282 int maxTotalConcurrentTasks = 100;
283 int maxConcurrentTasksPerServer = 2;
284 int maxConcurrentTasksPerRegion = 1;
285 AtomicLong tasksInProgress = new AtomicLong(0);
286 Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>();
287 Map<byte[], AtomicInteger> taskCounterPerRegion = new HashMap<>();
288 SimpleRequestController.TaskCountChecker checker = new SimpleRequestController.TaskCountChecker(
289 maxTotalConcurrentTasks,
290 maxConcurrentTasksPerServer,
291 maxConcurrentTasksPerRegion,
292 tasksInProgress, taskCounterPerServer, taskCounterPerRegion);
294 // inner state is unchanged.
295 for (int i = 0; i != 10; ++i) {
296 ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
297 assertEquals(ReturnCode.INCLUDE, code);
299 // add LOC1 region.
300 ReturnCode code = checker.canTakeOperation(LOC1, heapSizeOfRow);
301 assertEquals(ReturnCode.INCLUDE, code);
302 checker.notifyFinal(code, LOC1, heapSizeOfRow);
304 // fill the task slots for LOC1.
305 taskCounterPerRegion.put(LOC1.getRegionInfo().getRegionName(), new AtomicInteger(100));
306 taskCounterPerServer.put(LOC1.getServerName(), new AtomicInteger(100));
308 // the region was previously accepted, so it must be accpted now.
309 for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
310 ReturnCode includeCode = checker.canTakeOperation(LOC1, heapSizeOfRow);
311 assertEquals(ReturnCode.INCLUDE, includeCode);
312 checker.notifyFinal(includeCode, LOC1, heapSizeOfRow);
315 // fill the task slots for LOC3.
316 taskCounterPerRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(100));
317 taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(100));
319 // no task slots.
320 for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
321 ReturnCode excludeCode = checker.canTakeOperation(LOC3, heapSizeOfRow);
322 assertNotEquals(ReturnCode.INCLUDE, excludeCode);
323 checker.notifyFinal(excludeCode, LOC3, heapSizeOfRow);
326 // release the tasks for LOC3.
327 taskCounterPerRegion.put(LOC3.getRegionInfo().getRegionName(), new AtomicInteger(0));
328 taskCounterPerServer.put(LOC3.getServerName(), new AtomicInteger(0));
330 // add LOC3 region.
331 ReturnCode code3 = checker.canTakeOperation(LOC3, heapSizeOfRow);
332 assertEquals(ReturnCode.INCLUDE, code3);
333 checker.notifyFinal(code3, LOC3, heapSizeOfRow);
335 // the region was previously accepted, so it must be accpted now.
336 for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
337 ReturnCode includeCode = checker.canTakeOperation(LOC3, heapSizeOfRow);
338 assertEquals(ReturnCode.INCLUDE, includeCode);
339 checker.notifyFinal(includeCode, LOC3, heapSizeOfRow);
342 checker.reset();
343 // the region was previously accepted,
344 // but checker have reseted and task slots for LOC1 is full.
345 // So it must be rejected now.
346 for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
347 ReturnCode includeCode = checker.canTakeOperation(LOC1, heapSizeOfRow);
348 assertNotEquals(ReturnCode.INCLUDE, includeCode);
349 checker.notifyFinal(includeCode, LOC1, heapSizeOfRow);
353 @Test
354 public void testWaitForMaximumCurrentTasks() throws Exception {
355 final AtomicInteger max = new AtomicInteger(0);
356 final CyclicBarrier barrier = new CyclicBarrier(2);
357 SimpleRequestController controller = new SimpleRequestController(HBaseConfiguration.create());
358 final AtomicLong tasks = controller.tasksInProgress;
359 Runnable runnable = () -> {
360 try {
361 barrier.await();
362 controller.waitForMaximumCurrentTasks(max.get(), 123, 1, null);
363 } catch (InterruptedIOException e) {
364 Assert.fail(e.getMessage());
365 } catch (InterruptedException | BrokenBarrierException e) {
366 e.printStackTrace();
369 // First test that our runnable thread only exits when tasks is zero.
370 Thread t = new Thread(runnable);
371 t.start();
372 barrier.await();
373 t.join();
374 // Now assert we stay running if max == zero and tasks is > 0.
375 barrier.reset();
376 tasks.set(1000000);
377 t = new Thread(runnable);
378 t.start();
379 barrier.await();
380 while (tasks.get() > 0) {
381 assertTrue(t.isAlive());
382 tasks.set(tasks.get() - 1);
384 t.join();