HBASE-19498 Fix findbugs and error-prone warnings in hbase-client (branch-2)
[hbase.git] / hbase-client / src / main / java / org / apache / hadoop / hbase / client / HTableMultiplexer.java
blob77d4fb2923f40f39d359620f387310d873ae7b91
/**
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
20 package org.apache.hadoop.hbase.client;
22 import java.io.IOException;
23 import java.util.AbstractMap.SimpleEntry;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.ScheduledExecutorService;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicInteger;
36 import java.util.concurrent.atomic.AtomicLong;
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.apache.hadoop.conf.Configuration;
41 import org.apache.hadoop.hbase.HBaseConfiguration;
42 import org.apache.hadoop.hbase.HConstants;
43 import org.apache.hadoop.hbase.HRegionLocation;
44 import org.apache.hadoop.hbase.ServerName;
45 import org.apache.hadoop.hbase.TableName;
46 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
47 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
48 import org.apache.yetus.audience.InterfaceAudience;
50 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
51 import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
 * HTableMultiplexer provides a thread-safe, non-blocking PUT API across all the tables.
 * Each put is sharded into a buffer queue based on its destination region server, so each
 * region server buffer queue only holds puts that share the same destination. Each queue has
 * a flush worker thread that flushes the put requests to the region server. If any queue is
 * full, the HTableMultiplexer starts to drop the Put requests for that particular queue.
 * <p>
 * Each put is also retried up to a configurable number of times before being dropped.
 * The HTableMultiplexer can report the number of buffered requests and the number of
 * failed (dropped) requests, in total or on a per region server basis.
 * <p>
 * This class is thread safe.
 */
