2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.apache
.hadoop
.hbase
.HConstants
.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT
;
21 import static org
.apache
.hadoop
.hbase
.HConstants
.DEFAULT_HBASE_CLIENT_PAUSE
;
22 import static org
.apache
.hadoop
.hbase
.HConstants
.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER
;
23 import static org
.apache
.hadoop
.hbase
.HConstants
.DEFAULT_HBASE_CLIENT_SCANNER_CACHING
;
24 import static org
.apache
.hadoop
.hbase
.HConstants
.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE
;
25 import static org
.apache
.hadoop
.hbase
.HConstants
.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD
;
26 import static org
.apache
.hadoop
.hbase
.HConstants
.DEFAULT_HBASE_META_SCANNER_CACHING
;
27 import static org
.apache
.hadoop
.hbase
.HConstants
.DEFAULT_HBASE_RPC_TIMEOUT
;
28 import static org
.apache
.hadoop
.hbase
.HConstants
.HBASE_CLIENT_META_OPERATION_TIMEOUT
;
29 import static org
.apache
.hadoop
.hbase
.HConstants
.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT
;
30 import static org
.apache
.hadoop
.hbase
.HConstants
.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT
;
31 import static org
.apache
.hadoop
.hbase
.HConstants
.HBASE_CLIENT_OPERATION_TIMEOUT
;
32 import static org
.apache
.hadoop
.hbase
.HConstants
.HBASE_CLIENT_PAUSE
;
33 import static org
.apache
.hadoop
.hbase
.HConstants
.HBASE_CLIENT_PAUSE_FOR_CQTBE
;
34 import static org
.apache
.hadoop
.hbase
.HConstants
.HBASE_CLIENT_RETRIES_NUMBER
;
35 import static org
.apache
.hadoop
.hbase
.HConstants
.HBASE_CLIENT_SCANNER_CACHING
;
36 import static org
.apache
.hadoop
.hbase
.HConstants
.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY
;
37 import static org
.apache
.hadoop
.hbase
.HConstants
.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD
;
38 import static org
.apache
.hadoop
.hbase
.HConstants
.HBASE_META_SCANNER_CACHING
;
39 import static org
.apache
.hadoop
.hbase
.HConstants
.HBASE_RPC_READ_TIMEOUT_KEY
;
40 import static org
.apache
.hadoop
.hbase
.HConstants
.HBASE_RPC_TIMEOUT_KEY
;
41 import static org
.apache
.hadoop
.hbase
.HConstants
.HBASE_RPC_WRITE_TIMEOUT_KEY
;
42 import static org
.apache
.hadoop
.hbase
.client
.ConnectionConfiguration
.MAX_KEYVALUE_SIZE_DEFAULT
;
43 import static org
.apache
.hadoop
.hbase
.client
.ConnectionConfiguration
.MAX_KEYVALUE_SIZE_KEY
;
44 import static org
.apache
.hadoop
.hbase
.client
.ConnectionConfiguration
.PRIMARY_CALL_TIMEOUT_MICROSECOND
;
45 import static org
.apache
.hadoop
.hbase
.client
.ConnectionConfiguration
.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT
;
46 import static org
.apache
.hadoop
.hbase
.client
.ConnectionConfiguration
.PRIMARY_SCAN_TIMEOUT_MICROSECOND
;
47 import static org
.apache
.hadoop
.hbase
.client
.ConnectionConfiguration
.PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT
;
48 import static org
.apache
.hadoop
.hbase
.client
.ConnectionConfiguration
.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS
;
49 import static org
.apache
.hadoop
.hbase
.client
.ConnectionConfiguration
.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT
;
50 import static org
.apache
.hadoop
.hbase
.client
.ConnectionConfiguration
.WRITE_BUFFER_SIZE_DEFAULT
;
51 import static org
.apache
.hadoop
.hbase
.client
.ConnectionConfiguration
.WRITE_BUFFER_SIZE_KEY
;
53 import java
.util
.concurrent
.TimeUnit
;
54 import org
.apache
.hadoop
.conf
.Configuration
;
55 import org
.apache
.yetus
.audience
.InterfaceAudience
;
56 import org
.slf4j
.Logger
;
57 import org
.slf4j
.LoggerFactory
;
62 @InterfaceAudience.Private
63 class AsyncConnectionConfiguration
{
65 private static final Logger LOG
= LoggerFactory
.getLogger(AsyncConnectionConfiguration
.class);
68 * Configure the number of failures after which the client will start logging. A few failures
69 * is fine: region moved, then is not opened, then is overloaded. We try to have an acceptable
70 * heuristic for the number of errors we don't log. 5 was chosen because we wait for 1s at
73 public static final String START_LOG_ERRORS_AFTER_COUNT_KEY
=
74 "hbase.client.start.log.errors.counter";
75 public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT
= 5;
77 private final long metaOperationTimeoutNs
;
79 // timeout for a whole operation such as get, put or delete. Notice that scan will not be effected
80 // by this value, see scanTimeoutNs.
81 private final long operationTimeoutNs
;
83 // timeout for each rpc request. Can be overridden by a more specific config, such as
84 // readRpcTimeout or writeRpcTimeout.
85 private final long rpcTimeoutNs
;
87 // timeout for each read rpc request
88 private final long readRpcTimeoutNs
;
90 // timeout for each write rpc request
91 private final long writeRpcTimeoutNs
;
93 private final long pauseNs
;
95 private final long pauseForCQTBENs
;
97 private final int maxRetries
;
99 /** How many retries are allowed before we start to log */
100 private final int startLogErrorsCnt
;
102 // As now we have heartbeat support for scan, ideally a scan will never timeout unless the RS is
103 // crash. The RS will always return something before the rpc timeout or scan timeout to tell the
104 // client that it is still alive. The scan timeout is used as operation timeout for every
105 // operations in a scan, such as openScanner or next.
106 private final long scanTimeoutNs
;
108 private final int scannerCaching
;
110 private final int metaScannerCaching
;
112 private final long scannerMaxResultSize
;
114 private final long writeBufferSize
;
116 private final long writeBufferPeriodicFlushTimeoutNs
;
118 // this is for supporting region replica get, if the primary does not finished within this
119 // timeout, we will send request to secondaries.
120 private final long primaryCallTimeoutNs
;
122 private final long primaryScanTimeoutNs
;
124 private final long primaryMetaScanTimeoutNs
;
126 private final int maxKeyValueSize
;
128 AsyncConnectionConfiguration(Configuration conf
) {
129 this.metaOperationTimeoutNs
= TimeUnit
.MILLISECONDS
.toNanos(
130 conf
.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT
, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT
));
131 this.operationTimeoutNs
= TimeUnit
.MILLISECONDS
.toNanos(
132 conf
.getLong(HBASE_CLIENT_OPERATION_TIMEOUT
, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT
));
133 long rpcTimeoutMs
= conf
.getLong(HBASE_RPC_TIMEOUT_KEY
, DEFAULT_HBASE_RPC_TIMEOUT
);
134 this.rpcTimeoutNs
= TimeUnit
.MILLISECONDS
.toNanos(rpcTimeoutMs
);
135 this.readRpcTimeoutNs
=
136 TimeUnit
.MILLISECONDS
.toNanos(conf
.getLong(HBASE_RPC_READ_TIMEOUT_KEY
, rpcTimeoutMs
));
137 this.writeRpcTimeoutNs
=
138 TimeUnit
.MILLISECONDS
.toNanos(conf
.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY
, rpcTimeoutMs
));
139 long pauseMs
= conf
.getLong(HBASE_CLIENT_PAUSE
, DEFAULT_HBASE_CLIENT_PAUSE
);
140 long pauseForCQTBEMs
= conf
.getLong(HBASE_CLIENT_PAUSE_FOR_CQTBE
, pauseMs
);
141 if (pauseForCQTBEMs
< pauseMs
) {
143 "The {} setting: {} ms is less than the {} setting: {} ms, use the greater one instead",
144 HBASE_CLIENT_PAUSE_FOR_CQTBE
, pauseForCQTBEMs
, HBASE_CLIENT_PAUSE
, pauseMs
);
145 pauseForCQTBEMs
= pauseMs
;
147 this.pauseNs
= TimeUnit
.MILLISECONDS
.toNanos(pauseMs
);
148 this.pauseForCQTBENs
= TimeUnit
.MILLISECONDS
.toNanos(pauseForCQTBEMs
);
149 this.maxRetries
= conf
.getInt(HBASE_CLIENT_RETRIES_NUMBER
, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER
);
150 this.startLogErrorsCnt
=
151 conf
.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY
, DEFAULT_START_LOG_ERRORS_AFTER_COUNT
);
152 this.scanTimeoutNs
= TimeUnit
.MILLISECONDS
.toNanos(
153 conf
.getInt(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD
,
154 DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD
));
155 this.scannerCaching
=
156 conf
.getInt(HBASE_CLIENT_SCANNER_CACHING
, DEFAULT_HBASE_CLIENT_SCANNER_CACHING
);
157 this.metaScannerCaching
=
158 conf
.getInt(HBASE_META_SCANNER_CACHING
, DEFAULT_HBASE_META_SCANNER_CACHING
);
159 this.scannerMaxResultSize
= conf
.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY
,
160 DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE
);
161 this.writeBufferSize
= conf
.getLong(WRITE_BUFFER_SIZE_KEY
, WRITE_BUFFER_SIZE_DEFAULT
);
162 this.writeBufferPeriodicFlushTimeoutNs
=
163 TimeUnit
.MILLISECONDS
.toNanos(conf
.getLong(WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS
,
164 WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT
));
165 this.primaryCallTimeoutNs
= TimeUnit
.MICROSECONDS
.toNanos(
166 conf
.getLong(PRIMARY_CALL_TIMEOUT_MICROSECOND
, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT
));
167 this.primaryScanTimeoutNs
= TimeUnit
.MICROSECONDS
.toNanos(
168 conf
.getLong(PRIMARY_SCAN_TIMEOUT_MICROSECOND
, PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT
));
169 this.primaryMetaScanTimeoutNs
=
170 TimeUnit
.MICROSECONDS
.toNanos(conf
.getLong(HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT
,
171 HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT
));
172 this.maxKeyValueSize
= conf
.getInt(MAX_KEYVALUE_SIZE_KEY
, MAX_KEYVALUE_SIZE_DEFAULT
);
175 long getMetaOperationTimeoutNs() {
176 return metaOperationTimeoutNs
;
179 long getOperationTimeoutNs() {
180 return operationTimeoutNs
;
183 long getRpcTimeoutNs() {
187 long getReadRpcTimeoutNs() {
188 return readRpcTimeoutNs
;
191 long getWriteRpcTimeoutNs() {
192 return writeRpcTimeoutNs
;
199 long getPauseForCQTBENs() {
200 return pauseForCQTBENs
;
203 int getMaxRetries() {
207 int getStartLogErrorsCnt() {
208 return startLogErrorsCnt
;
211 long getScanTimeoutNs() {
212 return scanTimeoutNs
;
215 int getScannerCaching() {
216 return scannerCaching
;
219 int getMetaScannerCaching() {
220 return metaScannerCaching
;
223 long getScannerMaxResultSize() {
224 return scannerMaxResultSize
;
227 long getWriteBufferSize() {
228 return writeBufferSize
;
231 long getWriteBufferPeriodicFlushTimeoutNs() {
232 return writeBufferPeriodicFlushTimeoutNs
;
235 long getPrimaryCallTimeoutNs() {
236 return primaryCallTimeoutNs
;
239 long getPrimaryScanTimeoutNs() {
240 return primaryScanTimeoutNs
;
243 long getPrimaryMetaScanTimeoutNs() {
244 return primaryMetaScanTimeoutNs
;
247 int getMaxKeyValueSize() {
248 return maxKeyValueSize
;