3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
20 package org
.apache
.hadoop
.hbase
.util
;
22 import java
.io
.IOException
;
23 import java
.util
.List
;
24 import java
.util
.concurrent
.ExecutorService
;
25 import java
.util
.concurrent
.LinkedBlockingQueue
;
26 import java
.util
.concurrent
.ThreadLocalRandom
;
27 import java
.util
.concurrent
.ThreadPoolExecutor
;
28 import java
.util
.concurrent
.TimeUnit
;
30 import org
.apache
.hadoop
.conf
.Configuration
;
31 import org
.apache
.hadoop
.hbase
.HConstants
;
32 import org
.apache
.hadoop
.hbase
.TableName
;
33 import org
.apache
.yetus
.audience
.InterfaceAudience
;
34 import org
.slf4j
.Logger
;
35 import org
.slf4j
.LoggerFactory
;
36 import org
.apache
.hadoop
.hbase
.client
.ClusterConnection
;
37 import org
.apache
.hadoop
.hbase
.client
.Connection
;
38 import org
.apache
.hadoop
.hbase
.client
.ConnectionFactory
;
39 import org
.apache
.hadoop
.hbase
.client
.HTable
;
40 import org
.apache
.hadoop
.hbase
.client
.Row
;
41 import org
.apache
.hadoop
.hbase
.client
.coprocessor
.Batch
;
44 * Provides ability to create multiple Connection instances and allows to process a batch of
45 * actions using CHTable.doBatchWithCallback()
47 @InterfaceAudience.Private
48 public class MultiHConnection
{
49 private static final Logger LOG
= LoggerFactory
.getLogger(MultiHConnection
.class);
50 private Connection
[] connections
;
51 private final Object connectionsLock
= new Object();
52 private final int noOfConnections
;
53 private ExecutorService batchPool
;
56 * Create multiple Connection instances and initialize a thread pool executor
57 * @param conf configuration
58 * @param noOfConnections total no of Connections to create
59 * @throws IOException if IO failure occurs
61 public MultiHConnection(Configuration conf
, int noOfConnections
)
63 this.noOfConnections
= noOfConnections
;
64 synchronized (this.connectionsLock
) {
65 connections
= new Connection
[noOfConnections
];
66 for (int i
= 0; i
< noOfConnections
; i
++) {
67 Connection conn
= ConnectionFactory
.createConnection(conf
);
68 connections
[i
] = conn
;
71 createBatchPool(conf
);
75 * Close the open connections and shutdown the batchpool
78 synchronized (connectionsLock
) {
79 if (connections
!= null) {
80 for (Connection conn
: connections
) {
84 } catch (IOException e
) {
85 LOG
.info("Got exception in closing connection", e
);
94 if (this.batchPool
!= null && !this.batchPool
.isShutdown()) {
95 this.batchPool
.shutdown();
97 if (!this.batchPool
.awaitTermination(10, TimeUnit
.SECONDS
)) {
98 this.batchPool
.shutdownNow();
100 } catch (InterruptedException e
) {
101 this.batchPool
.shutdownNow();
108 * Randomly pick a connection and process the batch of actions for a given table
109 * @param actions the actions
110 * @param tableName table name
111 * @param results the results array
112 * @param callback to run when results are in
113 * @throws IOException If IO failure occurs
115 @SuppressWarnings("deprecation")
116 public <R
> void processBatchCallback(List
<?
extends Row
> actions
, TableName tableName
,
117 Object
[] results
, Batch
.Callback
<R
> callback
) throws IOException
{
118 // Currently used by RegionStateStore
119 ClusterConnection conn
=
120 (ClusterConnection
) connections
[ThreadLocalRandom
.current().nextInt(noOfConnections
)];
122 HTable
.doBatchWithCallback(actions
, results
, callback
, conn
, batchPool
, tableName
);
125 // Copied from ConnectionImplementation.getBatchPool()
126 // We should get rid of this when Connection.processBatchCallback is un-deprecated and provides
127 // an API to manage a batch pool
128 private void createBatchPool(Configuration conf
) {
129 // Use the same config for keep alive as in ConnectionImplementation.getBatchPool();
130 int maxThreads
= conf
.getInt("hbase.multihconnection.threads.max", 256);
131 if (maxThreads
== 0) {
132 maxThreads
= Runtime
.getRuntime().availableProcessors() * 8;
134 long keepAliveTime
= conf
.getLong("hbase.multihconnection.threads.keepalivetime", 60);
135 LinkedBlockingQueue
<Runnable
> workQueue
=
136 new LinkedBlockingQueue
<>(maxThreads
137 * conf
.getInt(HConstants
.HBASE_CLIENT_MAX_TOTAL_TASKS
,
138 HConstants
.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS
));
139 ThreadPoolExecutor tpe
=
140 new ThreadPoolExecutor(maxThreads
, maxThreads
, keepAliveTime
, TimeUnit
.SECONDS
, workQueue
,
141 Threads
.newDaemonThreadFactory("MultiHConnection" + "-shared-"));
142 tpe
.allowCoreThreadTimeOut(true);
143 this.batchPool
= tpe
;