/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
19 package org
.apache
.hadoop
.hbase
.quotas
;
21 import java
.io
.IOException
;
22 import java
.util
.List
;
23 import java
.util
.Optional
;
25 import org
.apache
.hadoop
.hbase
.TableName
;
26 import org
.apache
.hadoop
.hbase
.ipc
.RpcScheduler
;
27 import org
.apache
.hadoop
.hbase
.ipc
.RpcServer
;
28 import org
.apache
.hadoop
.hbase
.regionserver
.Region
;
29 import org
.apache
.hadoop
.hbase
.regionserver
.RegionServerServices
;
30 import org
.apache
.hadoop
.hbase
.security
.User
;
31 import org
.apache
.hadoop
.security
.UserGroupInformation
;
32 import org
.apache
.yetus
.audience
.InterfaceAudience
;
33 import org
.apache
.yetus
.audience
.InterfaceStability
;
34 import org
.slf4j
.Logger
;
35 import org
.slf4j
.LoggerFactory
;
36 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.annotations
.VisibleForTesting
;
37 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
;
/**
 * Region Server Quota Manager.
 * It is responsible for providing access to the quota information of each user/table.
 *
 * The direct user of this class is the RegionServer, which gets and checks the
 * user/table quota for each operation (put, get, scan).
 * For system tables and for users/tables without a quota specified, the quota check is a noop.
 */
