2 * Copyright The Apache Software Foundation
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
20 package org
.apache
.hadoop
.hbase
.client
;
22 import java
.io
.IOException
;
23 import java
.util
.AbstractMap
.SimpleEntry
;
24 import java
.util
.ArrayList
;
25 import java
.util
.Collections
;
26 import java
.util
.HashMap
;
27 import java
.util
.List
;
29 import java
.util
.concurrent
.ConcurrentHashMap
;
30 import java
.util
.concurrent
.ExecutorService
;
31 import java
.util
.concurrent
.Executors
;
32 import java
.util
.concurrent
.LinkedBlockingQueue
;
33 import java
.util
.concurrent
.ScheduledExecutorService
;
34 import java
.util
.concurrent
.TimeUnit
;
35 import java
.util
.concurrent
.atomic
.AtomicInteger
;
36 import java
.util
.concurrent
.atomic
.AtomicLong
;
38 import org
.apache
.commons
.logging
.Log
;
39 import org
.apache
.commons
.logging
.LogFactory
;
40 import org
.apache
.hadoop
.conf
.Configuration
;
41 import org
.apache
.hadoop
.hbase
.HBaseConfiguration
;
42 import org
.apache
.hadoop
.hbase
.HConstants
;
43 import org
.apache
.hadoop
.hbase
.HRegionLocation
;
44 import org
.apache
.hadoop
.hbase
.ServerName
;
45 import org
.apache
.hadoop
.hbase
.TableName
;
46 import org
.apache
.hadoop
.hbase
.ipc
.RpcControllerFactory
;
47 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
48 import org
.apache
.yetus
.audience
.InterfaceAudience
;
50 import org
.apache
.hadoop
.hbase
.shaded
.com
.google
.common
.annotations
.VisibleForTesting
;
51 import org
.apache
.hadoop
.hbase
.shaded
.com
.google
.common
.util
.concurrent
.ThreadFactoryBuilder
;
/**
 * HTableMultiplexer provides a thread-safe, non-blocking PUT API across all the tables.
 * Each put is sharded into a buffer queue chosen by its destination region server,
 * so each region server buffer queue only holds puts that share the same destination.
 * Each queue has a flush worker that flushes its put requests to the region server.
 * If any queue is full, the HTableMultiplexer starts to drop the Put requests for that
 * queue.
 *
 * Each put is also retried a configurable number of times before it is dropped.
 * The HTableMultiplexer can report the number of buffered requests and the number of
 * failed (dropped) requests, in total or on a per region server basis.
 *
 * This class is thread safe.
 */
