HBASE-26765 Minor refactor of async scanning code (#4121)
[hbase.git] / hbase-client / src / main / java / org / apache / hadoop / hbase / client / AsyncConnectionConfiguration.java
bloba6d403f597b83410a2ecb6051b1638e8039fa2a0
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
21 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_PAUSE;
22 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
23 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING;
24 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE;
25 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
26 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_META_SCANNER_CACHING;
27 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
28 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
29 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT;
30 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT;
31 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
32 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE;
33 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE;
34 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
35 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING;
36 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY;
37 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
38 import static org.apache.hadoop.hbase.HConstants.HBASE_META_SCANNER_CACHING;
39 import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
40 import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
41 import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
42 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT;
43 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY;
44 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND;
45 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT;
46 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND;
47 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT;
48 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS;
49 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT;
50 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
51 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY;
53 import java.util.concurrent.TimeUnit;
54 import org.apache.hadoop.conf.Configuration;
55 import org.apache.yetus.audience.InterfaceAudience;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
59 /**
60 * Timeout configs.
62 @InterfaceAudience.Private
63 class AsyncConnectionConfiguration {
65 private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionConfiguration.class);
67 /**
68 * Configure the number of failures after which the client will start logging. A few failures
69 * is fine: region moved, then is not opened, then is overloaded. We try to have an acceptable
70 * heuristic for the number of errors we don't log. 5 was chosen because we wait for 1s at
71 * this stage.
73 public static final String START_LOG_ERRORS_AFTER_COUNT_KEY =
74 "hbase.client.start.log.errors.counter";
75 public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 5;
77 private final long metaOperationTimeoutNs;
79 // timeout for a whole operation such as get, put or delete. Notice that scan will not be effected
80 // by this value, see scanTimeoutNs.
81 private final long operationTimeoutNs;
83 // timeout for each rpc request. Can be overridden by a more specific config, such as
84 // readRpcTimeout or writeRpcTimeout.
85 private final long rpcTimeoutNs;
87 // timeout for each read rpc request
88 private final long readRpcTimeoutNs;
90 // timeout for each write rpc request
91 private final long writeRpcTimeoutNs;
93 private final long pauseNs;
95 private final long pauseForCQTBENs;
97 private final int maxRetries;
99 /** How many retries are allowed before we start to log */
100 private final int startLogErrorsCnt;
102 // As now we have heartbeat support for scan, ideally a scan will never timeout unless the RS is
103 // crash. The RS will always return something before the rpc timeout or scan timeout to tell the
104 // client that it is still alive. The scan timeout is used as operation timeout for every
105 // operations in a scan, such as openScanner or next.
106 private final long scanTimeoutNs;
108 private final int scannerCaching;
110 private final int metaScannerCaching;
112 private final long scannerMaxResultSize;
114 private final long writeBufferSize;
116 private final long writeBufferPeriodicFlushTimeoutNs;
118 // this is for supporting region replica get, if the primary does not finished within this
119 // timeout, we will send request to secondaries.
120 private final long primaryCallTimeoutNs;
122 private final long primaryScanTimeoutNs;
124 private final long primaryMetaScanTimeoutNs;
126 private final int maxKeyValueSize;
128 AsyncConnectionConfiguration(Configuration conf) {
129 this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
130 conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
131 this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
132 conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
133 long rpcTimeoutMs = conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT);
134 this.rpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(rpcTimeoutMs);
135 this.readRpcTimeoutNs =
136 TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutMs));
137 this.writeRpcTimeoutNs =
138 TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutMs));
139 long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE);
140 long pauseForCQTBEMs = conf.getLong(HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs);
141 if (pauseForCQTBEMs < pauseMs) {
142 LOG.warn(
143 "The {} setting: {} ms is less than the {} setting: {} ms, use the greater one instead",
144 HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseForCQTBEMs, HBASE_CLIENT_PAUSE, pauseMs);
145 pauseForCQTBEMs = pauseMs;
147 this.pauseNs = TimeUnit.MILLISECONDS.toNanos(pauseMs);
148 this.pauseForCQTBENs = TimeUnit.MILLISECONDS.toNanos(pauseForCQTBEMs);
149 this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
150 this.startLogErrorsCnt =
151 conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
152 this.scanTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
153 conf.getInt(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
154 DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
155 this.scannerCaching =
156 conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
157 this.metaScannerCaching =
158 conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING);
159 this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
160 DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
161 this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
162 this.writeBufferPeriodicFlushTimeoutNs =
163 TimeUnit.MILLISECONDS.toNanos(conf.getLong(WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS,
164 WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT));
165 this.primaryCallTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
166 conf.getLong(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT));
167 this.primaryScanTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
168 conf.getLong(PRIMARY_SCAN_TIMEOUT_MICROSECOND, PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT));
169 this.primaryMetaScanTimeoutNs =
170 TimeUnit.MICROSECONDS.toNanos(conf.getLong(HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
171 HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT));
172 this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);
175 long getMetaOperationTimeoutNs() {
176 return metaOperationTimeoutNs;
179 long getOperationTimeoutNs() {
180 return operationTimeoutNs;
183 long getRpcTimeoutNs() {
184 return rpcTimeoutNs;
187 long getReadRpcTimeoutNs() {
188 return readRpcTimeoutNs;
191 long getWriteRpcTimeoutNs() {
192 return writeRpcTimeoutNs;
195 long getPauseNs() {
196 return pauseNs;
199 long getPauseForCQTBENs() {
200 return pauseForCQTBENs;
203 int getMaxRetries() {
204 return maxRetries;
207 int getStartLogErrorsCnt() {
208 return startLogErrorsCnt;
211 long getScanTimeoutNs() {
212 return scanTimeoutNs;
215 int getScannerCaching() {
216 return scannerCaching;
219 int getMetaScannerCaching() {
220 return metaScannerCaching;
223 long getScannerMaxResultSize() {
224 return scannerMaxResultSize;
227 long getWriteBufferSize() {
228 return writeBufferSize;
231 long getWriteBufferPeriodicFlushTimeoutNs() {
232 return writeBufferPeriodicFlushTimeoutNs;
235 long getPrimaryCallTimeoutNs() {
236 return primaryCallTimeoutNs;
239 long getPrimaryScanTimeoutNs() {
240 return primaryScanTimeoutNs;
243 long getPrimaryMetaScanTimeoutNs() {
244 return primaryMetaScanTimeoutNs;
247 int getMaxKeyValueSize() {
248 return maxKeyValueSize;