67 @InterfaceAudience.Public
68 public class HTableMultiplexer {
69 private static final Log LOG = LogFactory.getLog(HTableMultiplexer.class.getName());
71 public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS =
72 "hbase.tablemultiplexer.flush.period.ms";
73 public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads";
74 public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE =
75 "hbase.client.max.retries.in.queue";
77 /** The map between each region server to its flush worker */
78 private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap =
79 new ConcurrentHashMap<>();
81 private final Configuration workerConf;
82 private final ClusterConnection conn;
83 private final ExecutorService pool;
84 private final int maxAttempts;
85 private final int perRegionServerBufferQueueSize;
86 private final int maxKeyValueSize;
87 private final ScheduledExecutorService executor;
88 private final long flushPeriod;
/**
 * Builds a multiplexer on top of a connection freshly created from {@code conf}.
 *
 * @param conf The HBaseConfiguration
 * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for
 *          each region server before dropping the request.
 * @throws IOException propagated from {@code ConnectionFactory.createConnection(conf)}
 */
public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize)
    throws IOException {
  this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize);
}
101 * @param conn The HBase connection.
102 * @param conf The HBase configuration
103 * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for
104 * each region server before dropping the request.
106 public HTableMultiplexer(Connection conn, Configuration conf,
107 int perRegionServerBufferQueueSize) {
108 this.conn = (ClusterConnection) conn;
109 this.pool = HTable.getDefaultExecutor(conf);
110 // how many times we could try in total, one more than retry number
111 this.maxAttempts = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
112 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
113 this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
114 this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
115 this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100);
116 int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10);
117 this.executor =
118 Executors.newScheduledThreadPool(initThreads,
119 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build());
121 this.workerConf = HBaseConfiguration.create(conf);
122 // We do not do the retry because we need to reassign puts to different queues if regions are
123 // moved.
124 this.workerConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
128 * Closes the internal {@link Connection}. Does nothing if the {@link Connection} has already
129 * been closed.
130 * @throws IOException If there is an error closing the connection.
132 @SuppressWarnings("deprecation")
133 public synchronized void close() throws IOException {
134 if (!getConnection().isClosed()) {
135 getConnection().close();
140 * The put request will be buffered by its corresponding buffer queue. Return false if the queue
141 * is already full.
142 * @param tableName
143 * @param put
144 * @return true if the request can be accepted by its corresponding buffer queue.
146 public boolean put(TableName tableName, final Put put) {
147 return put(tableName, put, this.maxAttempts);
151 * The puts request will be buffered by their corresponding buffer queue.
152 * Return the list of puts which could not be queued.
153 * @param tableName
154 * @param puts
155 * @return the list of puts which could not be queued
157 public List<Put> put(TableName tableName, final List<Put> puts) {
158 if (puts == null)
159 return null;
161 List <Put> failedPuts = null;
162 boolean result;
163 for (Put put : puts) {
164 result = put(tableName, put, this.maxAttempts);
165 if (result == false) {
167 // Create the failed puts list if necessary
168 if (failedPuts == null) {
169 failedPuts = new ArrayList<>();
171 // Add the put to the failed puts list
172 failedPuts.add(put);
175 return failedPuts;
179 * @deprecated Use {@link #put(TableName, List) } instead.
181 @Deprecated
182 public List<Put> put(byte[] tableName, final List<Put> puts) {
183 return put(TableName.valueOf(tableName), puts);
187 * The put request will be buffered by its corresponding buffer queue. And the put request will be
188 * retried before dropping the request.
189 * Return false if the queue is already full.
190 * @return true if the request can be accepted by its corresponding buffer queue.
192 public boolean put(final TableName tableName, final Put put, int maxAttempts) {
193 if (maxAttempts <= 0) {
194 return false;
197 try {
198 HTable.validatePut(put, maxKeyValueSize);
199 // Allow mocking to get at the connection, but don't expose the connection to users.
200 ClusterConnection conn = (ClusterConnection) getConnection();
201 // AsyncProcess in the FlushWorker should take care of refreshing the location cache
202 // as necessary. We shouldn't have to do that here.
203 HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
204 if (loc != null) {
205 // Add the put pair into its corresponding queue.
206 LinkedBlockingQueue<PutStatus> queue = getQueue(loc);
208 // Generate a MultiPutStatus object and offer it into the queue
209 PutStatus s = new PutStatus(loc.getRegion(), put, maxAttempts);
211 return queue.offer(s);
213 } catch (IOException e) {
214 LOG.debug("Cannot process the put " + put, e);
216 return false;
220 * @deprecated Use {@link #put(TableName, Put) } instead.
222 @Deprecated
223 public boolean put(final byte[] tableName, final Put put, int retry) {
224 return put(TableName.valueOf(tableName), put, retry);
228 * @deprecated Use {@link #put(TableName, Put)} instead.
230 @Deprecated
231 public boolean put(final byte[] tableName, Put put) {
232 return put(TableName.valueOf(tableName), put);
236 * @return the current HTableMultiplexerStatus
238 public HTableMultiplexerStatus getHTableMultiplexerStatus() {
239 return new HTableMultiplexerStatus(serverToFlushWorkerMap);
242 @VisibleForTesting
243 LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
244 FlushWorker worker = serverToFlushWorkerMap.get(addr);
245 if (worker == null) {
246 synchronized (this.serverToFlushWorkerMap) {
247 worker = serverToFlushWorkerMap.get(addr);
248 if (worker == null) {
249 // Create the flush worker
250 worker = new FlushWorker(workerConf, this.conn, addr, this,
251 perRegionServerBufferQueueSize, pool, executor);
252 this.serverToFlushWorkerMap.put(addr, worker);
253 executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS);
257 return worker.getQueue();
260 @VisibleForTesting
261 ClusterConnection getConnection() {
262 return this.conn;
266 * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer.
267 * report the number of buffered requests and the number of the failed (dropped) requests
268 * in total or on per region server basis.
270 @InterfaceAudience.Public
271 public static class HTableMultiplexerStatus {
272 private long totalFailedPutCounter;
273 private long totalBufferedPutCounter;
274 private long maxLatency;
275 private long overallAverageLatency;
276 private Map<String, Long> serverToFailedCounterMap;
277 private Map<String, Long> serverToBufferedCounterMap;
278 private Map<String, Long> serverToAverageLatencyMap;
279 private Map<String, Long> serverToMaxLatencyMap;
281 public HTableMultiplexerStatus(
282 Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
283 this.totalBufferedPutCounter = 0;
284 this.totalFailedPutCounter = 0;
285 this.maxLatency = 0;
286 this.overallAverageLatency = 0;
287 this.serverToBufferedCounterMap = new HashMap<>();
288 this.serverToFailedCounterMap = new HashMap<>();
289 this.serverToAverageLatencyMap = new HashMap<>();
290 this.serverToMaxLatencyMap = new HashMap<>();
291 this.initialize(serverToFlushWorkerMap);
294 private void initialize(
295 Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
296 if (serverToFlushWorkerMap == null) {
297 return;
300 long averageCalcSum = 0;
301 int averageCalcCount = 0;
302 for (Map.Entry<HRegionLocation, FlushWorker> entry : serverToFlushWorkerMap
303 .entrySet()) {
304 HRegionLocation addr = entry.getKey();
305 FlushWorker worker = entry.getValue();
307 long bufferedCounter = worker.getTotalBufferedCount();
308 long failedCounter = worker.getTotalFailedCount();
309 long serverMaxLatency = worker.getMaxLatency();
310 AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter();
311 // Get sum and count pieces separately to compute overall average
312 SimpleEntry<Long, Integer> averageComponents = averageCounter
313 .getComponents();
314 long serverAvgLatency = averageCounter.getAndReset();
316 this.totalBufferedPutCounter += bufferedCounter;
317 this.totalFailedPutCounter += failedCounter;
318 if (serverMaxLatency > this.maxLatency) {
319 this.maxLatency = serverMaxLatency;
321 averageCalcSum += averageComponents.getKey();
322 averageCalcCount += averageComponents.getValue();
324 this.serverToBufferedCounterMap.put(addr.getHostnamePort(),
325 bufferedCounter);
326 this.serverToFailedCounterMap
327 .put(addr.getHostnamePort(),
328 failedCounter);
329 this.serverToAverageLatencyMap.put(addr.getHostnamePort(),
330 serverAvgLatency);
331 this.serverToMaxLatencyMap
332 .put(addr.getHostnamePort(),
333 serverMaxLatency);
335 this.overallAverageLatency = averageCalcCount != 0 ? averageCalcSum
336 / averageCalcCount : 0;
339 public long getTotalBufferedCounter() {
340 return this.totalBufferedPutCounter;
343 public long getTotalFailedCounter() {
344 return this.totalFailedPutCounter;
347 public long getMaxLatency() {
348 return this.maxLatency;
351 public long getOverallAverageLatency() {
352 return this.overallAverageLatency;
355 public Map<String, Long> getBufferedCounterForEachRegionServer() {
356 return this.serverToBufferedCounterMap;
359 public Map<String, Long> getFailedCounterForEachRegionServer() {
360 return this.serverToFailedCounterMap;
363 public Map<String, Long> getMaxLatencyForEachRegionServer() {
364 return this.serverToMaxLatencyMap;
367 public Map<String, Long> getAverageLatencyForEachRegionServer() {
368 return this.serverToAverageLatencyMap;
372 @VisibleForTesting
373 static class PutStatus {
374 final RegionInfo regionInfo;
375 final Put put;
376 final int maxAttempCount;
378 public PutStatus(RegionInfo regionInfo, Put put, int maxAttempCount) {
379 this.regionInfo = regionInfo;
380 this.put = put;
381 this.maxAttempCount = maxAttempCount;
386 * Helper to count the average over an interval until reset.
388 private static class AtomicAverageCounter {
389 private long sum;
390 private int count;
392 public AtomicAverageCounter() {
393 this.sum = 0L;
394 this.count = 0;
397 public synchronized long getAndReset() {
398 long result = this.get();
399 this.reset();
400 return result;
403 public synchronized long get() {
404 if (this.count == 0) {
405 return 0;
407 return this.sum / this.count;
410 public synchronized SimpleEntry<Long, Integer> getComponents() {
411 return new SimpleEntry<>(sum, count);
414 public synchronized void reset() {
415 this.sum = 0L;
416 this.count = 0;
419 public synchronized void add(long value) {
420 this.sum += value;
421 this.count++;
425 @VisibleForTesting
426 static class FlushWorker implements Runnable {
427 private final HRegionLocation addr;
428 private final LinkedBlockingQueue<PutStatus> queue;
429 private final HTableMultiplexer multiplexer;
430 private final AtomicLong totalFailedPutCount = new AtomicLong(0);
431 private final AtomicInteger currentProcessingCount = new AtomicInteger(0);
432 private final AtomicAverageCounter averageLatency = new AtomicAverageCounter();
433 private final AtomicLong maxLatency = new AtomicLong(0);
435 private final AsyncProcess ap;
436 private final List<PutStatus> processingList = new ArrayList<>();
437 private final ScheduledExecutorService executor;
438 private final int maxRetryInQueue;
439 private final AtomicInteger retryInQueue = new AtomicInteger(0);
440 private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
441 private final int operationTimeout;
442 private final ExecutorService pool;
443 public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
444 HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
445 ExecutorService pool, ScheduledExecutorService executor) {
446 this.addr = addr;
447 this.multiplexer = htableMultiplexer;
448 this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
449 RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
450 RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
451 this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
452 conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
453 HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
454 this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
455 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
456 this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, false, rpcControllerFactory);
457 this.executor = executor;
458 this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
459 this.pool = pool;
462 protected LinkedBlockingQueue<PutStatus> getQueue() {
463 return this.queue;
466 public long getTotalFailedCount() {
467 return totalFailedPutCount.get();
470 public long getTotalBufferedCount() {
471 return (long) queue.size() + currentProcessingCount.get();
474 public AtomicAverageCounter getAverageLatencyCounter() {
475 return this.averageLatency;
478 public long getMaxLatency() {
479 return this.maxLatency.getAndSet(0);
482 boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
483 // Decrease the retry count
484 final int retryCount = ps.maxAttempCount - 1;
486 if (retryCount <= 0) {
487 // Update the failed counter and no retry any more.
488 return false;
491 int cnt = getRetryInQueue().incrementAndGet();
492 if (cnt > getMaxRetryInQueue()) {
493 // Too many Puts in queue for resubmit, give up this
494 getRetryInQueue().decrementAndGet();
495 return false;
498 final Put failedPut = ps.put;
499 // The currentPut is failed. So get the table name for the currentPut.
500 final TableName tableName = ps.regionInfo.getTable();
502 long delayMs = getNextDelay(retryCount);
503 if (LOG.isDebugEnabled()) {
504 LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
507 // HBASE-12198, HBASE-15221, HBASE-15232: AsyncProcess should be responsible for updating
508 // the region location cache when the Put original failed with some exception. If we keep
509 // re-trying the same Put to the same location, AsyncProcess isn't doing the right stuff
510 // that we expect it to.
511 getExecutor().schedule(new Runnable() {
512 @Override
513 public void run() {
514 boolean succ = false;
515 try {
516 succ = FlushWorker.this.getMultiplexer().put(tableName, failedPut, retryCount);
517 } finally {
518 FlushWorker.this.getRetryInQueue().decrementAndGet();
519 if (!succ) {
520 FlushWorker.this.getTotalFailedPutCount().incrementAndGet();
524 }, delayMs, TimeUnit.MILLISECONDS);
525 return true;
528 @VisibleForTesting
529 long getNextDelay(int retryCount) {
530 return ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
531 multiplexer.maxAttempts - retryCount - 1);
534 @VisibleForTesting
535 AtomicInteger getRetryInQueue() {
536 return this.retryInQueue;
539 @VisibleForTesting
540 int getMaxRetryInQueue() {
541 return this.maxRetryInQueue;
544 @VisibleForTesting
545 AtomicLong getTotalFailedPutCount() {
546 return this.totalFailedPutCount;
549 @VisibleForTesting
550 HTableMultiplexer getMultiplexer() {
551 return this.multiplexer;
554 @VisibleForTesting
555 ScheduledExecutorService getExecutor() {
556 return this.executor;
559 @Override
560 public void run() {
561 int failedCount = 0;
562 try {
563 long start = EnvironmentEdgeManager.currentTime();
565 // drain all the queued puts into the tmp list
566 processingList.clear();
567 queue.drainTo(processingList);
568 if (processingList.isEmpty()) {
569 // Nothing to flush
570 return;
573 currentProcessingCount.set(processingList.size());
574 // failedCount is decreased whenever a Put is success or resubmit.
575 failedCount = processingList.size();
577 List<Action> retainedActions = new ArrayList<>(processingList.size());
578 MultiAction actions = new MultiAction();
579 for (int i = 0; i < processingList.size(); i++) {
580 PutStatus putStatus = processingList.get(i);
581 Action action = new Action(putStatus.put, i);
582 actions.add(putStatus.regionInfo.getRegionName(), action);
583 retainedActions.add(action);
586 // Process this multi-put request
587 List<PutStatus> failed = null;
588 Object[] results = new Object[actions.size()];
589 ServerName server = addr.getServerName();
590 Map<ServerName, MultiAction> actionsByServer =
591 Collections.singletonMap(server, actions);
592 try {
593 AsyncProcessTask task = AsyncProcessTask.newBuilder()
594 .setResults(results)
595 .setPool(pool)
596 .setRpcTimeout(writeRpcTimeout)
597 .setOperationTimeout(operationTimeout)
598 .build();
599 AsyncRequestFuture arf =
600 ap.submitMultiActions(task, retainedActions, 0L, null, null, actionsByServer);
601 arf.waitUntilDone();
602 if (arf.hasError()) {
603 // We just log and ignore the exception here since failed Puts will be resubmit again.
604 LOG.debug("Caught some exceptions when flushing puts to region server "
605 + addr.getHostnamePort(), arf.getErrors());
607 } finally {
608 for (int i = 0; i < results.length; i++) {
609 if (results[i] instanceof Result) {
610 failedCount--;
611 } else {
612 if (failed == null) {
613 failed = new ArrayList<>();
615 failed.add(processingList.get(i));
620 if (failed != null) {
621 // Resubmit failed puts
622 for (PutStatus putStatus : failed) {
623 if (resubmitFailedPut(putStatus, this.addr)) {
624 failedCount--;
629 long elapsed = EnvironmentEdgeManager.currentTime() - start;
630 // Update latency counters
631 averageLatency.add(elapsed);
632 if (elapsed > maxLatency.get()) {
633 maxLatency.set(elapsed);
636 // Log some basic info
637 if (LOG.isDebugEnabled()) {
638 LOG.debug("Processed " + currentProcessingCount + " put requests for "
639 + addr.getHostnamePort() + " and " + failedCount + " failed"
640 + ", latency for this send: " + elapsed);
643 // Reset the current processing put count
644 currentProcessingCount.set(0);
645 } catch (RuntimeException e) {
646 // To make findbugs happy
647 // Log all the exceptions and move on
648 LOG.debug(
649 "Caught some exceptions " + e + " when flushing puts to region server "
650 + addr.getHostnamePort(), e);
651 } catch (Exception e) {
652 if (e instanceof InterruptedException) {
653 Thread.currentThread().interrupt();
655 // Log all the exceptions and move on
656 LOG.debug(
657 "Caught some exceptions " + e + " when flushing puts to region server "
658 + addr.getHostnamePort(), e);
659 } finally {
660 // Update the totalFailedCount
661 this.totalFailedPutCount.addAndGet(failedCount);