47 @InterfaceAudience.Private
48 @InterfaceStability.Evolving
49 public class RegionServerRpcQuotaManager
{
50 private static final Logger LOG
= LoggerFactory
.getLogger(RegionServerRpcQuotaManager
.class);
52 private final RegionServerServices rsServices
;
54 private QuotaCache quotaCache
= null;
55 private volatile boolean rpcThrottleEnabled
;
56 // Storage for quota rpc throttle
57 private RpcThrottleStorage rpcThrottleStorage
;
/**
 * Creates the quota manager together with the storage handle used to read the RPC throttle
 * switch.
 *
 * @param rsServices region server services supplying the ZooKeeper handle and configuration
 */
public RegionServerRpcQuotaManager(final RegionServerServices rsServices) {
  this.rsServices = rsServices;
  // The storage must be kept in the field: start() and switchRpcThrottle() read the
  // throttle switch through it, so discarding the instance would leave the field null.
  this.rpcThrottleStorage =
      new RpcThrottleStorage(rsServices.getZooKeeper(), rsServices.getConfiguration());
}
65 public void start(final RpcScheduler rpcScheduler
) throws IOException
{
66 if (!QuotaUtil
.isQuotaEnabled(rsServices
.getConfiguration())) {
67 LOG
.info("Quota support disabled");
71 LOG
.info("Initializing RPC quota support");
73 // Initialize quota cache
74 quotaCache
= new QuotaCache(rsServices
);
76 rpcThrottleEnabled
= rpcThrottleStorage
.isRpcThrottleEnabled();
77 LOG
.info("Start rpc quota manager and rpc throttle enabled is {}", rpcThrottleEnabled
);
81 if (isQuotaEnabled()) {
82 quotaCache
.stop("shutdown");
87 protected boolean isRpcThrottleEnabled() {
88 return rpcThrottleEnabled
;
91 private boolean isQuotaEnabled() {
92 return quotaCache
!= null;
95 public void switchRpcThrottle(boolean enable
) throws IOException
{
96 if (isQuotaEnabled()) {
97 if (rpcThrottleEnabled
!= enable
) {
98 boolean previousEnabled
= rpcThrottleEnabled
;
99 rpcThrottleEnabled
= rpcThrottleStorage
.isRpcThrottleEnabled();
100 LOG
.info("Switch rpc throttle from {} to {}", previousEnabled
, rpcThrottleEnabled
);
103 "Skip switch rpc throttle because previous value {} is the same as current value {}",
104 rpcThrottleEnabled
, enable
);
107 LOG
.warn("Skip switch rpc throttle to {} because rpc quota is disabled", enable
);
112 QuotaCache
getQuotaCache() {
117 * Returns the quota for an operation.
119 * @param ugi the user that is executing the operation
120 * @param table the table where the operation will be executed
121 * @return the OperationQuota
123 public OperationQuota
getQuota(final UserGroupInformation ugi
, final TableName table
) {
124 if (isQuotaEnabled() && !table
.isSystemTable() && isRpcThrottleEnabled()) {
125 UserQuotaState userQuotaState
= quotaCache
.getUserQuotaState(ugi
);
126 QuotaLimiter userLimiter
= userQuotaState
.getTableLimiter(table
);
127 boolean useNoop
= userLimiter
.isBypass();
128 if (userQuotaState
.hasBypassGlobals()) {
129 if (LOG
.isTraceEnabled()) {
130 LOG
.trace("get quota for ugi=" + ugi
+ " table=" + table
+ " userLimiter=" + userLimiter
);
133 return new DefaultOperationQuota(this.rsServices
.getConfiguration(), userLimiter
);
136 QuotaLimiter nsLimiter
= quotaCache
.getNamespaceLimiter(table
.getNamespaceAsString());
137 QuotaLimiter tableLimiter
= quotaCache
.getTableLimiter(table
);
138 QuotaLimiter rsLimiter
= quotaCache
139 .getRegionServerQuotaLimiter(QuotaTableUtil
.QUOTA_REGION_SERVER_ROW_KEY
);
140 useNoop
&= tableLimiter
.isBypass() && nsLimiter
.isBypass() && rsLimiter
.isBypass();
141 if (LOG
.isTraceEnabled()) {
142 LOG
.trace("get quota for ugi=" + ugi
+ " table=" + table
+ " userLimiter=" + userLimiter
143 + " tableLimiter=" + tableLimiter
+ " nsLimiter=" + nsLimiter
+ " rsLimiter="
147 return new DefaultOperationQuota(this.rsServices
.getConfiguration(), userLimiter
,
148 tableLimiter
, nsLimiter
, rsLimiter
);
152 return NoopOperationQuota
.get();
156 * Check the quota for the current (rpc-context) user.
157 * Returns the OperationQuota used to get the available quota and
158 * to report the data/usage of the operation.
159 * @param region the region where the operation will be performed
160 * @param type the operation type
161 * @return the OperationQuota
162 * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
164 public OperationQuota
checkQuota(final Region region
,
165 final OperationQuota
.OperationType type
) throws IOException
, RpcThrottlingException
{
167 case SCAN
: return checkQuota(region
, 0, 0, 1);
168 case GET
: return checkQuota(region
, 0, 1, 0);
169 case MUTATE
: return checkQuota(region
, 1, 0, 0);
171 throw new RuntimeException("Invalid operation type: " + type
);
175 * Check the quota for the current (rpc-context) user.
176 * Returns the OperationQuota used to get the available quota and
177 * to report the data/usage of the operation.
178 * @param region the region where the operation will be performed
179 * @param actions the "multi" actions to perform
180 * @return the OperationQuota
181 * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
183 public OperationQuota
checkQuota(final Region region
,
184 final List
<ClientProtos
.Action
> actions
) throws IOException
, RpcThrottlingException
{
187 for (final ClientProtos
.Action action
: actions
) {
188 if (action
.hasMutation()) {
190 } else if (action
.hasGet()) {
194 return checkQuota(region
, numWrites
, numReads
, 0);
198 * Check the quota for the current (rpc-context) user.
199 * Returns the OperationQuota used to get the available quota and
200 * to report the data/usage of the operation.
201 * @param region the region where the operation will be performed
202 * @param numWrites number of writes to perform
203 * @param numReads number of short-reads to perform
204 * @param numScans number of scan to perform
205 * @return the OperationQuota
206 * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
208 private OperationQuota
checkQuota(final Region region
,
209 final int numWrites
, final int numReads
, final int numScans
)
210 throws IOException
, RpcThrottlingException
{
211 Optional
<User
> user
= RpcServer
.getRequestUser();
212 UserGroupInformation ugi
;
213 if (user
.isPresent()) {
214 ugi
= user
.get().getUGI();
216 ugi
= User
.getCurrent().getUGI();
218 TableName table
= region
.getTableDescriptor().getTableName();
220 OperationQuota quota
= getQuota(ugi
, table
);
222 quota
.checkQuota(numWrites
, numReads
, numScans
);
223 } catch (RpcThrottlingException e
) {
224 LOG
.debug("Throttling exception for user=" + ugi
.getUserName() +
225 " table=" + table
+ " numWrites=" + numWrites
+
226 " numReads=" + numReads
+ " numScans=" + numScans
+
227 ": " + e
.getMessage());