3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.client
;
21 import java
.io
.InterruptedIOException
;
22 import java
.util
.ArrayList
;
23 import java
.util
.Collection
;
24 import java
.util
.HashMap
;
25 import java
.util
.HashSet
;
26 import java
.util
.List
;
29 import java
.util
.TreeSet
;
30 import java
.util
.concurrent
.ConcurrentHashMap
;
31 import java
.util
.concurrent
.ConcurrentMap
;
32 import java
.util
.concurrent
.ConcurrentSkipListMap
;
33 import java
.util
.concurrent
.atomic
.AtomicInteger
;
34 import java
.util
.concurrent
.atomic
.AtomicLong
;
35 import java
.util
.function
.Consumer
;
37 import org
.apache
.hadoop
.conf
.Configuration
;
38 import org
.apache
.hadoop
.hbase
.HConstants
;
39 import org
.apache
.hadoop
.hbase
.HRegionLocation
;
40 import org
.apache
.hadoop
.hbase
.ServerName
;
41 import org
.apache
.yetus
.audience
.InterfaceAudience
;
42 import org
.apache
.yetus
.audience
.InterfaceStability
;
43 import org
.slf4j
.Logger
;
44 import org
.slf4j
.LoggerFactory
;
45 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
46 import static org
.apache
.hadoop
.hbase
.util
.ConcurrentMapUtils
.computeIfAbsent
;
47 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdge
;
48 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
/**
 * Holds back requests once any of the configured thresholds is reached.
 */
