2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.quotas
;
21 import java
.io
.IOException
;
22 import java
.util
.Collections
;
23 import java
.util
.HashMap
;
24 import java
.util
.HashSet
;
25 import java
.util
.Iterator
;
27 import java
.util
.Map
.Entry
;
28 import java
.util
.Optional
;
30 import java
.util
.concurrent
.ConcurrentHashMap
;
31 import org
.apache
.commons
.lang3
.builder
.HashCodeBuilder
;
32 import org
.apache
.hadoop
.conf
.Configuration
;
33 import org
.apache
.hadoop
.fs
.FileSystem
;
34 import org
.apache
.hadoop
.hbase
.DoNotRetryIOException
;
35 import org
.apache
.hadoop
.hbase
.NamespaceDescriptor
;
36 import org
.apache
.hadoop
.hbase
.RegionStateListener
;
37 import org
.apache
.hadoop
.hbase
.TableName
;
38 import org
.apache
.hadoop
.hbase
.client
.Connection
;
39 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
40 import org
.apache
.hadoop
.hbase
.master
.MasterServices
;
41 import org
.apache
.hadoop
.hbase
.master
.procedure
.ProcedurePrepareLatch
;
42 import org
.apache
.hadoop
.hbase
.master
.procedure
.SwitchRpcThrottleProcedure
;
43 import org
.apache
.hadoop
.hbase
.namespace
.NamespaceAuditor
;
44 import org
.apache
.hadoop
.hbase
.quotas
.SpaceQuotaSnapshot
.SpaceQuotaStatus
;
45 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
46 import org
.apache
.yetus
.audience
.InterfaceAudience
;
47 import org
.apache
.yetus
.audience
.InterfaceStability
;
48 import org
.slf4j
.Logger
;
49 import org
.slf4j
.LoggerFactory
;
51 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.collect
.HashMultimap
;
52 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.collect
.Maps
;
53 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.TextFormat
;
55 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ProtobufUtil
;
56 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.MasterProtos
.IsRpcThrottleEnabledRequest
;
57 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.MasterProtos
.IsRpcThrottleEnabledResponse
;
58 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.MasterProtos
.SetQuotaRequest
;
59 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.MasterProtos
.SetQuotaResponse
;
60 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.MasterProtos
.SwitchExceedThrottleQuotaRequest
;
61 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.MasterProtos
.SwitchExceedThrottleQuotaResponse
;
62 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.MasterProtos
.SwitchRpcThrottleRequest
;
63 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.MasterProtos
.SwitchRpcThrottleResponse
;
64 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.FileArchiveNotificationRequest
;
65 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.FileArchiveNotificationRequest
.FileWithSize
;
68 * Master Quota Manager.
69 * It is responsible for initialize the quota table on the first-run and
70 * provide the admin operations to interact with the quota table.
72 * TODO: FUTURE: The master will be responsible to notify each RS of quota changes
73 * and it will do the "quota aggregation" when the QuotaScope is CLUSTER.
75 @InterfaceAudience.Private
76 @InterfaceStability.Evolving
77 public class MasterQuotaManager
implements RegionStateListener
{
78 private static final Logger LOG
= LoggerFactory
.getLogger(MasterQuotaManager
.class);
79 private static final Map
<RegionInfo
, Long
> EMPTY_MAP
= Collections
.unmodifiableMap(
82 private final MasterServices masterServices
;
83 private NamedLock
<String
> namespaceLocks
;
84 private NamedLock
<TableName
> tableLocks
;
85 private NamedLock
<String
> userLocks
;
86 private NamedLock
<String
> regionServerLocks
;
87 private boolean initialized
= false;
88 private NamespaceAuditor namespaceQuotaManager
;
89 private ConcurrentHashMap
<RegionInfo
, SizeSnapshotWithTimestamp
> regionSizes
;
90 // Storage for quota rpc throttle
91 private RpcThrottleStorage rpcThrottleStorage
;
93 public MasterQuotaManager(final MasterServices masterServices
) {
94 this.masterServices
= masterServices
;
97 public void start() throws IOException
{
98 // If the user doesn't want the quota support skip all the initializations.
99 if (!QuotaUtil
.isQuotaEnabled(masterServices
.getConfiguration())) {
100 LOG
.info("Quota support disabled");
104 // Create the quota table if missing
105 if (!masterServices
.getTableDescriptors().exists(QuotaUtil
.QUOTA_TABLE_NAME
)) {
106 LOG
.info("Quota table not found. Creating...");
110 LOG
.info("Initializing quota support");
111 namespaceLocks
= new NamedLock
<>();
112 tableLocks
= new NamedLock
<>();
113 userLocks
= new NamedLock
<>();
114 regionServerLocks
= new NamedLock
<>();
115 regionSizes
= new ConcurrentHashMap
<>();
117 namespaceQuotaManager
= new NamespaceAuditor(masterServices
);
118 namespaceQuotaManager
.start();
122 new RpcThrottleStorage(masterServices
.getZooKeeper(), masterServices
.getConfiguration());
128 public boolean isQuotaInitialized() {
129 return initialized
&& namespaceQuotaManager
.isInitialized();
132 /* ==========================================================================
133 * Admin operations to manage the quota table
135 public SetQuotaResponse
setQuota(final SetQuotaRequest req
)
136 throws IOException
, InterruptedException
{
139 if (req
.hasUserName()) {
140 userLocks
.lock(req
.getUserName());
142 if (req
.hasTableName()) {
143 setUserQuota(req
.getUserName(), ProtobufUtil
.toTableName(req
.getTableName()), req
);
144 } else if (req
.hasNamespace()) {
145 setUserQuota(req
.getUserName(), req
.getNamespace(), req
);
147 setUserQuota(req
.getUserName(), req
);
150 userLocks
.unlock(req
.getUserName());
152 } else if (req
.hasTableName()) {
153 TableName table
= ProtobufUtil
.toTableName(req
.getTableName());
154 tableLocks
.lock(table
);
156 setTableQuota(table
, req
);
158 tableLocks
.unlock(table
);
160 } else if (req
.hasNamespace()) {
161 namespaceLocks
.lock(req
.getNamespace());
163 setNamespaceQuota(req
.getNamespace(), req
);
165 namespaceLocks
.unlock(req
.getNamespace());
167 } else if (req
.hasRegionServer()) {
168 regionServerLocks
.lock(req
.getRegionServer());
170 setRegionServerQuota(req
.getRegionServer(), req
);
172 regionServerLocks
.unlock(req
.getRegionServer());
175 throw new DoNotRetryIOException(new UnsupportedOperationException(
176 "a user, a table, a namespace or region server must be specified"));
178 return SetQuotaResponse
.newBuilder().build();
181 public void setUserQuota(final String userName
, final SetQuotaRequest req
)
182 throws IOException
, InterruptedException
{
183 setQuota(req
, new SetQuotaOperations() {
185 public GlobalQuotaSettingsImpl
fetch() throws IOException
{
186 return new GlobalQuotaSettingsImpl(req
.getUserName(), null, null, null,
187 QuotaUtil
.getUserQuota(masterServices
.getConnection(), userName
));
190 public void update(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
191 QuotaUtil
.addUserQuota(masterServices
.getConnection(), userName
, quotaPojo
.toQuotas());
194 public void delete() throws IOException
{
195 QuotaUtil
.deleteUserQuota(masterServices
.getConnection(), userName
);
198 public void preApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
199 masterServices
.getMasterCoprocessorHost().preSetUserQuota(userName
, quotaPojo
);
202 public void postApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
203 masterServices
.getMasterCoprocessorHost().postSetUserQuota(userName
, quotaPojo
);
208 public void setUserQuota(final String userName
, final TableName table
,
209 final SetQuotaRequest req
) throws IOException
, InterruptedException
{
210 setQuota(req
, new SetQuotaOperations() {
212 public GlobalQuotaSettingsImpl
fetch() throws IOException
{
213 return new GlobalQuotaSettingsImpl(userName
, table
, null, null,
214 QuotaUtil
.getUserQuota(masterServices
.getConnection(), userName
, table
));
217 public void update(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
218 QuotaUtil
.addUserQuota(masterServices
.getConnection(), userName
, table
,
219 quotaPojo
.toQuotas());
222 public void delete() throws IOException
{
223 QuotaUtil
.deleteUserQuota(masterServices
.getConnection(), userName
, table
);
226 public void preApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
227 masterServices
.getMasterCoprocessorHost().preSetUserQuota(userName
, table
, quotaPojo
);
230 public void postApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
231 masterServices
.getMasterCoprocessorHost().postSetUserQuota(userName
, table
, quotaPojo
);
236 public void setUserQuota(final String userName
, final String namespace
,
237 final SetQuotaRequest req
) throws IOException
, InterruptedException
{
238 setQuota(req
, new SetQuotaOperations() {
240 public GlobalQuotaSettingsImpl
fetch() throws IOException
{
241 return new GlobalQuotaSettingsImpl(userName
, null, namespace
, null,
242 QuotaUtil
.getUserQuota(masterServices
.getConnection(), userName
, namespace
));
245 public void update(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
246 QuotaUtil
.addUserQuota(masterServices
.getConnection(), userName
, namespace
,
247 quotaPojo
.toQuotas());
250 public void delete() throws IOException
{
251 QuotaUtil
.deleteUserQuota(masterServices
.getConnection(), userName
, namespace
);
254 public void preApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
255 masterServices
.getMasterCoprocessorHost().preSetUserQuota(
256 userName
, namespace
, quotaPojo
);
259 public void postApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
260 masterServices
.getMasterCoprocessorHost().postSetUserQuota(
261 userName
, namespace
, quotaPojo
);
266 public void setTableQuota(final TableName table
, final SetQuotaRequest req
)
267 throws IOException
, InterruptedException
{
268 setQuota(req
, new SetQuotaOperations() {
270 public GlobalQuotaSettingsImpl
fetch() throws IOException
{
271 return new GlobalQuotaSettingsImpl(null, table
, null, null,
272 QuotaUtil
.getTableQuota(masterServices
.getConnection(), table
));
275 public void update(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
276 QuotaUtil
.addTableQuota(masterServices
.getConnection(), table
, quotaPojo
.toQuotas());
279 public void delete() throws IOException
{
280 SpaceQuotaSnapshot currSnapshotOfTable
=
281 QuotaTableUtil
.getCurrentSnapshotFromQuotaTable(masterServices
.getConnection(), table
);
282 QuotaUtil
.deleteTableQuota(masterServices
.getConnection(), table
);
283 if (currSnapshotOfTable
!= null) {
284 SpaceQuotaStatus quotaStatus
= currSnapshotOfTable
.getQuotaStatus();
285 if (SpaceViolationPolicy
.DISABLE
== quotaStatus
.getPolicy().orElse(null)
286 && quotaStatus
.isInViolation()) {
287 QuotaUtil
.enableTableIfNotEnabled(masterServices
.getConnection(), table
);
292 public void preApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
293 masterServices
.getMasterCoprocessorHost().preSetTableQuota(table
, quotaPojo
);
296 public void postApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
297 masterServices
.getMasterCoprocessorHost().postSetTableQuota(table
, quotaPojo
);
302 public void setNamespaceQuota(final String namespace
, final SetQuotaRequest req
)
303 throws IOException
, InterruptedException
{
304 setQuota(req
, new SetQuotaOperations() {
306 public GlobalQuotaSettingsImpl
fetch() throws IOException
{
307 return new GlobalQuotaSettingsImpl(null, null, namespace
, null,
308 QuotaUtil
.getNamespaceQuota(masterServices
.getConnection(), namespace
));
311 public void update(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
312 QuotaUtil
.addNamespaceQuota(masterServices
.getConnection(), namespace
,
313 quotaPojo
.toQuotas());
316 public void delete() throws IOException
{
317 QuotaUtil
.deleteNamespaceQuota(masterServices
.getConnection(), namespace
);
320 public void preApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
321 masterServices
.getMasterCoprocessorHost().preSetNamespaceQuota(namespace
, quotaPojo
);
324 public void postApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
325 masterServices
.getMasterCoprocessorHost().postSetNamespaceQuota(namespace
, quotaPojo
);
330 public void setRegionServerQuota(final String regionServer
, final SetQuotaRequest req
)
331 throws IOException
, InterruptedException
{
332 setQuota(req
, new SetQuotaOperations() {
334 public GlobalQuotaSettingsImpl
fetch() throws IOException
{
335 return new GlobalQuotaSettingsImpl(null, null, null, regionServer
,
336 QuotaUtil
.getRegionServerQuota(masterServices
.getConnection(), regionServer
));
340 public void update(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
341 QuotaUtil
.addRegionServerQuota(masterServices
.getConnection(), regionServer
,
342 quotaPojo
.toQuotas());
346 public void delete() throws IOException
{
347 QuotaUtil
.deleteRegionServerQuota(masterServices
.getConnection(), regionServer
);
351 public void preApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
352 masterServices
.getMasterCoprocessorHost().preSetRegionServerQuota(regionServer
, quotaPojo
);
356 public void postApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
357 masterServices
.getMasterCoprocessorHost().postSetRegionServerQuota(regionServer
, quotaPojo
);
362 public void setNamespaceQuota(NamespaceDescriptor desc
) throws IOException
{
364 this.namespaceQuotaManager
.addNamespace(desc
);
368 public void removeNamespaceQuota(String namespace
) throws IOException
{
370 this.namespaceQuotaManager
.deleteNamespace(namespace
);
374 public SwitchRpcThrottleResponse
switchRpcThrottle(SwitchRpcThrottleRequest request
)
376 boolean rpcThrottle
= request
.getRpcThrottleEnabled();
378 masterServices
.getMasterCoprocessorHost().preSwitchRpcThrottle(rpcThrottle
);
379 boolean oldRpcThrottle
= rpcThrottleStorage
.isRpcThrottleEnabled();
380 if (rpcThrottle
!= oldRpcThrottle
) {
381 LOG
.info("{} switch rpc throttle from {} to {}", masterServices
.getClientIdAuditPrefix(),
382 oldRpcThrottle
, rpcThrottle
);
383 ProcedurePrepareLatch latch
= ProcedurePrepareLatch
.createBlockingLatch();
384 SwitchRpcThrottleProcedure procedure
= new SwitchRpcThrottleProcedure(rpcThrottleStorage
,
385 rpcThrottle
, masterServices
.getServerName(), latch
);
386 masterServices
.getMasterProcedureExecutor().submitProcedure(procedure
);
389 LOG
.warn("Skip switch rpc throttle to {} because it's the same with old value",
392 SwitchRpcThrottleResponse response
= SwitchRpcThrottleResponse
.newBuilder()
393 .setPreviousRpcThrottleEnabled(oldRpcThrottle
).build();
394 masterServices
.getMasterCoprocessorHost().postSwitchRpcThrottle(oldRpcThrottle
, rpcThrottle
);
397 LOG
.warn("Skip switch rpc throttle to {} because rpc quota is disabled", rpcThrottle
);
398 return SwitchRpcThrottleResponse
.newBuilder().setPreviousRpcThrottleEnabled(false).build();
402 public IsRpcThrottleEnabledResponse
isRpcThrottleEnabled(IsRpcThrottleEnabledRequest request
)
405 masterServices
.getMasterCoprocessorHost().preIsRpcThrottleEnabled();
406 boolean enabled
= isRpcThrottleEnabled();
407 IsRpcThrottleEnabledResponse response
=
408 IsRpcThrottleEnabledResponse
.newBuilder().setRpcThrottleEnabled(enabled
).build();
409 masterServices
.getMasterCoprocessorHost().postIsRpcThrottleEnabled(enabled
);
412 LOG
.warn("Skip get rpc throttle because rpc quota is disabled");
413 return IsRpcThrottleEnabledResponse
.newBuilder().setRpcThrottleEnabled(false).build();
417 public boolean isRpcThrottleEnabled() throws IOException
{
418 return initialized ? rpcThrottleStorage
.isRpcThrottleEnabled() : false;
421 public SwitchExceedThrottleQuotaResponse
422 switchExceedThrottleQuota(SwitchExceedThrottleQuotaRequest request
) throws IOException
{
423 boolean enabled
= request
.getExceedThrottleQuotaEnabled();
425 masterServices
.getMasterCoprocessorHost().preSwitchExceedThrottleQuota(enabled
);
426 boolean previousEnabled
=
427 QuotaUtil
.isExceedThrottleQuotaEnabled(masterServices
.getConnection());
428 if (previousEnabled
== enabled
) {
429 LOG
.warn("Skip switch exceed throttle quota to {} because it's the same with old value",
432 QuotaUtil
.switchExceedThrottleQuota(masterServices
.getConnection(), enabled
);
433 LOG
.info("{} switch exceed throttle quota from {} to {}",
434 masterServices
.getClientIdAuditPrefix(), previousEnabled
, enabled
);
436 SwitchExceedThrottleQuotaResponse response
= SwitchExceedThrottleQuotaResponse
.newBuilder()
437 .setPreviousExceedThrottleQuotaEnabled(previousEnabled
).build();
438 masterServices
.getMasterCoprocessorHost().postSwitchExceedThrottleQuota(previousEnabled
,
442 LOG
.warn("Skip switch exceed throttle quota to {} because quota is disabled", enabled
);
443 return SwitchExceedThrottleQuotaResponse
.newBuilder()
444 .setPreviousExceedThrottleQuotaEnabled(false).build();
448 public boolean isExceedThrottleQuotaEnabled() throws IOException
{
449 return initialized ? QuotaUtil
.isExceedThrottleQuotaEnabled(masterServices
.getConnection())
453 private void setQuota(final SetQuotaRequest req
, final SetQuotaOperations quotaOps
)
454 throws IOException
, InterruptedException
{
455 if (req
.hasRemoveAll() && req
.getRemoveAll() == true) {
456 quotaOps
.preApply(null);
458 quotaOps
.postApply(null);
462 // Apply quota changes
463 GlobalQuotaSettingsImpl currentQuota
= quotaOps
.fetch();
464 if (LOG
.isTraceEnabled()) {
466 "Current quota for request(" + TextFormat
.shortDebugString(req
)
467 + "): " + currentQuota
);
469 // Call the appropriate "pre" CP hook with the current quota value (may be null)
470 quotaOps
.preApply(currentQuota
);
471 // Translate the protobuf request back into a POJO
472 QuotaSettings newQuota
= QuotaSettings
.buildFromProto(req
);
473 if (LOG
.isTraceEnabled()) {
474 LOG
.trace("Deserialized quota from request: " + newQuota
);
477 // Merge the current quota settings with the new quota settings the user provided.
479 // NB: while SetQuotaRequest technically allows for multi types of quotas to be set in one
480 // message, the Java API (in Admin/AsyncAdmin) does not. Assume there is only one type.
481 GlobalQuotaSettingsImpl mergedQuota
= currentQuota
.merge(newQuota
);
482 if (LOG
.isTraceEnabled()) {
483 LOG
.trace("Computed merged quota from current quota and user request: " + mergedQuota
);
486 // Submit new changes
487 if (mergedQuota
== null) {
490 quotaOps
.update(mergedQuota
);
492 // Advertise the final result via the "post" CP hook
493 quotaOps
.postApply(mergedQuota
);
496 public void checkNamespaceTableAndRegionQuota(TableName tName
, int regions
) throws IOException
{
498 namespaceQuotaManager
.checkQuotaToCreateTable(tName
, regions
);
502 public void checkAndUpdateNamespaceRegionQuota(TableName tName
, int regions
) throws IOException
{
504 namespaceQuotaManager
.checkQuotaToUpdateRegion(tName
, regions
);
509 * @return cached region count, or -1 if quota manager is disabled or table status not found
511 public int getRegionCountOfTable(TableName tName
) throws IOException
{
513 return namespaceQuotaManager
.getRegionCountOfTable(tName
);
519 public void onRegionMerged(RegionInfo mergedRegion
) throws IOException
{
521 namespaceQuotaManager
.updateQuotaForRegionMerge(mergedRegion
);
526 public void onRegionSplit(RegionInfo hri
) throws IOException
{
528 namespaceQuotaManager
.checkQuotaToSplitRegion(hri
);
533 * Remove table from namespace quota.
535 * @param tName - The table name to update quota usage.
536 * @throws IOException Signals that an I/O exception has occurred.
538 public void removeTableFromNamespaceQuota(TableName tName
) throws IOException
{
540 namespaceQuotaManager
.removeFromNamespaceUsage(tName
);
544 public NamespaceAuditor
getNamespaceQuotaManager() {
545 return this.namespaceQuotaManager
;
549 * Encapsulates CRUD quota operations for some subject.
551 private static interface SetQuotaOperations
{
553 * Fetches the current quota settings for the subject.
555 GlobalQuotaSettingsImpl
fetch() throws IOException
;
557 * Deletes the quota for the subject.
559 void delete() throws IOException
;
561 * Persist the given quota for the subject.
563 void update(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
;
565 * Performs some action before {@link #update(GlobalQuotaSettingsImpl)} with the current
566 * quota for the subject.
568 void preApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
;
570 * Performs some action after {@link #update(GlobalQuotaSettingsImpl)} with the resulting
571 * quota from the request action for the subject.
573 void postApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
;
576 /* ==========================================================================
580 private void checkQuotaSupport() throws IOException
{
581 if (!QuotaUtil
.isQuotaEnabled(masterServices
.getConfiguration())) {
582 throw new DoNotRetryIOException(
583 new UnsupportedOperationException("quota support disabled"));
586 long maxWaitTime
= masterServices
.getConfiguration().getLong(
587 "hbase.master.wait.for.quota.manager.init", 30000); // default is 30 seconds.
588 long startTime
= EnvironmentEdgeManager
.currentTime();
592 } catch (InterruptedException e
) {
593 LOG
.warn("Interrupted while waiting for Quota Manager to be initialized.");
596 } while (!initialized
&& (EnvironmentEdgeManager
.currentTime() - startTime
) < maxWaitTime
);
598 throw new IOException("Quota manager is uninitialized, please retry later.");
603 private void createQuotaTable() throws IOException
{
604 masterServices
.createSystemTable(QuotaUtil
.QUOTA_TABLE_DESC
);
607 private static class NamedLock
<T
> {
608 private final HashSet
<T
> locks
= new HashSet
<>();
610 public void lock(final T name
) throws InterruptedException
{
611 synchronized (locks
) {
612 while (locks
.contains(name
)) {
619 public void unlock(final T name
) {
620 synchronized (locks
) {
628 public void onRegionSplitReverted(RegionInfo hri
) throws IOException
{
630 this.namespaceQuotaManager
.removeRegionFromNamespaceUsage(hri
);
635 * Holds the size of a region at the given time, millis since the epoch.
637 private static class SizeSnapshotWithTimestamp
{
638 private final long size
;
639 private final long time
;
641 public SizeSnapshotWithTimestamp(long size
, long time
) {
646 public long getSize() {
650 public long getTime() {
655 public boolean equals(Object o
) {
656 if (o
instanceof SizeSnapshotWithTimestamp
) {
657 SizeSnapshotWithTimestamp other
= (SizeSnapshotWithTimestamp
) o
;
658 return size
== other
.size
&& time
== other
.time
;
664 public int hashCode() {
665 HashCodeBuilder hcb
= new HashCodeBuilder();
666 return hcb
.append(size
).append(time
).toHashCode();
670 public String
toString() {
671 StringBuilder sb
= new StringBuilder(32);
672 sb
.append("SizeSnapshotWithTimestamp={size=").append(size
).append("B, ");
673 sb
.append("time=").append(time
).append("}");
674 return sb
.toString();
678 void initializeRegionSizes() {
679 assert regionSizes
== null;
680 this.regionSizes
= new ConcurrentHashMap
<>();
683 public void addRegionSize(RegionInfo hri
, long size
, long time
) {
684 if (regionSizes
== null) {
687 regionSizes
.put(hri
, new SizeSnapshotWithTimestamp(size
, time
));
690 public Map
<RegionInfo
, Long
> snapshotRegionSizes() {
691 if (regionSizes
== null) {
695 Map
<RegionInfo
, Long
> copy
= new HashMap
<>();
696 for (Entry
<RegionInfo
, SizeSnapshotWithTimestamp
> entry
: regionSizes
.entrySet()) {
697 copy
.put(entry
.getKey(), entry
.getValue().getSize());
702 int pruneEntriesOlderThan(long timeToPruneBefore
, QuotaObserverChore quotaObserverChore
) {
703 if (regionSizes
== null) {
706 int numEntriesRemoved
= 0;
707 Iterator
<Entry
<RegionInfo
, SizeSnapshotWithTimestamp
>> iterator
=
708 regionSizes
.entrySet().iterator();
709 while (iterator
.hasNext()) {
710 RegionInfo regionInfo
= iterator
.next().getKey();
711 long currentEntryTime
= regionSizes
.get(regionInfo
).getTime();
712 // do not prune the entries if table is in violation and
713 // violation policy is disable to avoid cycle of enable/disable.
714 // Please refer HBASE-22012 for more details.
715 // prune entries older than time.
716 if (currentEntryTime
< timeToPruneBefore
&& !isInViolationAndPolicyDisable(
717 regionInfo
.getTable(), quotaObserverChore
)) {
722 return numEntriesRemoved
;
726 * Method to check if a table is in violation and policy set on table is DISABLE.
728 * @param tableName tableName to check.
729 * @param quotaObserverChore QuotaObserverChore instance
730 * @return returns true if table is in violation and policy is disable else false.
732 private boolean isInViolationAndPolicyDisable(TableName tableName
,
733 QuotaObserverChore quotaObserverChore
) {
734 boolean isInViolationAtTable
= false;
735 boolean isInViolationAtNamespace
= false;
736 SpaceViolationPolicy tablePolicy
= null;
737 SpaceViolationPolicy namespacePolicy
= null;
738 // Get Current Snapshot for the given table
739 SpaceQuotaSnapshot tableQuotaSnapshot
= quotaObserverChore
.getTableQuotaSnapshot(tableName
);
740 SpaceQuotaSnapshot namespaceQuotaSnapshot
=
741 quotaObserverChore
.getNamespaceQuotaSnapshot(tableName
.getNamespaceAsString());
742 if (tableQuotaSnapshot
!= null) {
743 // check if table in violation
744 isInViolationAtTable
= tableQuotaSnapshot
.getQuotaStatus().isInViolation();
745 Optional
<SpaceViolationPolicy
> policy
= tableQuotaSnapshot
.getQuotaStatus().getPolicy();
746 if (policy
.isPresent()) {
747 tablePolicy
= policy
.get();
750 if (namespaceQuotaSnapshot
!= null) {
751 // check namespace in violation
752 isInViolationAtNamespace
= namespaceQuotaSnapshot
.getQuotaStatus().isInViolation();
753 Optional
<SpaceViolationPolicy
> policy
= namespaceQuotaSnapshot
.getQuotaStatus().getPolicy();
754 if (policy
.isPresent()) {
755 namespacePolicy
= policy
.get();
758 return (tablePolicy
== SpaceViolationPolicy
.DISABLE
&& isInViolationAtTable
) || (
759 namespacePolicy
== SpaceViolationPolicy
.DISABLE
&& isInViolationAtNamespace
);
762 public void processFileArchivals(FileArchiveNotificationRequest request
, Connection conn
,
763 Configuration conf
, FileSystem fs
) throws IOException
{
764 final HashMultimap
<TableName
,Entry
<String
,Long
>> archivedFilesByTable
= HashMultimap
.create();
765 // Group the archived files by table
766 for (FileWithSize fileWithSize
: request
.getArchivedFilesList()) {
767 TableName tn
= ProtobufUtil
.toTableName(fileWithSize
.getTableName());
768 archivedFilesByTable
.put(
769 tn
, Maps
.immutableEntry(fileWithSize
.getName(), fileWithSize
.getSize()));
771 if (LOG
.isTraceEnabled()) {
772 LOG
.trace("Grouped archived files by table: " + archivedFilesByTable
);
774 // Report each set of files to the appropriate object
775 for (TableName tn
: archivedFilesByTable
.keySet()) {
776 final Set
<Entry
<String
,Long
>> filesWithSize
= archivedFilesByTable
.get(tn
);
777 final FileArchiverNotifier notifier
= FileArchiverNotifierFactoryImpl
.getInstance().get(
779 notifier
.addArchivedFiles(filesWithSize
);
784 * Removes each region size entry where the RegionInfo references the provided TableName.
786 * @param tableName tableName.
788 public void removeRegionSizesForTable(TableName tableName
) {
789 regionSizes
.keySet().removeIf(regionInfo
-> regionInfo
.getTable().equals(tableName
));