2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.quotas
;
21 import java
.io
.IOException
;
22 import java
.util
.List
;
23 import java
.util
.Optional
;
25 import org
.apache
.hadoop
.hbase
.TableName
;
26 import org
.apache
.hadoop
.hbase
.ipc
.RpcScheduler
;
27 import org
.apache
.hadoop
.hbase
.ipc
.RpcServer
;
28 import org
.apache
.hadoop
.hbase
.regionserver
.Region
;
29 import org
.apache
.hadoop
.hbase
.regionserver
.RegionServerServices
;
30 import org
.apache
.hadoop
.hbase
.security
.User
;
31 import org
.apache
.hadoop
.security
.UserGroupInformation
;
32 import org
.apache
.yetus
.audience
.InterfaceAudience
;
33 import org
.apache
.yetus
.audience
.InterfaceStability
;
34 import org
.slf4j
.Logger
;
35 import org
.slf4j
.LoggerFactory
;
37 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
;
/**
 * Region Server Quota Manager.
 * It is responsible for providing access to the quota information of each user/table.
 * <p>
 * The direct user of this class is the RegionServer, which gets and checks the
 * user/table quota for each operation (put, get, scan).
 * For system tables and for users/tables without a quota specified, the quota check is a noop.
 */
47 @InterfaceAudience.Private
48 @InterfaceStability.Evolving
49 public class RegionServerRpcQuotaManager
{
50 private static final Logger LOG
= LoggerFactory
.getLogger(RegionServerRpcQuotaManager
.class);
52 private final RegionServerServices rsServices
;
54 private QuotaCache quotaCache
= null;
55 private volatile boolean rpcThrottleEnabled
;
56 // Storage for quota rpc throttle
57 private RpcThrottleStorage rpcThrottleStorage
;
/**
 * Creates the RPC quota manager for a region server.
 * <p>
 * Only the throttle storage is set up here; the quota cache is created later by
 * {@code start()}.
 *
 * @param rsServices region server services, used to obtain the ZooKeeper handle and the
 *          configuration
 */
public RegionServerRpcQuotaManager(final RegionServerServices rsServices) {
  this.rsServices = rsServices;
  // Must be stored in the field: start() and switchRpcThrottle() read the throttle flag from it.
  this.rpcThrottleStorage =
    new RpcThrottleStorage(rsServices.getZooKeeper(), rsServices.getConfiguration());
}
65 public void start(final RpcScheduler rpcScheduler
) throws IOException
{
66 if (!QuotaUtil
.isQuotaEnabled(rsServices
.getConfiguration())) {
67 LOG
.info("Quota support disabled");
71 LOG
.info("Initializing RPC quota support");
73 // Initialize quota cache
74 quotaCache
= new QuotaCache(rsServices
);
76 rpcThrottleEnabled
= rpcThrottleStorage
.isRpcThrottleEnabled();
77 LOG
.info("Start rpc quota manager and rpc throttle enabled is {}", rpcThrottleEnabled
);
81 if (isQuotaEnabled()) {
82 quotaCache
.stop("shutdown");
86 protected boolean isRpcThrottleEnabled() {
87 return rpcThrottleEnabled
;
90 private boolean isQuotaEnabled() {
91 return quotaCache
!= null;
94 public void switchRpcThrottle(boolean enable
) throws IOException
{
95 if (isQuotaEnabled()) {
96 if (rpcThrottleEnabled
!= enable
) {
97 boolean previousEnabled
= rpcThrottleEnabled
;
98 rpcThrottleEnabled
= rpcThrottleStorage
.isRpcThrottleEnabled();
99 LOG
.info("Switch rpc throttle from {} to {}", previousEnabled
, rpcThrottleEnabled
);
102 "Skip switch rpc throttle because previous value {} is the same as current value {}",
103 rpcThrottleEnabled
, enable
);
106 LOG
.warn("Skip switch rpc throttle to {} because rpc quota is disabled", enable
);
110 QuotaCache
getQuotaCache() {
115 * Returns the quota for an operation.
117 * @param ugi the user that is executing the operation
118 * @param table the table where the operation will be executed
119 * @return the OperationQuota
121 public OperationQuota
getQuota(final UserGroupInformation ugi
, final TableName table
) {
122 if (isQuotaEnabled() && !table
.isSystemTable() && isRpcThrottleEnabled()) {
123 UserQuotaState userQuotaState
= quotaCache
.getUserQuotaState(ugi
);
124 QuotaLimiter userLimiter
= userQuotaState
.getTableLimiter(table
);
125 boolean useNoop
= userLimiter
.isBypass();
126 if (userQuotaState
.hasBypassGlobals()) {
127 if (LOG
.isTraceEnabled()) {
128 LOG
.trace("get quota for ugi=" + ugi
+ " table=" + table
+ " userLimiter=" + userLimiter
);
131 return new DefaultOperationQuota(this.rsServices
.getConfiguration(), userLimiter
);
134 QuotaLimiter nsLimiter
= quotaCache
.getNamespaceLimiter(table
.getNamespaceAsString());
135 QuotaLimiter tableLimiter
= quotaCache
.getTableLimiter(table
);
136 QuotaLimiter rsLimiter
= quotaCache
137 .getRegionServerQuotaLimiter(QuotaTableUtil
.QUOTA_REGION_SERVER_ROW_KEY
);
138 useNoop
&= tableLimiter
.isBypass() && nsLimiter
.isBypass() && rsLimiter
.isBypass();
139 boolean exceedThrottleQuotaEnabled
= quotaCache
.isExceedThrottleQuotaEnabled();
140 if (LOG
.isTraceEnabled()) {
141 LOG
.trace("get quota for ugi=" + ugi
+ " table=" + table
+ " userLimiter=" + userLimiter
142 + " tableLimiter=" + tableLimiter
+ " nsLimiter=" + nsLimiter
+ " rsLimiter="
143 + rsLimiter
+ " exceedThrottleQuotaEnabled=" + exceedThrottleQuotaEnabled
);
146 if (exceedThrottleQuotaEnabled
) {
147 return new ExceedOperationQuota(this.rsServices
.getConfiguration(), rsLimiter
,
148 userLimiter
, tableLimiter
, nsLimiter
);
150 return new DefaultOperationQuota(this.rsServices
.getConfiguration(), userLimiter
,
151 tableLimiter
, nsLimiter
, rsLimiter
);
156 return NoopOperationQuota
.get();
160 * Check the quota for the current (rpc-context) user.
161 * Returns the OperationQuota used to get the available quota and
162 * to report the data/usage of the operation.
163 * @param region the region where the operation will be performed
164 * @param type the operation type
165 * @return the OperationQuota
166 * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
168 public OperationQuota
checkQuota(final Region region
,
169 final OperationQuota
.OperationType type
) throws IOException
, RpcThrottlingException
{
171 case SCAN
: return checkQuota(region
, 0, 0, 1);
172 case GET
: return checkQuota(region
, 0, 1, 0);
173 case MUTATE
: return checkQuota(region
, 1, 0, 0);
175 throw new RuntimeException("Invalid operation type: " + type
);
179 * Check the quota for the current (rpc-context) user.
180 * Returns the OperationQuota used to get the available quota and
181 * to report the data/usage of the operation.
182 * @param region the region where the operation will be performed
183 * @param actions the "multi" actions to perform
184 * @return the OperationQuota
185 * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
187 public OperationQuota
checkQuota(final Region region
,
188 final List
<ClientProtos
.Action
> actions
) throws IOException
, RpcThrottlingException
{
191 for (final ClientProtos
.Action action
: actions
) {
192 if (action
.hasMutation()) {
194 } else if (action
.hasGet()) {
198 return checkQuota(region
, numWrites
, numReads
, 0);
202 * Check the quota for the current (rpc-context) user.
203 * Returns the OperationQuota used to get the available quota and
204 * to report the data/usage of the operation.
205 * @param region the region where the operation will be performed
206 * @param numWrites number of writes to perform
207 * @param numReads number of short-reads to perform
208 * @param numScans number of scan to perform
209 * @return the OperationQuota
210 * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
212 private OperationQuota
checkQuota(final Region region
,
213 final int numWrites
, final int numReads
, final int numScans
)
214 throws IOException
, RpcThrottlingException
{
215 Optional
<User
> user
= RpcServer
.getRequestUser();
216 UserGroupInformation ugi
;
217 if (user
.isPresent()) {
218 ugi
= user
.get().getUGI();
220 ugi
= User
.getCurrent().getUGI();
222 TableName table
= region
.getTableDescriptor().getTableName();
224 OperationQuota quota
= getQuota(ugi
, table
);
226 quota
.checkQuota(numWrites
, numReads
, numScans
);
227 } catch (RpcThrottlingException e
) {
228 LOG
.debug("Throttling exception for user=" + ugi
.getUserName() +
229 " table=" + table
+ " numWrites=" + numWrites
+
230 " numReads=" + numReads
+ " numScans=" + numScans
+
231 ": " + e
.getMessage());