53 @InterfaceAudience.Private
54 @InterfaceStability.Evolving
55 class SimpleRequestController
implements RequestController
{
private static final Logger LOG = LoggerFactory.getLogger(SimpleRequestController.class);

/**
 * Configuration key: the maximum heap size for each request.
 */
public static final String HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE =
    "hbase.client.max.perrequest.heapsize";

/**
 * Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE} (4 * 1024 * 1024).
 */
static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 4194304;

/**
 * Configuration key: the maximum number of rows for each request.
 */
public static final String HBASE_CLIENT_MAX_PERREQUEST_ROWS = "hbase.client.max.perrequest.rows";

/**
 * Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_ROWS}.
 */
static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_ROWS = 2048;

/**
 * Configuration key: the maximum total heap size of the rows taken in one submit.
 */
public static final String HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = "hbase.client.max.submit.heapsize";

/**
 * Default value of {@link #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE}; identical to the per-request
 * heap size default.
 */
static final long DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE =
    DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE;

/**
 * Number of tasks currently in progress. The same object is also used as the monitor that
 * waiters block on and that {@code decTaskCounters} notifies.
 */
final AtomicLong tasksInProgress = new AtomicLong(0);

/**
 * In-progress task count per region, keyed by region name bytes and ordered with
 * {@link Bytes#BYTES_COMPARATOR}.
 */
final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

/**
 * In-progress task count per region server.
 */
final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer = new ConcurrentHashMap<>();

/**
 * The number of tasks simultaneously executed on the cluster.
 */
private final int maxTotalConcurrentTasks;

/**
 * The maximum heap size for each request.
 */
private final long maxHeapSizePerRequest;

/**
 * The maximum number of rows for each request.
 */
private final long maxRowsPerRequest;

/**
 * The maximum total heap size of the rows taken in one submit.
 */
private final long maxHeapSizeSubmit;

/**
 * The number of tasks we run in parallel on a single region. With 1 (the
 * default), we ensure that the ordering of the queries is respected: we
 * don't start a set of operations on a region before the previous one is
 * done. As well, this limits the pressure we put on the region server.
 */
final int maxConcurrentTasksPerRegion;

/**
 * The number of tasks simultaneously executed on a single region server.
 */
final int maxConcurrentTasksPerServer;

/**
 * When the number of unfinished tasks is at or below this value, the servers still holding
 * tasks are logged while waiting.
 */
private final int thresholdToLogUndoneTaskDetails;

/**
 * Configuration key for {@link #thresholdToLogUndoneTaskDetails}.
 */
public static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
    "hbase.client.threshold.log.details";

private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;

/**
 * Configuration key for {@link #thresholdToLogRegionDetails}.
 */
public static final String THRESHOLD_TO_LOG_REGION_DETAILS =
    "hbase.client.threshold.log.region.details";

private static final int DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS = 2;

/**
 * When the number of unfinished tasks is at or below this value, the regions still holding
 * tasks are logged while waiting.
 */
private final int thresholdToLogRegionDetails;
/**
 * Reads every limit and logging threshold from the supplied configuration.
 *
 * The six concurrency/size limits go through {@code checkAndGet}; the two logging thresholds
 * are read with a plain {@code getInt}.
 *
 * @param conf the configuration to read the settings from
 */
SimpleRequestController(final Configuration conf) {
  // Concurrency limits.
  this.maxTotalConcurrentTasks = checkAndGet(
      conf,
      HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
      HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
  this.maxConcurrentTasksPerServer = checkAndGet(
      conf,
      HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
      HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
  this.maxConcurrentTasksPerRegion = checkAndGet(
      conf,
      HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
      HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);

  // Size limits.
  this.maxHeapSizePerRequest = checkAndGet(
      conf,
      HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
      DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
  this.maxRowsPerRequest = checkAndGet(
      conf,
      HBASE_CLIENT_MAX_PERREQUEST_ROWS,
      DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_ROWS);
  this.maxHeapSizeSubmit = checkAndGet(
      conf,
      HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE,
      DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE);

  // Logging thresholds.
  this.thresholdToLogUndoneTaskDetails = conf.getInt(
      THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
      DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
  this.thresholdToLogRegionDetails = conf.getInt(
      THRESHOLD_TO_LOG_REGION_DETAILS,
      DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS);
}
149 private static int checkAndGet(Configuration conf
, String key
, int defaultValue
) {
150 int value
= conf
.getInt(key
, defaultValue
);
152 throw new IllegalArgumentException(key
+ "=" + value
);
157 private static long checkAndGet(Configuration conf
, String key
, long defaultValue
) {
158 long value
= conf
.getLong(key
, defaultValue
);
160 throw new IllegalArgumentException(key
+ "=" + value
);
165 static Checker
newChecker(List
<RowChecker
> checkers
) {
166 return new Checker() {
167 private boolean isEnd
= false;
170 public ReturnCode
canTakeRow(HRegionLocation loc
, Row row
) {
172 return ReturnCode
.END
;
174 long heapSizeOfRow
= (row
instanceof Mutation
) ?
((Mutation
) row
).heapSize() : 0;
175 ReturnCode code
= ReturnCode
.INCLUDE
;
176 for (RowChecker checker
: checkers
) {
177 switch (checker
.canTakeOperation(loc
, heapSizeOfRow
)) {
180 code
= ReturnCode
.END
;
183 code
= ReturnCode
.SKIP
;
189 if (code
== ReturnCode
.END
) {
193 for (RowChecker checker
: checkers
) {
194 checker
.notifyFinal(code
, loc
, heapSizeOfRow
);
200 public void reset() throws InterruptedIOException
{
202 InterruptedIOException e
= null;
203 for (RowChecker checker
: checkers
) {
206 } catch (InterruptedIOException ex
) {
218 public Checker
newChecker() {
219 List
<RowChecker
> checkers
= new ArrayList
<>(4);
220 checkers
.add(new TaskCountChecker(maxTotalConcurrentTasks
,
221 maxConcurrentTasksPerServer
,
222 maxConcurrentTasksPerRegion
,
224 taskCounterPerServer
,
225 taskCounterPerRegion
));
226 checkers
.add(new RequestHeapSizeChecker(maxHeapSizePerRequest
));
227 checkers
.add(new SubmittedSizeChecker(maxHeapSizeSubmit
));
228 checkers
.add(new RequestRowsChecker(maxRowsPerRequest
));
229 return newChecker(checkers
);
233 public void incTaskCounters(Collection
<byte[]> regions
, ServerName sn
) {
234 tasksInProgress
.incrementAndGet();
236 computeIfAbsent(taskCounterPerServer
, sn
, AtomicInteger
::new).incrementAndGet();
238 regions
.forEach((regBytes
)
239 -> computeIfAbsent(taskCounterPerRegion
, regBytes
, AtomicInteger
::new).incrementAndGet()
244 public void decTaskCounters(Collection
<byte[]> regions
, ServerName sn
) {
245 regions
.forEach(regBytes
-> {
246 AtomicInteger regionCnt
= taskCounterPerRegion
.get(regBytes
);
247 regionCnt
.decrementAndGet();
250 taskCounterPerServer
.get(sn
).decrementAndGet();
251 tasksInProgress
.decrementAndGet();
252 synchronized (tasksInProgress
) {
253 tasksInProgress
.notifyAll();
258 public long getNumberOfTasksInProgress() {
259 return tasksInProgress
.get();
263 public void waitForMaximumCurrentTasks(long max
, long id
,
264 int periodToTrigger
, Consumer
<Long
> trigger
) throws InterruptedIOException
{
266 long lastLog
= EnvironmentEdgeManager
.currentTime();
267 long currentInProgress
, oldInProgress
= Long
.MAX_VALUE
;
268 while ((currentInProgress
= tasksInProgress
.get()) > max
) {
269 if (oldInProgress
!= currentInProgress
) { // Wait for in progress to change.
270 long now
= EnvironmentEdgeManager
.currentTime();
271 if (now
> lastLog
+ periodToTrigger
) {
273 if (trigger
!= null) {
274 trigger
.accept(currentInProgress
);
276 logDetailsOfUndoneTasks(currentInProgress
);
279 oldInProgress
= currentInProgress
;
281 synchronized (tasksInProgress
) {
282 if (tasksInProgress
.get() == oldInProgress
) {
283 tasksInProgress
.wait(10);
286 } catch (InterruptedException e
) {
287 throw new InterruptedIOException("#" + id
+ ", interrupted." +
288 " currentNumberOfTask=" + currentInProgress
);
293 private void logDetailsOfUndoneTasks(long taskInProgress
) {
294 if (taskInProgress
<= thresholdToLogUndoneTaskDetails
) {
295 ArrayList
<ServerName
> servers
= new ArrayList
<>();
296 for (Map
.Entry
<ServerName
, AtomicInteger
> entry
: taskCounterPerServer
.entrySet()) {
297 if (entry
.getValue().get() > 0) {
298 servers
.add(entry
.getKey());
301 LOG
.info("Left over " + taskInProgress
+ " task(s) are processed on server(s): " + servers
);
304 if (taskInProgress
<= thresholdToLogRegionDetails
) {
305 ArrayList
<String
> regions
= new ArrayList
<>();
306 for (Map
.Entry
<byte[], AtomicInteger
> entry
: taskCounterPerRegion
.entrySet()) {
307 if (entry
.getValue().get() > 0) {
308 regions
.add(Bytes
.toString(entry
.getKey()));
311 LOG
.info("Regions against which left over task(s) are processed: " + regions
);
316 public void waitForFreeSlot(long id
, int periodToTrigger
, Consumer
<Long
> trigger
) throws InterruptedIOException
{
317 waitForMaximumCurrentTasks(maxTotalConcurrentTasks
- 1, id
, periodToTrigger
, trigger
);
321 * limit the heapsize of total submitted data. Reduce the limit of heapsize
322 * for submitting quickly if there is no running task.
324 static class SubmittedSizeChecker
implements RowChecker
{
326 private final long maxHeapSizeSubmit
;
327 private long heapSize
= 0;
329 SubmittedSizeChecker(final long maxHeapSizeSubmit
) {
330 this.maxHeapSizeSubmit
= maxHeapSizeSubmit
;
334 public ReturnCode
canTakeOperation(HRegionLocation loc
, long heapSizeOfRow
) {
335 if (heapSize
>= maxHeapSizeSubmit
) {
336 return ReturnCode
.END
;
338 return ReturnCode
.INCLUDE
;
342 public void notifyFinal(ReturnCode code
, HRegionLocation loc
, long heapSizeOfRow
) {
343 if (code
== ReturnCode
.INCLUDE
) {
344 heapSize
+= heapSizeOfRow
;
349 public void reset() {
355 * limit the max number of tasks in an AsyncProcess.
357 static class TaskCountChecker
implements RowChecker
{
359 private static final long MAX_WAITING_TIME
= 1000; //ms
360 private final Set
<RegionInfo
> regionsIncluded
= new HashSet
<>();
361 private final Set
<ServerName
> serversIncluded
= new HashSet
<>();
362 private final int maxConcurrentTasksPerRegion
;
363 private final int maxTotalConcurrentTasks
;
364 private final int maxConcurrentTasksPerServer
;
365 private final Map
<byte[], AtomicInteger
> taskCounterPerRegion
;
366 private final Map
<ServerName
, AtomicInteger
> taskCounterPerServer
;
367 private final Set
<byte[]> busyRegions
= new TreeSet
<>(Bytes
.BYTES_COMPARATOR
);
368 private final AtomicLong tasksInProgress
;
370 TaskCountChecker(final int maxTotalConcurrentTasks
,
371 final int maxConcurrentTasksPerServer
,
372 final int maxConcurrentTasksPerRegion
,
373 final AtomicLong tasksInProgress
,
374 final Map
<ServerName
, AtomicInteger
> taskCounterPerServer
,
375 final Map
<byte[], AtomicInteger
> taskCounterPerRegion
) {
376 this.maxTotalConcurrentTasks
= maxTotalConcurrentTasks
;
377 this.maxConcurrentTasksPerRegion
= maxConcurrentTasksPerRegion
;
378 this.maxConcurrentTasksPerServer
= maxConcurrentTasksPerServer
;
379 this.taskCounterPerRegion
= taskCounterPerRegion
;
380 this.taskCounterPerServer
= taskCounterPerServer
;
381 this.tasksInProgress
= tasksInProgress
;
385 public void reset() throws InterruptedIOException
{
386 // prevent the busy-waiting
388 regionsIncluded
.clear();
389 serversIncluded
.clear();
393 private void waitForRegion() throws InterruptedIOException
{
394 if (busyRegions
.isEmpty()) {
397 EnvironmentEdge ee
= EnvironmentEdgeManager
.getDelegate();
398 final long start
= ee
.currentTime();
399 while ((ee
.currentTime() - start
) <= MAX_WAITING_TIME
) {
400 for (byte[] region
: busyRegions
) {
401 AtomicInteger count
= taskCounterPerRegion
.get(region
);
402 if (count
== null || count
.get() < maxConcurrentTasksPerRegion
) {
407 synchronized (tasksInProgress
) {
408 tasksInProgress
.wait(10);
410 } catch (InterruptedException e
) {
411 throw new InterruptedIOException("Interrupted."
412 + " tasksInProgress=" + tasksInProgress
);
418 * 1) check the regions is allowed. 2) check the concurrent tasks for
419 * regions. 3) check the total concurrent tasks. 4) check the concurrent
422 * @param loc the destination of data
423 * @param heapSizeOfRow the data size
424 * @return either Include {@link RequestController.ReturnCode} or skip
425 * {@link RequestController.ReturnCode}
428 public ReturnCode
canTakeOperation(HRegionLocation loc
, long heapSizeOfRow
) {
429 RegionInfo regionInfo
= loc
.getRegion();
430 if (regionsIncluded
.contains(regionInfo
)) {
431 // We already know what to do with this region.
432 return ReturnCode
.INCLUDE
;
434 AtomicInteger regionCnt
= taskCounterPerRegion
.get(loc
.getRegion().getRegionName());
435 if (regionCnt
!= null && regionCnt
.get() >= maxConcurrentTasksPerRegion
) {
436 // Too many tasks on this region already.
437 return ReturnCode
.SKIP
;
439 int newServers
= serversIncluded
.size()
440 + (serversIncluded
.contains(loc
.getServerName()) ?
0 : 1);
441 if ((newServers
+ tasksInProgress
.get()) > maxTotalConcurrentTasks
) {
443 return ReturnCode
.SKIP
;
445 AtomicInteger serverCnt
= taskCounterPerServer
.get(loc
.getServerName());
446 if (serverCnt
!= null && serverCnt
.get() >= maxConcurrentTasksPerServer
) {
447 // Too many tasks for this individual server
448 return ReturnCode
.SKIP
;
450 return ReturnCode
.INCLUDE
;
454 public void notifyFinal(ReturnCode code
, HRegionLocation loc
, long heapSizeOfRow
) {
455 if (code
== ReturnCode
.INCLUDE
) {
456 regionsIncluded
.add(loc
.getRegion());
457 serversIncluded
.add(loc
.getServerName());
459 busyRegions
.add(loc
.getRegion().getRegionName());
464 * limit the number of rows for each request.
466 static class RequestRowsChecker
implements RowChecker
{
468 private final long maxRowsPerRequest
;
469 private final Map
<ServerName
, Long
> serverRows
= new HashMap
<>();
471 RequestRowsChecker(final long maxRowsPerRequest
) {
472 this.maxRowsPerRequest
= maxRowsPerRequest
;
476 public void reset() {
481 public ReturnCode
canTakeOperation(HRegionLocation loc
, long heapSizeOfRow
) {
482 long currentRows
= serverRows
.containsKey(loc
.getServerName())
483 ? serverRows
.get(loc
.getServerName()) : 0L;
484 // accept at least one row
485 if (currentRows
== 0 || currentRows
< maxRowsPerRequest
) {
486 return ReturnCode
.INCLUDE
;
488 return ReturnCode
.SKIP
;
492 public void notifyFinal(ReturnCode code
, HRegionLocation loc
, long heapSizeOfRow
) {
493 if (code
== ReturnCode
.INCLUDE
) {
494 long currentRows
= serverRows
.containsKey(loc
.getServerName())
495 ? serverRows
.get(loc
.getServerName()) : 0L;
496 serverRows
.put(loc
.getServerName(), currentRows
+ 1);
502 * limit the heap size for each request.
504 static class RequestHeapSizeChecker
implements RowChecker
{
506 private final long maxHeapSizePerRequest
;
507 private final Map
<ServerName
, Long
> serverRequestSizes
= new HashMap
<>();
509 RequestHeapSizeChecker(final long maxHeapSizePerRequest
) {
510 this.maxHeapSizePerRequest
= maxHeapSizePerRequest
;
514 public void reset() {
515 serverRequestSizes
.clear();
519 public ReturnCode
canTakeOperation(HRegionLocation loc
, long heapSizeOfRow
) {
520 // Is it ok for limit of request size?
521 long currentRequestSize
= serverRequestSizes
.containsKey(loc
.getServerName())
522 ? serverRequestSizes
.get(loc
.getServerName()) : 0L;
523 // accept at least one request
524 if (currentRequestSize
== 0 || currentRequestSize
+ heapSizeOfRow
<= maxHeapSizePerRequest
) {
525 return ReturnCode
.INCLUDE
;
527 return ReturnCode
.SKIP
;
531 public void notifyFinal(ReturnCode code
, HRegionLocation loc
, long heapSizeOfRow
) {
532 if (code
== ReturnCode
.INCLUDE
) {
533 long currentRequestSize
= serverRequestSizes
.containsKey(loc
.getServerName())
534 ? serverRequestSizes
.get(loc
.getServerName()) : 0L;
535 serverRequestSizes
.put(loc
.getServerName(), currentRequestSize
+ heapSizeOfRow
);
/**
 * Provide a way to control the flow of rows iteration.
 */
interface RowChecker {

  /**
   * Gives this checker's own verdict for a candidate row.
   *
   * @param loc the destination of data
   * @param heapSizeOfRow the data size
   * @return this checker's verdict; it may still be overridden by other checkers
   */
  ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow);

  /**
   * Add the final ReturnCode to the checker. The ReturnCode may be reversed,
   * so the checker need the final decision to update the inner state.
   *
   * @param code The final decision
   * @param loc the destination of data
   * @param heapSizeOfRow the data size
   */
  void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow);

  /**
   * Reset the inner state.
   *
   * @throws InterruptedIOException if the thread is interrupted while the checker waits
   */
  void reset() throws InterruptedIOException;
}