HBASE-26688 Threads shared EMPTY_RESULT may lead to unexpected client job down. ...
[hbase.git] / hbase-client / src / main / java / org / apache / hadoop / hbase / client / SimpleRequestController.java
blob1a184da86e45f4efa2273c052a340462473b886b
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

50 /**
51 * Holds back the requests if they reach any thresholds.
53 @InterfaceAudience.Private
54 @InterfaceStability.Evolving
55 class SimpleRequestController implements RequestController {
56 private static final Logger LOG = LoggerFactory.getLogger(SimpleRequestController.class);
57 /**
58 * The maximum heap size for each request.
60 public static final String HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = "hbase.client.max.perrequest.heapsize";
62 /**
63 * Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE}.
65 static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 4194304;
67 /**
68 * The maximum number of rows for each request.
70 public static final String HBASE_CLIENT_MAX_PERREQUEST_ROWS = "hbase.client.max.perrequest.rows";
71 /**
72 * Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_ROWS}.
74 static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_ROWS = 2048;
76 /**
77 * The maximum size of submit.
79 public static final String HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = "hbase.client.max.submit.heapsize";
80 /**
81 * Default value of {@link #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE}.
83 static final long DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE;
84 final AtomicLong tasksInProgress = new AtomicLong(0);
85 final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion
86 = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
87 final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer = new ConcurrentHashMap<>();
88 /**
89 * The number of tasks simultaneously executed on the cluster.
91 private final int maxTotalConcurrentTasks;
93 /**
94 * The maximum heap size for each request.
96 private final long maxHeapSizePerRequest;
97 /**
98 * The maximum number of rows for each request.
100 private final long maxRowsPerRequest;
101 private final long maxHeapSizeSubmit;
103 * The number of tasks we run in parallel on a single region. With 1 (the
104 * default) , we ensure that the ordering of the queries is respected: we
105 * don't start a set of operations on a region before the previous one is
106 * done. As well, this limits the pressure we put on the region server.
108 final int maxConcurrentTasksPerRegion;
111 * The number of task simultaneously executed on a single region server.
113 final int maxConcurrentTasksPerServer;
114 private final int thresholdToLogUndoneTaskDetails;
115 public static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
116 "hbase.client.threshold.log.details";
117 private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
118 public static final String THRESHOLD_TO_LOG_REGION_DETAILS =
119 "hbase.client.threshold.log.region.details";
120 private static final int DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS = 2;
121 private final int thresholdToLogRegionDetails;
122 SimpleRequestController(final Configuration conf) {
123 this.maxTotalConcurrentTasks = checkAndGet(conf,
124 HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
125 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
126 this.maxConcurrentTasksPerServer = checkAndGet(conf,
127 HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
128 HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
129 this.maxConcurrentTasksPerRegion = checkAndGet(conf,
130 HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
131 HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
132 this.maxHeapSizePerRequest = checkAndGet(conf,
133 HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
134 DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
135 this.maxRowsPerRequest = checkAndGet(conf,
136 HBASE_CLIENT_MAX_PERREQUEST_ROWS,
137 DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_ROWS);
138 this.maxHeapSizeSubmit = checkAndGet(conf,
139 HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE,
140 DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE);
141 this.thresholdToLogUndoneTaskDetails = conf.getInt(
142 THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
143 DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
144 this.thresholdToLogRegionDetails = conf.getInt(
145 THRESHOLD_TO_LOG_REGION_DETAILS,
146 DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS);
149 private static int checkAndGet(Configuration conf, String key, int defaultValue) {
150 int value = conf.getInt(key, defaultValue);
151 if (value <= 0) {
152 throw new IllegalArgumentException(key + "=" + value);
154 return value;
157 private static long checkAndGet(Configuration conf, String key, long defaultValue) {
158 long value = conf.getLong(key, defaultValue);
159 if (value <= 0) {
160 throw new IllegalArgumentException(key + "=" + value);
162 return value;
165 static Checker newChecker(List<RowChecker> checkers) {
166 return new Checker() {
167 private boolean isEnd = false;
169 @Override
170 public ReturnCode canTakeRow(HRegionLocation loc, Row row) {
171 if (isEnd) {
172 return ReturnCode.END;
174 long heapSizeOfRow = (row instanceof Mutation) ? ((Mutation) row).heapSize() : 0;
175 ReturnCode code = ReturnCode.INCLUDE;
176 for (RowChecker checker : checkers) {
177 switch (checker.canTakeOperation(loc, heapSizeOfRow)) {
178 case END:
179 isEnd = true;
180 code = ReturnCode.END;
181 break;
182 case SKIP:
183 code = ReturnCode.SKIP;
184 break;
185 case INCLUDE:
186 default:
187 break;
189 if (code == ReturnCode.END) {
190 break;
193 for (RowChecker checker : checkers) {
194 checker.notifyFinal(code, loc, heapSizeOfRow);
196 return code;
199 @Override
200 public void reset() throws InterruptedIOException {
201 isEnd = false;
202 InterruptedIOException e = null;
203 for (RowChecker checker : checkers) {
204 try {
205 checker.reset();
206 } catch (InterruptedIOException ex) {
207 e = ex;
210 if (e != null) {
211 throw e;
217 @Override
218 public Checker newChecker() {
219 List<RowChecker> checkers = new ArrayList<>(4);
220 checkers.add(new TaskCountChecker(maxTotalConcurrentTasks,
221 maxConcurrentTasksPerServer,
222 maxConcurrentTasksPerRegion,
223 tasksInProgress,
224 taskCounterPerServer,
225 taskCounterPerRegion));
226 checkers.add(new RequestHeapSizeChecker(maxHeapSizePerRequest));
227 checkers.add(new SubmittedSizeChecker(maxHeapSizeSubmit));
228 checkers.add(new RequestRowsChecker(maxRowsPerRequest));
229 return newChecker(checkers);
232 @Override
233 public void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
234 tasksInProgress.incrementAndGet();
236 computeIfAbsent(taskCounterPerServer, sn, AtomicInteger::new).incrementAndGet();
238 regions.forEach((regBytes)
239 -> computeIfAbsent(taskCounterPerRegion, regBytes, AtomicInteger::new).incrementAndGet()
243 @Override
244 public void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
245 regions.forEach(regBytes -> {
246 AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
247 regionCnt.decrementAndGet();
250 taskCounterPerServer.get(sn).decrementAndGet();
251 tasksInProgress.decrementAndGet();
252 synchronized (tasksInProgress) {
253 tasksInProgress.notifyAll();
257 @Override
258 public long getNumberOfTasksInProgress() {
259 return tasksInProgress.get();
262 @Override
263 public void waitForMaximumCurrentTasks(long max, long id,
264 int periodToTrigger, Consumer<Long> trigger) throws InterruptedIOException {
265 assert max >= 0;
266 long lastLog = EnvironmentEdgeManager.currentTime();
267 long currentInProgress, oldInProgress = Long.MAX_VALUE;
268 while ((currentInProgress = tasksInProgress.get()) > max) {
269 if (oldInProgress != currentInProgress) { // Wait for in progress to change.
270 long now = EnvironmentEdgeManager.currentTime();
271 if (now > lastLog + periodToTrigger) {
272 lastLog = now;
273 if (trigger != null) {
274 trigger.accept(currentInProgress);
276 logDetailsOfUndoneTasks(currentInProgress);
279 oldInProgress = currentInProgress;
280 try {
281 synchronized (tasksInProgress) {
282 if (tasksInProgress.get() == oldInProgress) {
283 tasksInProgress.wait(10);
286 } catch (InterruptedException e) {
287 throw new InterruptedIOException("#" + id + ", interrupted." +
288 " currentNumberOfTask=" + currentInProgress);
293 private void logDetailsOfUndoneTasks(long taskInProgress) {
294 if (taskInProgress <= thresholdToLogUndoneTaskDetails) {
295 ArrayList<ServerName> servers = new ArrayList<>();
296 for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
297 if (entry.getValue().get() > 0) {
298 servers.add(entry.getKey());
301 LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers);
304 if (taskInProgress <= thresholdToLogRegionDetails) {
305 ArrayList<String> regions = new ArrayList<>();
306 for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
307 if (entry.getValue().get() > 0) {
308 regions.add(Bytes.toString(entry.getKey()));
311 LOG.info("Regions against which left over task(s) are processed: " + regions);
315 @Override
316 public void waitForFreeSlot(long id, int periodToTrigger, Consumer<Long> trigger) throws InterruptedIOException {
317 waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, id, periodToTrigger, trigger);
321 * limit the heapsize of total submitted data. Reduce the limit of heapsize
322 * for submitting quickly if there is no running task.
324 static class SubmittedSizeChecker implements RowChecker {
326 private final long maxHeapSizeSubmit;
327 private long heapSize = 0;
329 SubmittedSizeChecker(final long maxHeapSizeSubmit) {
330 this.maxHeapSizeSubmit = maxHeapSizeSubmit;
333 @Override
334 public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
335 if (heapSize >= maxHeapSizeSubmit) {
336 return ReturnCode.END;
338 return ReturnCode.INCLUDE;
341 @Override
342 public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
343 if (code == ReturnCode.INCLUDE) {
344 heapSize += heapSizeOfRow;
348 @Override
349 public void reset() {
350 heapSize = 0;
355 * limit the max number of tasks in an AsyncProcess.
357 static class TaskCountChecker implements RowChecker {
359 private static final long MAX_WAITING_TIME = 1000; //ms
360 private final Set<RegionInfo> regionsIncluded = new HashSet<>();
361 private final Set<ServerName> serversIncluded = new HashSet<>();
362 private final int maxConcurrentTasksPerRegion;
363 private final int maxTotalConcurrentTasks;
364 private final int maxConcurrentTasksPerServer;
365 private final Map<byte[], AtomicInteger> taskCounterPerRegion;
366 private final Map<ServerName, AtomicInteger> taskCounterPerServer;
367 private final Set<byte[]> busyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
368 private final AtomicLong tasksInProgress;
370 TaskCountChecker(final int maxTotalConcurrentTasks,
371 final int maxConcurrentTasksPerServer,
372 final int maxConcurrentTasksPerRegion,
373 final AtomicLong tasksInProgress,
374 final Map<ServerName, AtomicInteger> taskCounterPerServer,
375 final Map<byte[], AtomicInteger> taskCounterPerRegion) {
376 this.maxTotalConcurrentTasks = maxTotalConcurrentTasks;
377 this.maxConcurrentTasksPerRegion = maxConcurrentTasksPerRegion;
378 this.maxConcurrentTasksPerServer = maxConcurrentTasksPerServer;
379 this.taskCounterPerRegion = taskCounterPerRegion;
380 this.taskCounterPerServer = taskCounterPerServer;
381 this.tasksInProgress = tasksInProgress;
384 @Override
385 public void reset() throws InterruptedIOException {
386 // prevent the busy-waiting
387 waitForRegion();
388 regionsIncluded.clear();
389 serversIncluded.clear();
390 busyRegions.clear();
393 private void waitForRegion() throws InterruptedIOException {
394 if (busyRegions.isEmpty()) {
395 return;
397 EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
398 final long start = ee.currentTime();
399 while ((ee.currentTime() - start) <= MAX_WAITING_TIME) {
400 for (byte[] region : busyRegions) {
401 AtomicInteger count = taskCounterPerRegion.get(region);
402 if (count == null || count.get() < maxConcurrentTasksPerRegion) {
403 return;
406 try {
407 synchronized (tasksInProgress) {
408 tasksInProgress.wait(10);
410 } catch (InterruptedException e) {
411 throw new InterruptedIOException("Interrupted."
412 + " tasksInProgress=" + tasksInProgress);
418 * 1) check the regions is allowed. 2) check the concurrent tasks for
419 * regions. 3) check the total concurrent tasks. 4) check the concurrent
420 * tasks for server.
422 * @param loc the destination of data
423 * @param heapSizeOfRow the data size
424 * @return either Include {@link RequestController.ReturnCode} or skip
425 * {@link RequestController.ReturnCode}
427 @Override
428 public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
429 RegionInfo regionInfo = loc.getRegion();
430 if (regionsIncluded.contains(regionInfo)) {
431 // We already know what to do with this region.
432 return ReturnCode.INCLUDE;
434 AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegion().getRegionName());
435 if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
436 // Too many tasks on this region already.
437 return ReturnCode.SKIP;
439 int newServers = serversIncluded.size()
440 + (serversIncluded.contains(loc.getServerName()) ? 0 : 1);
441 if ((newServers + tasksInProgress.get()) > maxTotalConcurrentTasks) {
442 // Too many tasks.
443 return ReturnCode.SKIP;
445 AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
446 if (serverCnt != null && serverCnt.get() >= maxConcurrentTasksPerServer) {
447 // Too many tasks for this individual server
448 return ReturnCode.SKIP;
450 return ReturnCode.INCLUDE;
453 @Override
454 public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
455 if (code == ReturnCode.INCLUDE) {
456 regionsIncluded.add(loc.getRegion());
457 serversIncluded.add(loc.getServerName());
459 busyRegions.add(loc.getRegion().getRegionName());
464 * limit the number of rows for each request.
466 static class RequestRowsChecker implements RowChecker {
468 private final long maxRowsPerRequest;
469 private final Map<ServerName, Long> serverRows = new HashMap<>();
471 RequestRowsChecker(final long maxRowsPerRequest) {
472 this.maxRowsPerRequest = maxRowsPerRequest;
475 @Override
476 public void reset() {
477 serverRows.clear();
480 @Override
481 public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
482 long currentRows = serverRows.containsKey(loc.getServerName())
483 ? serverRows.get(loc.getServerName()) : 0L;
484 // accept at least one row
485 if (currentRows == 0 || currentRows < maxRowsPerRequest) {
486 return ReturnCode.INCLUDE;
488 return ReturnCode.SKIP;
491 @Override
492 public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
493 if (code == ReturnCode.INCLUDE) {
494 long currentRows = serverRows.containsKey(loc.getServerName())
495 ? serverRows.get(loc.getServerName()) : 0L;
496 serverRows.put(loc.getServerName(), currentRows + 1);
502 * limit the heap size for each request.
504 static class RequestHeapSizeChecker implements RowChecker {
506 private final long maxHeapSizePerRequest;
507 private final Map<ServerName, Long> serverRequestSizes = new HashMap<>();
509 RequestHeapSizeChecker(final long maxHeapSizePerRequest) {
510 this.maxHeapSizePerRequest = maxHeapSizePerRequest;
513 @Override
514 public void reset() {
515 serverRequestSizes.clear();
518 @Override
519 public ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow) {
520 // Is it ok for limit of request size?
521 long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName())
522 ? serverRequestSizes.get(loc.getServerName()) : 0L;
523 // accept at least one request
524 if (currentRequestSize == 0 || currentRequestSize + heapSizeOfRow <= maxHeapSizePerRequest) {
525 return ReturnCode.INCLUDE;
527 return ReturnCode.SKIP;
530 @Override
531 public void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow) {
532 if (code == ReturnCode.INCLUDE) {
533 long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName())
534 ? serverRequestSizes.get(loc.getServerName()) : 0L;
535 serverRequestSizes.put(loc.getServerName(), currentRequestSize + heapSizeOfRow);
541 * Provide a way to control the flow of rows iteration.
543 interface RowChecker {
545 ReturnCode canTakeOperation(HRegionLocation loc, long heapSizeOfRow);
548 * Add the final ReturnCode to the checker. The ReturnCode may be reversed,
549 * so the checker need the final decision to update the inner state.
551 * @param code The final decision
552 * @param loc the destination of data
553 * @param heapSizeOfRow the data size
555 void notifyFinal(ReturnCode code, HRegionLocation loc, long heapSizeOfRow);
558 * Reset the inner state.
560 void reset() throws InterruptedIOException;