HBASE-17532 Replaced explicit type with diamond operator
[hbase.git] / hbase-server / src / main / java / org / apache / hadoop / hbase / quotas / QuotaCache.java
blobad916179920f79ecef23319141e0bac63358315f
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org.apache.hadoop.hbase.quotas;
21 import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
23 import com.google.common.annotations.VisibleForTesting;
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Set;
30 import java.util.concurrent.ConcurrentHashMap;
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.hbase.ScheduledChore;
36 import org.apache.hadoop.hbase.Stoppable;
37 import org.apache.hadoop.hbase.TableName;
38 import org.apache.hadoop.hbase.classification.InterfaceAudience;
39 import org.apache.hadoop.hbase.classification.InterfaceStability;
40 import org.apache.hadoop.hbase.client.Get;
41 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
42 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
43 import org.apache.hadoop.security.UserGroupInformation;
45 /**
46 * Cache that keeps track of the quota settings for the users and tables that
47 * are interacting with it.
49 * To avoid blocking the operations if the requested quota is not in cache
50 * an "empty quota" will be returned and the request to fetch the quota information
51 * will be enqueued for the next refresh.
53 * TODO: At the moment the Cache has a Chore that will be triggered every 5min
54 * or on cache-miss events. Later the Quotas will be pushed using the notification system.
56 @InterfaceAudience.Private
57 @InterfaceStability.Evolving
58 public class QuotaCache implements Stoppable {
59 private static final Log LOG = LogFactory.getLog(QuotaCache.class);
61 public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";
62 private static final int REFRESH_DEFAULT_PERIOD = 5 * 60000; // 5min
63 private static final int EVICT_PERIOD_FACTOR = 5; // N * REFRESH_DEFAULT_PERIOD
65 // for testing purpose only, enforce the cache to be always refreshed
66 static boolean TEST_FORCE_REFRESH = false;
68 private final ConcurrentHashMap<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>();
69 private final ConcurrentHashMap<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>();
70 private final ConcurrentHashMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
71 private final RegionServerServices rsServices;
73 private QuotaRefresherChore refreshChore;
74 private boolean stopped = true;
76 public QuotaCache(final RegionServerServices rsServices) {
77 this.rsServices = rsServices;
80 public void start() throws IOException {
81 stopped = false;
83 // TODO: This will be replaced once we have the notification bus ready.
84 Configuration conf = rsServices.getConfiguration();
85 int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD);
86 refreshChore = new QuotaRefresherChore(period, this);
87 rsServices.getChoreService().scheduleChore(refreshChore);
90 @Override
91 public void stop(final String why) {
92 stopped = true;
95 @Override
96 public boolean isStopped() {
97 return stopped;
101 * Returns the limiter associated to the specified user/table.
103 * @param ugi the user to limit
104 * @param table the table to limit
105 * @return the limiter associated to the specified user/table
107 public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) {
108 if (table.isSystemTable()) {
109 return NoopQuotaLimiter.get();
111 return getUserQuotaState(ugi).getTableLimiter(table);
115 * Returns the QuotaState associated to the specified user.
116 * @param ugi the user
117 * @return the quota info associated to specified user
119 public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
120 return computeIfAbsent(userQuotaCache, ugi.getShortUserName(), UserQuotaState::new,
121 this::triggerCacheRefresh);
125 * Returns the limiter associated to the specified table.
127 * @param table the table to limit
128 * @return the limiter associated to the specified table
130 public QuotaLimiter getTableLimiter(final TableName table) {
131 return getQuotaState(this.tableQuotaCache, table).getGlobalLimiter();
135 * Returns the limiter associated to the specified namespace.
137 * @param namespace the namespace to limit
138 * @return the limiter associated to the specified namespace
140 public QuotaLimiter getNamespaceLimiter(final String namespace) {
141 return getQuotaState(this.namespaceQuotaCache, namespace).getGlobalLimiter();
145 * Returns the QuotaState requested. If the quota info is not in cache an empty one will be
146 * returned and the quota request will be enqueued for the next cache refresh.
148 private <K> QuotaState getQuotaState(final ConcurrentHashMap<K, QuotaState> quotasMap,
149 final K key) {
150 return computeIfAbsent(quotasMap, key, QuotaState::new, this::triggerCacheRefresh);
153 @VisibleForTesting
154 void triggerCacheRefresh() {
155 refreshChore.triggerNow();
158 @VisibleForTesting
159 long getLastUpdate() {
160 return refreshChore.lastUpdate;
163 @VisibleForTesting
164 Map<String, QuotaState> getNamespaceQuotaCache() {
165 return namespaceQuotaCache;
168 @VisibleForTesting
169 Map<TableName, QuotaState> getTableQuotaCache() {
170 return tableQuotaCache;
173 @VisibleForTesting
174 Map<String, UserQuotaState> getUserQuotaCache() {
175 return userQuotaCache;
178 // TODO: Remove this once we have the notification bus
179 private class QuotaRefresherChore extends ScheduledChore {
180 private long lastUpdate = 0;
182 public QuotaRefresherChore(final int period, final Stoppable stoppable) {
183 super("QuotaRefresherChore", stoppable, period);
186 @Override
187 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="GC_UNRELATED_TYPES",
188 justification="I do not understand why the complaints, it looks good to me -- FIX")
189 protected void chore() {
190 // Prefetch online tables/namespaces
191 for (TableName table: QuotaCache.this.rsServices.getOnlineTables()) {
192 if (table.isSystemTable()) continue;
193 if (!QuotaCache.this.tableQuotaCache.contains(table)) {
194 QuotaCache.this.tableQuotaCache.putIfAbsent(table, new QuotaState());
196 String ns = table.getNamespaceAsString();
197 if (!QuotaCache.this.namespaceQuotaCache.contains(ns)) {
198 QuotaCache.this.namespaceQuotaCache.putIfAbsent(ns, new QuotaState());
202 fetchNamespaceQuotaState();
203 fetchTableQuotaState();
204 fetchUserQuotaState();
205 lastUpdate = EnvironmentEdgeManager.currentTime();
208 private void fetchNamespaceQuotaState() {
209 fetch("namespace", QuotaCache.this.namespaceQuotaCache, new Fetcher<String, QuotaState>() {
210 @Override
211 public Get makeGet(final Map.Entry<String, QuotaState> entry) {
212 return QuotaUtil.makeGetForNamespaceQuotas(entry.getKey());
215 @Override
216 public Map<String, QuotaState> fetchEntries(final List<Get> gets)
217 throws IOException {
218 return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), gets);
223 private void fetchTableQuotaState() {
224 fetch("table", QuotaCache.this.tableQuotaCache, new Fetcher<TableName, QuotaState>() {
225 @Override
226 public Get makeGet(final Map.Entry<TableName, QuotaState> entry) {
227 return QuotaUtil.makeGetForTableQuotas(entry.getKey());
230 @Override
231 public Map<TableName, QuotaState> fetchEntries(final List<Get> gets)
232 throws IOException {
233 return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), gets);
238 private void fetchUserQuotaState() {
239 final Set<String> namespaces = QuotaCache.this.namespaceQuotaCache.keySet();
240 final Set<TableName> tables = QuotaCache.this.tableQuotaCache.keySet();
241 fetch("user", QuotaCache.this.userQuotaCache, new Fetcher<String, UserQuotaState>() {
242 @Override
243 public Get makeGet(final Map.Entry<String, UserQuotaState> entry) {
244 return QuotaUtil.makeGetForUserQuotas(entry.getKey(), tables, namespaces);
247 @Override
248 public Map<String, UserQuotaState> fetchEntries(final List<Get> gets)
249 throws IOException {
250 return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), gets);
255 private <K, V extends QuotaState> void fetch(final String type,
256 final ConcurrentHashMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
257 long now = EnvironmentEdgeManager.currentTime();
258 long refreshPeriod = getPeriod();
259 long evictPeriod = refreshPeriod * EVICT_PERIOD_FACTOR;
261 // Find the quota entries to update
262 List<Get> gets = new ArrayList<>();
263 List<K> toRemove = new ArrayList<>();
264 for (Map.Entry<K, V> entry: quotasMap.entrySet()) {
265 long lastUpdate = entry.getValue().getLastUpdate();
266 long lastQuery = entry.getValue().getLastQuery();
267 if (lastQuery > 0 && (now - lastQuery) >= evictPeriod) {
268 toRemove.add(entry.getKey());
269 } else if (TEST_FORCE_REFRESH || (now - lastUpdate) >= refreshPeriod) {
270 gets.add(fetcher.makeGet(entry));
274 for (final K key: toRemove) {
275 if (LOG.isTraceEnabled()) {
276 LOG.trace("evict " + type + " key=" + key);
278 quotasMap.remove(key);
281 // fetch and update the quota entries
282 if (!gets.isEmpty()) {
283 try {
284 for (Map.Entry<K, V> entry: fetcher.fetchEntries(gets).entrySet()) {
285 V quotaInfo = quotasMap.putIfAbsent(entry.getKey(), entry.getValue());
286 if (quotaInfo != null) {
287 quotaInfo.update(entry.getValue());
290 if (LOG.isTraceEnabled()) {
291 LOG.trace("refresh " + type + " key=" + entry.getKey() + " quotas=" + quotaInfo);
294 } catch (IOException e) {
295 LOG.warn("Unable to read " + type + " from quota table", e);
301 static interface Fetcher<Key, Value> {
302 Get makeGet(Map.Entry<Key, Value> entry);
303 Map<Key, Value> fetchEntries(List<Get> gets) throws IOException;