2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.quotas
;
21 import java
.util
.Arrays
;
22 import java
.util
.List
;
24 import org
.apache
.hadoop
.conf
.Configuration
;
25 import org
.apache
.yetus
.audience
.InterfaceAudience
;
26 import org
.apache
.yetus
.audience
.InterfaceStability
;
27 import org
.slf4j
.Logger
;
28 import org
.slf4j
.LoggerFactory
;
29 import org
.apache
.hadoop
.hbase
.client
.Mutation
;
30 import org
.apache
.hadoop
.hbase
.client
.Result
;
32 @InterfaceAudience.Private
33 @InterfaceStability.Evolving
34 public class DefaultOperationQuota
implements OperationQuota
{
35 private static final Logger LOG
= LoggerFactory
.getLogger(DefaultOperationQuota
.class);
37 private final List
<QuotaLimiter
> limiters
;
38 private final long writeCapacityUnit
;
39 private final long readCapacityUnit
;
41 private long writeAvailable
= 0;
42 private long readAvailable
= 0;
43 private long writeConsumed
= 0;
44 private long readConsumed
= 0;
45 private long writeCapacityUnitConsumed
= 0;
46 private long readCapacityUnitConsumed
= 0;
47 private final long[] operationSize
;
49 public DefaultOperationQuota(final Configuration conf
, final QuotaLimiter
... limiters
) {
50 this(conf
, Arrays
.asList(limiters
));
54 * NOTE: The order matters. It should be something like [user, table, namespace, global]
56 public DefaultOperationQuota(final Configuration conf
, final List
<QuotaLimiter
> limiters
) {
57 this.writeCapacityUnit
=
58 conf
.getLong(QuotaUtil
.WRITE_CAPACITY_UNIT_CONF_KEY
, QuotaUtil
.DEFAULT_WRITE_CAPACITY_UNIT
);
59 this.readCapacityUnit
=
60 conf
.getLong(QuotaUtil
.READ_CAPACITY_UNIT_CONF_KEY
, QuotaUtil
.DEFAULT_READ_CAPACITY_UNIT
);
61 this.limiters
= limiters
;
62 int size
= OperationType
.values().length
;
63 operationSize
= new long[size
];
65 for (int i
= 0; i
< size
; ++i
) {
71 public void checkQuota(int numWrites
, int numReads
, int numScans
) throws RpcThrottlingException
{
72 writeConsumed
= estimateConsume(OperationType
.MUTATE
, numWrites
, 100);
73 readConsumed
= estimateConsume(OperationType
.GET
, numReads
, 100);
74 readConsumed
+= estimateConsume(OperationType
.SCAN
, numScans
, 1000);
76 writeCapacityUnitConsumed
= calculateWriteCapacityUnit(writeConsumed
);
77 readCapacityUnitConsumed
= calculateReadCapacityUnit(readConsumed
);
79 writeAvailable
= Long
.MAX_VALUE
;
80 readAvailable
= Long
.MAX_VALUE
;
81 for (final QuotaLimiter limiter
: limiters
) {
82 if (limiter
.isBypass()) continue;
84 limiter
.checkQuota(numWrites
, writeConsumed
, numReads
+ numScans
, readConsumed
,
85 writeCapacityUnitConsumed
, readCapacityUnitConsumed
);
86 readAvailable
= Math
.min(readAvailable
, limiter
.getReadAvailable());
87 writeAvailable
= Math
.min(writeAvailable
, limiter
.getWriteAvailable());
90 for (final QuotaLimiter limiter
: limiters
) {
91 limiter
.grabQuota(numWrites
, writeConsumed
, numReads
+ numScans
, readConsumed
,
92 writeCapacityUnitConsumed
, readCapacityUnitConsumed
);
98 // Adjust the quota consumed for the specified operation
99 long writeDiff
= operationSize
[OperationType
.MUTATE
.ordinal()] - writeConsumed
;
100 long readDiff
= operationSize
[OperationType
.GET
.ordinal()]
101 + operationSize
[OperationType
.SCAN
.ordinal()] - readConsumed
;
102 long writeCapacityUnitDiff
= calculateWriteCapacityUnitDiff(
103 operationSize
[OperationType
.MUTATE
.ordinal()], writeConsumed
);
104 long readCapacityUnitDiff
= calculateReadCapacityUnitDiff(
105 operationSize
[OperationType
.GET
.ordinal()] + operationSize
[OperationType
.SCAN
.ordinal()],
108 for (final QuotaLimiter limiter
: limiters
) {
109 if (writeDiff
!= 0) {
110 limiter
.consumeWrite(writeDiff
, writeCapacityUnitDiff
);
113 limiter
.consumeRead(readDiff
, readCapacityUnitDiff
);
119 public long getReadAvailable() {
120 return readAvailable
;
124 public long getWriteAvailable() {
125 return writeAvailable
;
129 public void addGetResult(final Result result
) {
130 operationSize
[OperationType
.GET
.ordinal()] += QuotaUtil
.calculateResultSize(result
);
134 public void addScanResult(final List
<Result
> results
) {
135 operationSize
[OperationType
.SCAN
.ordinal()] += QuotaUtil
.calculateResultSize(results
);
139 public void addMutation(final Mutation mutation
) {
140 operationSize
[OperationType
.MUTATE
.ordinal()] += QuotaUtil
.calculateMutationSize(mutation
);
143 private long estimateConsume(final OperationType type
, int numReqs
, long avgSize
) {
145 return avgSize
* numReqs
;
150 private long calculateWriteCapacityUnit(final long size
) {
151 return (long) Math
.ceil(size
* 1.0 / this.writeCapacityUnit
);
154 private long calculateReadCapacityUnit(final long size
) {
155 return (long) Math
.ceil(size
* 1.0 / this.readCapacityUnit
);
158 private long calculateWriteCapacityUnitDiff(final long actualSize
, final long estimateSize
) {
159 return calculateWriteCapacityUnit(actualSize
) - calculateWriteCapacityUnit(estimateSize
);
162 private long calculateReadCapacityUnitDiff(final long actualSize
, final long estimateSize
) {
163 return calculateReadCapacityUnit(actualSize
) - calculateReadCapacityUnit(estimateSize
);