67 @InterfaceAudience.Public
68 public class HTableMultiplexer
{
69 private static final Log LOG
= LogFactory
.getLog(HTableMultiplexer
.class.getName());
71 public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS
=
72 "hbase.tablemultiplexer.flush.period.ms";
73 public static final String TABLE_MULTIPLEXER_INIT_THREADS
= "hbase.tablemultiplexer.init.threads";
74 public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE
=
75 "hbase.client.max.retries.in.queue";
77 /** The map between each region server to its flush worker */
78 private final Map
<HRegionLocation
, FlushWorker
> serverToFlushWorkerMap
=
79 new ConcurrentHashMap
<>();
81 private final Configuration workerConf
;
82 private final ClusterConnection conn
;
83 private final ExecutorService pool
;
84 private final int maxAttempts
;
85 private final int perRegionServerBufferQueueSize
;
86 private final int maxKeyValueSize
;
87 private final ScheduledExecutorService executor
;
88 private final long flushPeriod
;
/**
 * Creates a multiplexer on a connection newly created from {@code conf}.
 *
 * @param conf The HBaseConfiguration
 * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for
 *          each region server before dropping the request.
 * @throws IOException if the connection cannot be created
 */
public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize)
    throws IOException {
  this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize);
}
101 * @param conn The HBase connection.
102 * @param conf The HBase configuration
103 * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for
104 * each region server before dropping the request.
106 public HTableMultiplexer(Connection conn
, Configuration conf
,
107 int perRegionServerBufferQueueSize
) {
108 this.conn
= (ClusterConnection
) conn
;
109 this.pool
= HTable
.getDefaultExecutor(conf
);
110 // how many times we could try in total, one more than retry number
111 this.maxAttempts
= conf
.getInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
,
112 HConstants
.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER
) + 1;
113 this.perRegionServerBufferQueueSize
= perRegionServerBufferQueueSize
;
114 this.maxKeyValueSize
= HTable
.getMaxKeyValueSize(conf
);
115 this.flushPeriod
= conf
.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS
, 100);
116 int initThreads
= conf
.getInt(TABLE_MULTIPLEXER_INIT_THREADS
, 10);
118 Executors
.newScheduledThreadPool(initThreads
,
119 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build());
121 this.workerConf
= HBaseConfiguration
.create(conf
);
122 // We do not do the retry because we need to reassign puts to different queues if regions are
124 this.workerConf
.setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
, 0);
128 * Closes the internal {@link Connection}. Does nothing if the {@link Connection} has already
130 * @throws IOException If there is an error closing the connection.
132 @SuppressWarnings("deprecation")
133 public synchronized void close() throws IOException
{
134 if (!getConnection().isClosed()) {
135 getConnection().close();
140 * The put request will be buffered by its corresponding buffer queue. Return false if the queue
144 * @return true if the request can be accepted by its corresponding buffer queue.
146 public boolean put(TableName tableName
, final Put put
) {
147 return put(tableName
, put
, this.maxAttempts
);
151 * The puts request will be buffered by their corresponding buffer queue.
152 * Return the list of puts which could not be queued.
155 * @return the list of puts which could not be queued
157 public List
<Put
> put(TableName tableName
, final List
<Put
> puts
) {
161 List
<Put
> failedPuts
= null;
163 for (Put put
: puts
) {
164 result
= put(tableName
, put
, this.maxAttempts
);
165 if (result
== false) {
167 // Create the failed puts list if necessary
168 if (failedPuts
== null) {
169 failedPuts
= new ArrayList
<>();
171 // Add the put to the failed puts list
179 * @deprecated Use {@link #put(TableName, List) } instead.
182 public List
<Put
> put(byte[] tableName
, final List
<Put
> puts
) {
183 return put(TableName
.valueOf(tableName
), puts
);
187 * The put request will be buffered by its corresponding buffer queue. And the put request will be
188 * retried before dropping the request.
189 * Return false if the queue is already full.
190 * @return true if the request can be accepted by its corresponding buffer queue.
192 public boolean put(final TableName tableName
, final Put put
, int maxAttempts
) {
193 if (maxAttempts
<= 0) {
198 HTable
.validatePut(put
, maxKeyValueSize
);
199 // Allow mocking to get at the connection, but don't expose the connection to users.
200 ClusterConnection conn
= (ClusterConnection
) getConnection();
201 // AsyncProcess in the FlushWorker should take care of refreshing the location cache
202 // as necessary. We shouldn't have to do that here.
203 HRegionLocation loc
= conn
.getRegionLocation(tableName
, put
.getRow(), false);
205 // Add the put pair into its corresponding queue.
206 LinkedBlockingQueue
<PutStatus
> queue
= getQueue(loc
);
208 // Generate a MultiPutStatus object and offer it into the queue
209 PutStatus s
= new PutStatus(loc
.getRegion(), put
, maxAttempts
);
211 return queue
.offer(s
);
213 } catch (IOException e
) {
214 LOG
.debug("Cannot process the put " + put
, e
);
220 * @deprecated Use {@link #put(TableName, Put) } instead.
223 public boolean put(final byte[] tableName
, final Put put
, int retry
) {
224 return put(TableName
.valueOf(tableName
), put
, retry
);
228 * @deprecated Use {@link #put(TableName, Put)} instead.
231 public boolean put(final byte[] tableName
, Put put
) {
232 return put(TableName
.valueOf(tableName
), put
);
236 * @return the current HTableMultiplexerStatus
238 public HTableMultiplexerStatus
getHTableMultiplexerStatus() {
239 return new HTableMultiplexerStatus(serverToFlushWorkerMap
);
243 LinkedBlockingQueue
<PutStatus
> getQueue(HRegionLocation addr
) {
244 FlushWorker worker
= serverToFlushWorkerMap
.get(addr
);
245 if (worker
== null) {
246 synchronized (this.serverToFlushWorkerMap
) {
247 worker
= serverToFlushWorkerMap
.get(addr
);
248 if (worker
== null) {
249 // Create the flush worker
250 worker
= new FlushWorker(workerConf
, this.conn
, addr
, this,
251 perRegionServerBufferQueueSize
, pool
, executor
);
252 this.serverToFlushWorkerMap
.put(addr
, worker
);
253 executor
.scheduleAtFixedRate(worker
, flushPeriod
, flushPeriod
, TimeUnit
.MILLISECONDS
);
257 return worker
.getQueue();
261 ClusterConnection
getConnection() {
266 * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer.
267 * report the number of buffered requests and the number of the failed (dropped) requests
268 * in total or on per region server basis.
270 @InterfaceAudience.Public
271 public static class HTableMultiplexerStatus
{
272 private long totalFailedPutCounter
;
273 private long totalBufferedPutCounter
;
274 private long maxLatency
;
275 private long overallAverageLatency
;
276 private Map
<String
, Long
> serverToFailedCounterMap
;
277 private Map
<String
, Long
> serverToBufferedCounterMap
;
278 private Map
<String
, Long
> serverToAverageLatencyMap
;
279 private Map
<String
, Long
> serverToMaxLatencyMap
;
281 public HTableMultiplexerStatus(
282 Map
<HRegionLocation
, FlushWorker
> serverToFlushWorkerMap
) {
283 this.totalBufferedPutCounter
= 0;
284 this.totalFailedPutCounter
= 0;
286 this.overallAverageLatency
= 0;
287 this.serverToBufferedCounterMap
= new HashMap
<>();
288 this.serverToFailedCounterMap
= new HashMap
<>();
289 this.serverToAverageLatencyMap
= new HashMap
<>();
290 this.serverToMaxLatencyMap
= new HashMap
<>();
291 this.initialize(serverToFlushWorkerMap
);
294 private void initialize(
295 Map
<HRegionLocation
, FlushWorker
> serverToFlushWorkerMap
) {
296 if (serverToFlushWorkerMap
== null) {
300 long averageCalcSum
= 0;
301 int averageCalcCount
= 0;
302 for (Map
.Entry
<HRegionLocation
, FlushWorker
> entry
: serverToFlushWorkerMap
304 HRegionLocation addr
= entry
.getKey();
305 FlushWorker worker
= entry
.getValue();
307 long bufferedCounter
= worker
.getTotalBufferedCount();
308 long failedCounter
= worker
.getTotalFailedCount();
309 long serverMaxLatency
= worker
.getMaxLatency();
310 AtomicAverageCounter averageCounter
= worker
.getAverageLatencyCounter();
311 // Get sum and count pieces separately to compute overall average
312 SimpleEntry
<Long
, Integer
> averageComponents
= averageCounter
314 long serverAvgLatency
= averageCounter
.getAndReset();
316 this.totalBufferedPutCounter
+= bufferedCounter
;
317 this.totalFailedPutCounter
+= failedCounter
;
318 if (serverMaxLatency
> this.maxLatency
) {
319 this.maxLatency
= serverMaxLatency
;
321 averageCalcSum
+= averageComponents
.getKey();
322 averageCalcCount
+= averageComponents
.getValue();
324 this.serverToBufferedCounterMap
.put(addr
.getHostnamePort(),
326 this.serverToFailedCounterMap
327 .put(addr
.getHostnamePort(),
329 this.serverToAverageLatencyMap
.put(addr
.getHostnamePort(),
331 this.serverToMaxLatencyMap
332 .put(addr
.getHostnamePort(),
335 this.overallAverageLatency
= averageCalcCount
!= 0 ? averageCalcSum
336 / averageCalcCount
: 0;
339 public long getTotalBufferedCounter() {
340 return this.totalBufferedPutCounter
;
343 public long getTotalFailedCounter() {
344 return this.totalFailedPutCounter
;
347 public long getMaxLatency() {
348 return this.maxLatency
;
351 public long getOverallAverageLatency() {
352 return this.overallAverageLatency
;
355 public Map
<String
, Long
> getBufferedCounterForEachRegionServer() {
356 return this.serverToBufferedCounterMap
;
359 public Map
<String
, Long
> getFailedCounterForEachRegionServer() {
360 return this.serverToFailedCounterMap
;
363 public Map
<String
, Long
> getMaxLatencyForEachRegionServer() {
364 return this.serverToMaxLatencyMap
;
367 public Map
<String
, Long
> getAverageLatencyForEachRegionServer() {
368 return this.serverToAverageLatencyMap
;
373 static class PutStatus
{
374 final RegionInfo regionInfo
;
376 final int maxAttempCount
;
378 public PutStatus(RegionInfo regionInfo
, Put put
, int maxAttempCount
) {
379 this.regionInfo
= regionInfo
;
381 this.maxAttempCount
= maxAttempCount
;
386 * Helper to count the average over an interval until reset.
388 private static class AtomicAverageCounter
{
392 public AtomicAverageCounter() {
397 public synchronized long getAndReset() {
398 long result
= this.get();
403 public synchronized long get() {
404 if (this.count
== 0) {
407 return this.sum
/ this.count
;
410 public synchronized SimpleEntry
<Long
, Integer
> getComponents() {
411 return new SimpleEntry
<>(sum
, count
);
414 public synchronized void reset() {
419 public synchronized void add(long value
) {
426 static class FlushWorker
implements Runnable
{
427 private final HRegionLocation addr
;
428 private final LinkedBlockingQueue
<PutStatus
> queue
;
429 private final HTableMultiplexer multiplexer
;
430 private final AtomicLong totalFailedPutCount
= new AtomicLong(0);
431 private final AtomicInteger currentProcessingCount
= new AtomicInteger(0);
432 private final AtomicAverageCounter averageLatency
= new AtomicAverageCounter();
433 private final AtomicLong maxLatency
= new AtomicLong(0);
435 private final AsyncProcess ap
;
436 private final List
<PutStatus
> processingList
= new ArrayList
<>();
437 private final ScheduledExecutorService executor
;
438 private final int maxRetryInQueue
;
439 private final AtomicInteger retryInQueue
= new AtomicInteger(0);
440 private final int writeRpcTimeout
; // needed to pass in through AsyncProcess constructor
441 private final int operationTimeout
;
442 private final ExecutorService pool
;
443 public FlushWorker(Configuration conf
, ClusterConnection conn
, HRegionLocation addr
,
444 HTableMultiplexer htableMultiplexer
, int perRegionServerBufferQueueSize
,
445 ExecutorService pool
, ScheduledExecutorService executor
) {
447 this.multiplexer
= htableMultiplexer
;
448 this.queue
= new LinkedBlockingQueue
<>(perRegionServerBufferQueueSize
);
449 RpcRetryingCallerFactory rpcCallerFactory
= RpcRetryingCallerFactory
.instantiate(conf
);
450 RpcControllerFactory rpcControllerFactory
= RpcControllerFactory
.instantiate(conf
);
451 this.writeRpcTimeout
= conf
.getInt(HConstants
.HBASE_RPC_WRITE_TIMEOUT_KEY
,
452 conf
.getInt(HConstants
.HBASE_RPC_TIMEOUT_KEY
,
453 HConstants
.DEFAULT_HBASE_RPC_TIMEOUT
));
454 this.operationTimeout
= conf
.getInt(HConstants
.HBASE_CLIENT_OPERATION_TIMEOUT
,
455 HConstants
.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT
);
456 this.ap
= new AsyncProcess(conn
, conf
, rpcCallerFactory
, false, rpcControllerFactory
);
457 this.executor
= executor
;
458 this.maxRetryInQueue
= conf
.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE
, 10000);
462 protected LinkedBlockingQueue
<PutStatus
> getQueue() {
466 public long getTotalFailedCount() {
467 return totalFailedPutCount
.get();
470 public long getTotalBufferedCount() {
471 return (long) queue
.size() + currentProcessingCount
.get();
474 public AtomicAverageCounter
getAverageLatencyCounter() {
475 return this.averageLatency
;
478 public long getMaxLatency() {
479 return this.maxLatency
.getAndSet(0);
482 boolean resubmitFailedPut(PutStatus ps
, HRegionLocation oldLoc
) throws IOException
{
483 // Decrease the retry count
484 final int retryCount
= ps
.maxAttempCount
- 1;
486 if (retryCount
<= 0) {
487 // Update the failed counter and no retry any more.
491 int cnt
= getRetryInQueue().incrementAndGet();
492 if (cnt
> getMaxRetryInQueue()) {
493 // Too many Puts in queue for resubmit, give up this
494 getRetryInQueue().decrementAndGet();
498 final Put failedPut
= ps
.put
;
499 // The currentPut is failed. So get the table name for the currentPut.
500 final TableName tableName
= ps
.regionInfo
.getTable();
502 long delayMs
= getNextDelay(retryCount
);
503 if (LOG
.isDebugEnabled()) {
504 LOG
.debug("resubmitting after " + delayMs
+ "ms: " + retryCount
);
507 // HBASE-12198, HBASE-15221, HBASE-15232: AsyncProcess should be responsible for updating
508 // the region location cache when the Put original failed with some exception. If we keep
509 // re-trying the same Put to the same location, AsyncProcess isn't doing the right stuff
510 // that we expect it to.
511 getExecutor().schedule(new Runnable() {
514 boolean succ
= false;
516 succ
= FlushWorker
.this.getMultiplexer().put(tableName
, failedPut
, retryCount
);
518 FlushWorker
.this.getRetryInQueue().decrementAndGet();
520 FlushWorker
.this.getTotalFailedPutCount().incrementAndGet();
524 }, delayMs
, TimeUnit
.MILLISECONDS
);
529 long getNextDelay(int retryCount
) {
530 return ConnectionUtils
.getPauseTime(multiplexer
.flushPeriod
,
531 multiplexer
.maxAttempts
- retryCount
- 1);
535 AtomicInteger
getRetryInQueue() {
536 return this.retryInQueue
;
540 int getMaxRetryInQueue() {
541 return this.maxRetryInQueue
;
545 AtomicLong
getTotalFailedPutCount() {
546 return this.totalFailedPutCount
;
550 HTableMultiplexer
getMultiplexer() {
551 return this.multiplexer
;
555 ScheduledExecutorService
getExecutor() {
556 return this.executor
;
563 long start
= EnvironmentEdgeManager
.currentTime();
565 // drain all the queued puts into the tmp list
566 processingList
.clear();
567 queue
.drainTo(processingList
);
568 if (processingList
.isEmpty()) {
573 currentProcessingCount
.set(processingList
.size());
574 // failedCount is decreased whenever a Put is success or resubmit.
575 failedCount
= processingList
.size();
577 List
<Action
> retainedActions
= new ArrayList
<>(processingList
.size());
578 MultiAction actions
= new MultiAction();
579 for (int i
= 0; i
< processingList
.size(); i
++) {
580 PutStatus putStatus
= processingList
.get(i
);
581 Action action
= new Action(putStatus
.put
, i
);
582 actions
.add(putStatus
.regionInfo
.getRegionName(), action
);
583 retainedActions
.add(action
);
586 // Process this multi-put request
587 List
<PutStatus
> failed
= null;
588 Object
[] results
= new Object
[actions
.size()];
589 ServerName server
= addr
.getServerName();
590 Map
<ServerName
, MultiAction
> actionsByServer
=
591 Collections
.singletonMap(server
, actions
);
593 AsyncProcessTask task
= AsyncProcessTask
.newBuilder()
596 .setRpcTimeout(writeRpcTimeout
)
597 .setOperationTimeout(operationTimeout
)
599 AsyncRequestFuture arf
=
600 ap
.submitMultiActions(task
, retainedActions
, 0L, null, null, actionsByServer
);
602 if (arf
.hasError()) {
603 // We just log and ignore the exception here since failed Puts will be resubmit again.
604 LOG
.debug("Caught some exceptions when flushing puts to region server "
605 + addr
.getHostnamePort(), arf
.getErrors());
608 for (int i
= 0; i
< results
.length
; i
++) {
609 if (results
[i
] instanceof Result
) {
612 if (failed
== null) {
613 failed
= new ArrayList
<>();
615 failed
.add(processingList
.get(i
));
620 if (failed
!= null) {
621 // Resubmit failed puts
622 for (PutStatus putStatus
: failed
) {
623 if (resubmitFailedPut(putStatus
, this.addr
)) {
629 long elapsed
= EnvironmentEdgeManager
.currentTime() - start
;
630 // Update latency counters
631 averageLatency
.add(elapsed
);
632 if (elapsed
> maxLatency
.get()) {
633 maxLatency
.set(elapsed
);
636 // Log some basic info
637 if (LOG
.isDebugEnabled()) {
638 LOG
.debug("Processed " + currentProcessingCount
+ " put requests for "
639 + addr
.getHostnamePort() + " and " + failedCount
+ " failed"
640 + ", latency for this send: " + elapsed
);
643 // Reset the current processing put count
644 currentProcessingCount
.set(0);
645 } catch (RuntimeException e
) {
646 // To make findbugs happy
647 // Log all the exceptions and move on
649 "Caught some exceptions " + e
+ " when flushing puts to region server "
650 + addr
.getHostnamePort(), e
);
651 } catch (Exception e
) {
652 if (e
instanceof InterruptedException
) {
653 Thread
.currentThread().interrupt();
655 // Log all the exceptions and move on
657 "Caught some exceptions " + e
+ " when flushing puts to region server "
658 + addr
.getHostnamePort(), e
);
660 // Update the totalFailedCount
661 this.totalFailedPutCount
.addAndGet(failedCount
);