2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.junit
.Assert
.assertEquals
;
21 import static org
.junit
.Assert
.assertNotEquals
;
22 import static org
.junit
.Assert
.assertTrue
;
23 import static org
.junit
.Assert
.fail
;
25 import java
.io
.IOException
;
26 import java
.io
.InterruptedIOException
;
27 import java
.nio
.charset
.StandardCharsets
;
28 import java
.util
.Arrays
;
29 import java
.util
.HashMap
;
31 import java
.util
.concurrent
.BrokenBarrierException
;
32 import java
.util
.concurrent
.CyclicBarrier
;
33 import java
.util
.concurrent
.atomic
.AtomicInteger
;
34 import java
.util
.concurrent
.atomic
.AtomicLong
;
36 import org
.apache
.hadoop
.conf
.Configuration
;
37 import org
.apache
.hadoop
.hbase
.HBaseConfiguration
;
38 import org
.apache
.hadoop
.hbase
.HConstants
;
39 import org
.apache
.hadoop
.hbase
.HRegionInfo
;
40 import org
.apache
.hadoop
.hbase
.HRegionLocation
;
41 import org
.apache
.hadoop
.hbase
.ServerName
;
42 import org
.apache
.hadoop
.hbase
.TableName
;
43 import org
.apache
.hadoop
.hbase
.client
.RequestController
.ReturnCode
;
44 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
45 import org
.apache
.hadoop
.hbase
.testclassification
.SmallTests
;
46 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
47 import org
.junit
.Assert
;
48 import org
.junit
.Test
;
49 import org
.junit
.experimental
.categories
.Category
;
51 @Category({ClientTests
.class, SmallTests
.class})
52 public class TestSimpleRequestController
{
54 private static final TableName DUMMY_TABLE
55 = TableName
.valueOf("DUMMY_TABLE");
56 private static final byte[] DUMMY_BYTES_1
= "DUMMY_BYTES_1".getBytes(StandardCharsets
.UTF_8
);
57 private static final byte[] DUMMY_BYTES_2
= "DUMMY_BYTES_2".getBytes(StandardCharsets
.UTF_8
);
58 private static final byte[] DUMMY_BYTES_3
= "DUMMY_BYTES_3".getBytes(StandardCharsets
.UTF_8
);
59 private static final ServerName SN
= ServerName
.valueOf("s1,1,1");
60 private static final ServerName SN2
= ServerName
.valueOf("s2,2,2");
61 private static final HRegionInfo HRI1
62 = new HRegionInfo(DUMMY_TABLE
, DUMMY_BYTES_1
, DUMMY_BYTES_2
, false, 1);
63 private static final HRegionInfo HRI2
64 = new HRegionInfo(DUMMY_TABLE
, DUMMY_BYTES_2
, HConstants
.EMPTY_END_ROW
, false, 2);
65 private static final HRegionInfo HRI3
66 = new HRegionInfo(DUMMY_TABLE
, DUMMY_BYTES_3
, HConstants
.EMPTY_END_ROW
, false, 3);
67 private static final HRegionLocation LOC1
= new HRegionLocation(HRI1
, SN
);
68 private static final HRegionLocation LOC2
= new HRegionLocation(HRI2
, SN
);
69 private static final HRegionLocation LOC3
= new HRegionLocation(HRI3
, SN2
);
72 public void testIllegalRequestHeapSize() {
73 testIllegalArgument(SimpleRequestController
.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE
, -1);
77 public void testIllegalRsTasks() {
78 testIllegalArgument(HConstants
.HBASE_CLIENT_MAX_PERSERVER_TASKS
, -1);
82 public void testIllegalRegionTasks() {
83 testIllegalArgument(HConstants
.HBASE_CLIENT_MAX_PERREGION_TASKS
, -1);
87 public void testIllegalSubmittedSize() {
88 testIllegalArgument(SimpleRequestController
.HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE
, -1);
92 public void testIllegalRequestRows() {
93 testIllegalArgument(SimpleRequestController
.HBASE_CLIENT_MAX_PERREQUEST_ROWS
, -1);
96 private void testIllegalArgument(String key
, long value
) {
97 Configuration conf
= HBaseConfiguration
.create();
98 conf
.setLong(key
, value
);
100 SimpleRequestController controller
= new SimpleRequestController(conf
);
101 fail("The " + key
+ " must be bigger than zero");
102 } catch (IllegalArgumentException e
) {
106 private static Put
createPut(long maxHeapSizePerRequest
) {
107 return new Put(Bytes
.toBytes("row")) {
109 public long heapSize() {
110 return maxHeapSizePerRequest
;
116 public void testTaskCheckerHost() throws IOException
{
117 final int maxTotalConcurrentTasks
= 100;
118 final int maxConcurrentTasksPerServer
= 2;
119 final int maxConcurrentTasksPerRegion
= 1;
120 final AtomicLong tasksInProgress
= new AtomicLong(0);
121 final Map
<ServerName
, AtomicInteger
> taskCounterPerServer
= new HashMap
<>();
122 final Map
<byte[], AtomicInteger
> taskCounterPerRegion
= new HashMap
<>();
123 SimpleRequestController
.TaskCountChecker countChecker
=
124 new SimpleRequestController
.TaskCountChecker(
125 maxTotalConcurrentTasks
,
126 maxConcurrentTasksPerServer
,
127 maxConcurrentTasksPerRegion
,
128 tasksInProgress
, taskCounterPerServer
, taskCounterPerRegion
);
129 final long maxHeapSizePerRequest
= 2 * 1024 * 1024;
131 SimpleRequestController
.RequestHeapSizeChecker sizeChecker
=
132 new SimpleRequestController
.RequestHeapSizeChecker(maxHeapSizePerRequest
);
133 RequestController
.Checker checker
=
134 SimpleRequestController
.newChecker(Arrays
.asList(countChecker
, sizeChecker
));
135 ReturnCode loc1Code
= checker
.canTakeRow(LOC1
, createPut(maxHeapSizePerRequest
));
136 assertEquals(ReturnCode
.INCLUDE
, loc1Code
);
138 ReturnCode loc1Code_2
= checker
.canTakeRow(LOC1
, createPut(maxHeapSizePerRequest
));
140 assertNotEquals(ReturnCode
.INCLUDE
, loc1Code_2
);
142 ReturnCode loc2Code
= checker
.canTakeRow(LOC2
, createPut(maxHeapSizePerRequest
));
144 assertNotEquals(ReturnCode
.INCLUDE
, loc2Code
);
146 // fill the task slots for LOC3.
147 taskCounterPerRegion
.put(LOC3
.getRegionInfo().getRegionName(), new AtomicInteger(100));
148 taskCounterPerServer
.put(LOC3
.getServerName(), new AtomicInteger(100));
150 ReturnCode loc3Code
= checker
.canTakeRow(LOC3
, createPut(1L));
151 // rejected for count
152 assertNotEquals(ReturnCode
.INCLUDE
, loc3Code
);
154 // release the task slots for LOC3.
155 taskCounterPerRegion
.put(LOC3
.getRegionInfo().getRegionName(), new AtomicInteger(0));
156 taskCounterPerServer
.put(LOC3
.getServerName(), new AtomicInteger(0));
158 ReturnCode loc3Code_2
= checker
.canTakeRow(LOC3
, createPut(1L));
159 assertEquals(ReturnCode
.INCLUDE
, loc3Code_2
);
163 public void testRequestHeapSizeChecker() throws IOException
{
164 final long maxHeapSizePerRequest
= 2 * 1024 * 1024;
165 SimpleRequestController
.RequestHeapSizeChecker checker
166 = new SimpleRequestController
.RequestHeapSizeChecker(maxHeapSizePerRequest
);
168 // inner state is unchanged.
169 for (int i
= 0; i
!= 10; ++i
) {
170 ReturnCode code
= checker
.canTakeOperation(LOC1
, maxHeapSizePerRequest
);
171 assertEquals(ReturnCode
.INCLUDE
, code
);
172 code
= checker
.canTakeOperation(LOC2
, maxHeapSizePerRequest
);
173 assertEquals(ReturnCode
.INCLUDE
, code
);
176 // accept the data located on LOC1 region.
177 ReturnCode acceptCode
= checker
.canTakeOperation(LOC1
, maxHeapSizePerRequest
);
178 assertEquals(ReturnCode
.INCLUDE
, acceptCode
);
179 checker
.notifyFinal(acceptCode
, LOC1
, maxHeapSizePerRequest
);
181 // the sn server reachs the limit.
182 for (int i
= 0; i
!= 10; ++i
) {
183 ReturnCode code
= checker
.canTakeOperation(LOC1
, maxHeapSizePerRequest
);
184 assertNotEquals(ReturnCode
.INCLUDE
, code
);
185 code
= checker
.canTakeOperation(LOC2
, maxHeapSizePerRequest
);
186 assertNotEquals(ReturnCode
.INCLUDE
, code
);
189 // the request to sn2 server should be accepted.
190 for (int i
= 0; i
!= 10; ++i
) {
191 ReturnCode code
= checker
.canTakeOperation(LOC3
, maxHeapSizePerRequest
);
192 assertEquals(ReturnCode
.INCLUDE
, code
);
196 for (int i
= 0; i
!= 10; ++i
) {
197 ReturnCode code
= checker
.canTakeOperation(LOC1
, maxHeapSizePerRequest
);
198 assertEquals(ReturnCode
.INCLUDE
, code
);
199 code
= checker
.canTakeOperation(LOC2
, maxHeapSizePerRequest
);
200 assertEquals(ReturnCode
.INCLUDE
, code
);
205 public void testRequestRowsChecker() throws IOException
{
206 final long maxRowCount
= 100;
207 SimpleRequestController
.RequestRowsChecker checker
208 = new SimpleRequestController
.RequestRowsChecker(maxRowCount
);
210 final long heapSizeOfRow
= 100; //unused
211 // inner state is unchanged.
212 for (int i
= 0; i
!= 10; ++i
) {
213 ReturnCode code
= checker
.canTakeOperation(LOC1
, heapSizeOfRow
);
214 assertEquals(ReturnCode
.INCLUDE
, code
);
215 code
= checker
.canTakeOperation(LOC2
, heapSizeOfRow
);
216 assertEquals(ReturnCode
.INCLUDE
, code
);
219 // accept the data located on LOC1 region.
220 for (int i
= 0; i
!= maxRowCount
; ++i
) {
221 ReturnCode acceptCode
= checker
.canTakeOperation(LOC1
, heapSizeOfRow
);
222 assertEquals(ReturnCode
.INCLUDE
, acceptCode
);
223 checker
.notifyFinal(acceptCode
, LOC1
, heapSizeOfRow
);
226 // the sn server reachs the limit.
227 for (int i
= 0; i
!= 10; ++i
) {
228 ReturnCode code
= checker
.canTakeOperation(LOC1
, heapSizeOfRow
);
229 assertNotEquals(ReturnCode
.INCLUDE
, code
);
230 code
= checker
.canTakeOperation(LOC2
, heapSizeOfRow
);
231 assertNotEquals(ReturnCode
.INCLUDE
, code
);
234 // the request to sn2 server should be accepted.
235 for (int i
= 0; i
!= 10; ++i
) {
236 ReturnCode code
= checker
.canTakeOperation(LOC3
, heapSizeOfRow
);
237 assertEquals(ReturnCode
.INCLUDE
, code
);
241 for (int i
= 0; i
!= 10; ++i
) {
242 ReturnCode code
= checker
.canTakeOperation(LOC1
, heapSizeOfRow
);
243 assertEquals(ReturnCode
.INCLUDE
, code
);
244 code
= checker
.canTakeOperation(LOC2
, heapSizeOfRow
);
245 assertEquals(ReturnCode
.INCLUDE
, code
);
250 public void testSubmittedSizeChecker() {
251 final long maxHeapSizeSubmit
= 2 * 1024 * 1024;
252 SimpleRequestController
.SubmittedSizeChecker checker
253 = new SimpleRequestController
.SubmittedSizeChecker(maxHeapSizeSubmit
);
255 for (int i
= 0; i
!= 10; ++i
) {
256 ReturnCode include
= checker
.canTakeOperation(LOC1
, 100000);
257 assertEquals(ReturnCode
.INCLUDE
, include
);
260 for (int i
= 0; i
!= 10; ++i
) {
261 checker
.notifyFinal(ReturnCode
.INCLUDE
, LOC1
, maxHeapSizeSubmit
);
264 for (int i
= 0; i
!= 10; ++i
) {
265 ReturnCode include
= checker
.canTakeOperation(LOC1
, 100000);
266 assertEquals(ReturnCode
.END
, include
);
268 for (int i
= 0; i
!= 10; ++i
) {
269 ReturnCode include
= checker
.canTakeOperation(LOC2
, 100000);
270 assertEquals(ReturnCode
.END
, include
);
273 for (int i
= 0; i
!= 10; ++i
) {
274 ReturnCode include
= checker
.canTakeOperation(LOC1
, 100000);
275 assertEquals(ReturnCode
.INCLUDE
, include
);
280 public void testTaskCountChecker() throws InterruptedIOException
{
281 long heapSizeOfRow
= 12345;
282 int maxTotalConcurrentTasks
= 100;
283 int maxConcurrentTasksPerServer
= 2;
284 int maxConcurrentTasksPerRegion
= 1;
285 AtomicLong tasksInProgress
= new AtomicLong(0);
286 Map
<ServerName
, AtomicInteger
> taskCounterPerServer
= new HashMap
<>();
287 Map
<byte[], AtomicInteger
> taskCounterPerRegion
= new HashMap
<>();
288 SimpleRequestController
.TaskCountChecker checker
= new SimpleRequestController
.TaskCountChecker(
289 maxTotalConcurrentTasks
,
290 maxConcurrentTasksPerServer
,
291 maxConcurrentTasksPerRegion
,
292 tasksInProgress
, taskCounterPerServer
, taskCounterPerRegion
);
294 // inner state is unchanged.
295 for (int i
= 0; i
!= 10; ++i
) {
296 ReturnCode code
= checker
.canTakeOperation(LOC1
, heapSizeOfRow
);
297 assertEquals(ReturnCode
.INCLUDE
, code
);
300 ReturnCode code
= checker
.canTakeOperation(LOC1
, heapSizeOfRow
);
301 assertEquals(ReturnCode
.INCLUDE
, code
);
302 checker
.notifyFinal(code
, LOC1
, heapSizeOfRow
);
304 // fill the task slots for LOC1.
305 taskCounterPerRegion
.put(LOC1
.getRegionInfo().getRegionName(), new AtomicInteger(100));
306 taskCounterPerServer
.put(LOC1
.getServerName(), new AtomicInteger(100));
308 // the region was previously accepted, so it must be accpted now.
309 for (int i
= 0; i
!= maxConcurrentTasksPerRegion
* 5; ++i
) {
310 ReturnCode includeCode
= checker
.canTakeOperation(LOC1
, heapSizeOfRow
);
311 assertEquals(ReturnCode
.INCLUDE
, includeCode
);
312 checker
.notifyFinal(includeCode
, LOC1
, heapSizeOfRow
);
315 // fill the task slots for LOC3.
316 taskCounterPerRegion
.put(LOC3
.getRegionInfo().getRegionName(), new AtomicInteger(100));
317 taskCounterPerServer
.put(LOC3
.getServerName(), new AtomicInteger(100));
320 for (int i
= 0; i
!= maxConcurrentTasksPerRegion
* 5; ++i
) {
321 ReturnCode excludeCode
= checker
.canTakeOperation(LOC3
, heapSizeOfRow
);
322 assertNotEquals(ReturnCode
.INCLUDE
, excludeCode
);
323 checker
.notifyFinal(excludeCode
, LOC3
, heapSizeOfRow
);
326 // release the tasks for LOC3.
327 taskCounterPerRegion
.put(LOC3
.getRegionInfo().getRegionName(), new AtomicInteger(0));
328 taskCounterPerServer
.put(LOC3
.getServerName(), new AtomicInteger(0));
331 ReturnCode code3
= checker
.canTakeOperation(LOC3
, heapSizeOfRow
);
332 assertEquals(ReturnCode
.INCLUDE
, code3
);
333 checker
.notifyFinal(code3
, LOC3
, heapSizeOfRow
);
335 // the region was previously accepted, so it must be accpted now.
336 for (int i
= 0; i
!= maxConcurrentTasksPerRegion
* 5; ++i
) {
337 ReturnCode includeCode
= checker
.canTakeOperation(LOC3
, heapSizeOfRow
);
338 assertEquals(ReturnCode
.INCLUDE
, includeCode
);
339 checker
.notifyFinal(includeCode
, LOC3
, heapSizeOfRow
);
343 // the region was previously accepted,
344 // but checker have reseted and task slots for LOC1 is full.
345 // So it must be rejected now.
346 for (int i
= 0; i
!= maxConcurrentTasksPerRegion
* 5; ++i
) {
347 ReturnCode includeCode
= checker
.canTakeOperation(LOC1
, heapSizeOfRow
);
348 assertNotEquals(ReturnCode
.INCLUDE
, includeCode
);
349 checker
.notifyFinal(includeCode
, LOC1
, heapSizeOfRow
);
354 public void testWaitForMaximumCurrentTasks() throws Exception
{
355 final AtomicInteger max
= new AtomicInteger(0);
356 final CyclicBarrier barrier
= new CyclicBarrier(2);
357 SimpleRequestController controller
= new SimpleRequestController(HBaseConfiguration
.create());
358 final AtomicLong tasks
= controller
.tasksInProgress
;
359 Runnable runnable
= () -> {
362 controller
.waitForMaximumCurrentTasks(max
.get(), 123, 1, null);
363 } catch (InterruptedIOException e
) {
364 Assert
.fail(e
.getMessage());
365 } catch (InterruptedException
| BrokenBarrierException e
) {
369 // First test that our runnable thread only exits when tasks is zero.
370 Thread t
= new Thread(runnable
);
374 // Now assert we stay running if max == zero and tasks is > 0.
377 t
= new Thread(runnable
);
380 while (tasks
.get() > 0) {
381 assertTrue(t
.isAlive());
382 tasks
.set(tasks
.get() - 1);