2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.quotas
;
21 import static org
.apache
.hadoop
.hbase
.util
.CollectionUtils
.computeIfAbsent
;
23 import com
.google
.common
.annotations
.VisibleForTesting
;
25 import java
.io
.IOException
;
26 import java
.util
.ArrayList
;
27 import java
.util
.List
;
30 import java
.util
.concurrent
.ConcurrentHashMap
;
32 import org
.apache
.commons
.logging
.Log
;
33 import org
.apache
.commons
.logging
.LogFactory
;
34 import org
.apache
.hadoop
.conf
.Configuration
;
35 import org
.apache
.hadoop
.hbase
.ScheduledChore
;
36 import org
.apache
.hadoop
.hbase
.Stoppable
;
37 import org
.apache
.hadoop
.hbase
.TableName
;
38 import org
.apache
.hadoop
.hbase
.classification
.InterfaceAudience
;
39 import org
.apache
.hadoop
.hbase
.classification
.InterfaceStability
;
40 import org
.apache
.hadoop
.hbase
.client
.Get
;
41 import org
.apache
.hadoop
.hbase
.regionserver
.RegionServerServices
;
42 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
43 import org
.apache
.hadoop
.security
.UserGroupInformation
;
46 * Cache that keeps track of the quota settings for the users and tables that
47 * are interacting with it.
49 * To avoid blocking the operations if the requested quota is not in cache
50 * an "empty quota" will be returned and the request to fetch the quota information
51 * will be enqueued for the next refresh.
53 * TODO: At the moment the Cache has a Chore that will be triggered every 5min
54 * or on cache-miss events. Later the Quotas will be pushed using the notification system.
56 @InterfaceAudience.Private
57 @InterfaceStability.Evolving
58 public class QuotaCache
implements Stoppable
{
59 private static final Log LOG
= LogFactory
.getLog(QuotaCache
.class);
61 public static final String REFRESH_CONF_KEY
= "hbase.quota.refresh.period";
62 private static final int REFRESH_DEFAULT_PERIOD
= 5 * 60000; // 5min
63 private static final int EVICT_PERIOD_FACTOR
= 5; // N * REFRESH_DEFAULT_PERIOD
65 // for testing purpose only, enforce the cache to be always refreshed
66 static boolean TEST_FORCE_REFRESH
= false;
68 private final ConcurrentHashMap
<String
, QuotaState
> namespaceQuotaCache
= new ConcurrentHashMap
<>();
69 private final ConcurrentHashMap
<TableName
, QuotaState
> tableQuotaCache
= new ConcurrentHashMap
<>();
70 private final ConcurrentHashMap
<String
, UserQuotaState
> userQuotaCache
= new ConcurrentHashMap
<>();
71 private final RegionServerServices rsServices
;
73 private QuotaRefresherChore refreshChore
;
74 private boolean stopped
= true;
76 public QuotaCache(final RegionServerServices rsServices
) {
77 this.rsServices
= rsServices
;
80 public void start() throws IOException
{
83 // TODO: This will be replaced once we have the notification bus ready.
84 Configuration conf
= rsServices
.getConfiguration();
85 int period
= conf
.getInt(REFRESH_CONF_KEY
, REFRESH_DEFAULT_PERIOD
);
86 refreshChore
= new QuotaRefresherChore(period
, this);
87 rsServices
.getChoreService().scheduleChore(refreshChore
);
91 public void stop(final String why
) {
96 public boolean isStopped() {
101 * Returns the limiter associated to the specified user/table.
103 * @param ugi the user to limit
104 * @param table the table to limit
105 * @return the limiter associated to the specified user/table
107 public QuotaLimiter
getUserLimiter(final UserGroupInformation ugi
, final TableName table
) {
108 if (table
.isSystemTable()) {
109 return NoopQuotaLimiter
.get();
111 return getUserQuotaState(ugi
).getTableLimiter(table
);
115 * Returns the QuotaState associated to the specified user.
116 * @param ugi the user
117 * @return the quota info associated to specified user
119 public UserQuotaState
getUserQuotaState(final UserGroupInformation ugi
) {
120 return computeIfAbsent(userQuotaCache
, ugi
.getShortUserName(), UserQuotaState
::new,
121 this::triggerCacheRefresh
);
125 * Returns the limiter associated to the specified table.
127 * @param table the table to limit
128 * @return the limiter associated to the specified table
130 public QuotaLimiter
getTableLimiter(final TableName table
) {
131 return getQuotaState(this.tableQuotaCache
, table
).getGlobalLimiter();
135 * Returns the limiter associated to the specified namespace.
137 * @param namespace the namespace to limit
138 * @return the limiter associated to the specified namespace
140 public QuotaLimiter
getNamespaceLimiter(final String namespace
) {
141 return getQuotaState(this.namespaceQuotaCache
, namespace
).getGlobalLimiter();
145 * Returns the QuotaState requested. If the quota info is not in cache an empty one will be
146 * returned and the quota request will be enqueued for the next cache refresh.
148 private <K
> QuotaState
getQuotaState(final ConcurrentHashMap
<K
, QuotaState
> quotasMap
,
150 return computeIfAbsent(quotasMap
, key
, QuotaState
::new, this::triggerCacheRefresh
);
154 void triggerCacheRefresh() {
155 refreshChore
.triggerNow();
159 long getLastUpdate() {
160 return refreshChore
.lastUpdate
;
164 Map
<String
, QuotaState
> getNamespaceQuotaCache() {
165 return namespaceQuotaCache
;
169 Map
<TableName
, QuotaState
> getTableQuotaCache() {
170 return tableQuotaCache
;
174 Map
<String
, UserQuotaState
> getUserQuotaCache() {
175 return userQuotaCache
;
178 // TODO: Remove this once we have the notification bus
179 private class QuotaRefresherChore
extends ScheduledChore
{
180 private long lastUpdate
= 0;
182 public QuotaRefresherChore(final int period
, final Stoppable stoppable
) {
183 super("QuotaRefresherChore", stoppable
, period
);
187 @edu.umd
.cs
.findbugs
.annotations
.SuppressWarnings(value
="GC_UNRELATED_TYPES",
188 justification
="I do not understand why the complaints, it looks good to me -- FIX")
189 protected void chore() {
190 // Prefetch online tables/namespaces
191 for (TableName table
: QuotaCache
.this.rsServices
.getOnlineTables()) {
192 if (table
.isSystemTable()) continue;
193 if (!QuotaCache
.this.tableQuotaCache
.contains(table
)) {
194 QuotaCache
.this.tableQuotaCache
.putIfAbsent(table
, new QuotaState());
196 String ns
= table
.getNamespaceAsString();
197 if (!QuotaCache
.this.namespaceQuotaCache
.contains(ns
)) {
198 QuotaCache
.this.namespaceQuotaCache
.putIfAbsent(ns
, new QuotaState());
202 fetchNamespaceQuotaState();
203 fetchTableQuotaState();
204 fetchUserQuotaState();
205 lastUpdate
= EnvironmentEdgeManager
.currentTime();
208 private void fetchNamespaceQuotaState() {
209 fetch("namespace", QuotaCache
.this.namespaceQuotaCache
, new Fetcher
<String
, QuotaState
>() {
211 public Get
makeGet(final Map
.Entry
<String
, QuotaState
> entry
) {
212 return QuotaUtil
.makeGetForNamespaceQuotas(entry
.getKey());
216 public Map
<String
, QuotaState
> fetchEntries(final List
<Get
> gets
)
218 return QuotaUtil
.fetchNamespaceQuotas(rsServices
.getConnection(), gets
);
223 private void fetchTableQuotaState() {
224 fetch("table", QuotaCache
.this.tableQuotaCache
, new Fetcher
<TableName
, QuotaState
>() {
226 public Get
makeGet(final Map
.Entry
<TableName
, QuotaState
> entry
) {
227 return QuotaUtil
.makeGetForTableQuotas(entry
.getKey());
231 public Map
<TableName
, QuotaState
> fetchEntries(final List
<Get
> gets
)
233 return QuotaUtil
.fetchTableQuotas(rsServices
.getConnection(), gets
);
238 private void fetchUserQuotaState() {
239 final Set
<String
> namespaces
= QuotaCache
.this.namespaceQuotaCache
.keySet();
240 final Set
<TableName
> tables
= QuotaCache
.this.tableQuotaCache
.keySet();
241 fetch("user", QuotaCache
.this.userQuotaCache
, new Fetcher
<String
, UserQuotaState
>() {
243 public Get
makeGet(final Map
.Entry
<String
, UserQuotaState
> entry
) {
244 return QuotaUtil
.makeGetForUserQuotas(entry
.getKey(), tables
, namespaces
);
248 public Map
<String
, UserQuotaState
> fetchEntries(final List
<Get
> gets
)
250 return QuotaUtil
.fetchUserQuotas(rsServices
.getConnection(), gets
);
255 private <K
, V
extends QuotaState
> void fetch(final String type
,
256 final ConcurrentHashMap
<K
, V
> quotasMap
, final Fetcher
<K
, V
> fetcher
) {
257 long now
= EnvironmentEdgeManager
.currentTime();
258 long refreshPeriod
= getPeriod();
259 long evictPeriod
= refreshPeriod
* EVICT_PERIOD_FACTOR
;
261 // Find the quota entries to update
262 List
<Get
> gets
= new ArrayList
<>();
263 List
<K
> toRemove
= new ArrayList
<>();
264 for (Map
.Entry
<K
, V
> entry
: quotasMap
.entrySet()) {
265 long lastUpdate
= entry
.getValue().getLastUpdate();
266 long lastQuery
= entry
.getValue().getLastQuery();
267 if (lastQuery
> 0 && (now
- lastQuery
) >= evictPeriod
) {
268 toRemove
.add(entry
.getKey());
269 } else if (TEST_FORCE_REFRESH
|| (now
- lastUpdate
) >= refreshPeriod
) {
270 gets
.add(fetcher
.makeGet(entry
));
274 for (final K key
: toRemove
) {
275 if (LOG
.isTraceEnabled()) {
276 LOG
.trace("evict " + type
+ " key=" + key
);
278 quotasMap
.remove(key
);
281 // fetch and update the quota entries
282 if (!gets
.isEmpty()) {
284 for (Map
.Entry
<K
, V
> entry
: fetcher
.fetchEntries(gets
).entrySet()) {
285 V quotaInfo
= quotasMap
.putIfAbsent(entry
.getKey(), entry
.getValue());
286 if (quotaInfo
!= null) {
287 quotaInfo
.update(entry
.getValue());
290 if (LOG
.isTraceEnabled()) {
291 LOG
.trace("refresh " + type
+ " key=" + entry
.getKey() + " quotas=" + quotaInfo
);
294 } catch (IOException e
) {
295 LOG
.warn("Unable to read " + type
+ " from quota table", e
);
301 static interface Fetcher
<Key
, Value
> {
302 Get
makeGet(Map
.Entry
<Key
, Value
> entry
);
303 Map
<Key
, Value
> fetchEntries(List
<Get
> gets
) throws IOException
;