2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to you under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package org
.apache
.hadoop
.hbase
.quotas
;
19 import java
.io
.IOException
;
20 import java
.util
.Collection
;
21 import java
.util
.HashMap
;
23 import java
.util
.Objects
;
24 import java
.util
.concurrent
.ConcurrentHashMap
;
25 import java
.util
.concurrent
.atomic
.AtomicReference
;
26 import java
.util
.Map
.Entry
;
28 import org
.apache
.hadoop
.hbase
.TableName
;
29 import org
.apache
.yetus
.audience
.InterfaceAudience
;
30 import org
.slf4j
.Logger
;
31 import org
.slf4j
.LoggerFactory
;
32 import org
.apache
.hadoop
.hbase
.client
.Connection
;
33 import org
.apache
.hadoop
.hbase
.quotas
.SpaceQuotaSnapshot
.SpaceQuotaStatus
;
34 import org
.apache
.hadoop
.hbase
.regionserver
.RegionServerServices
;
36 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.annotations
.VisibleForTesting
;
37 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.TextFormat
;
39 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ProtobufUtil
;
40 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.HBaseProtos
;
41 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
;
44 * A manager for filesystem space quotas in the RegionServer.
46 * This class is the centralized point for what a RegionServer knows about space quotas
47 * on tables. For each table, it tracks two different things: the {@link SpaceQuotaSnapshot}
48 * and a {@link SpaceViolationPolicyEnforcement} (which may be null when a quota is not
49 * being violated). Both of these are sensitive on when they were last updated. The
50 * {link SpaceQutoaViolationPolicyRefresherChore} periodically runs and updates
51 * the state on <code>this</code>.
53 @InterfaceAudience.Private
54 public class RegionServerSpaceQuotaManager
{
55 private static final Logger LOG
= LoggerFactory
.getLogger(RegionServerSpaceQuotaManager
.class);
57 private final RegionServerServices rsServices
;
59 private SpaceQuotaRefresherChore spaceQuotaRefresher
;
60 private AtomicReference
<Map
<TableName
, SpaceQuotaSnapshot
>> currentQuotaSnapshots
;
61 private boolean started
= false;
62 private final ConcurrentHashMap
<TableName
,SpaceViolationPolicyEnforcement
> enforcedPolicies
;
63 private SpaceViolationPolicyEnforcementFactory factory
;
64 private RegionSizeStore regionSizeStore
;
65 private RegionSizeReportingChore regionSizeReporter
;
67 public RegionServerSpaceQuotaManager(RegionServerServices rsServices
) {
68 this(rsServices
, SpaceViolationPolicyEnforcementFactory
.getInstance());
72 RegionServerSpaceQuotaManager(
73 RegionServerServices rsServices
, SpaceViolationPolicyEnforcementFactory factory
) {
74 this.rsServices
= Objects
.requireNonNull(rsServices
);
75 this.factory
= factory
;
76 this.enforcedPolicies
= new ConcurrentHashMap
<>();
77 this.currentQuotaSnapshots
= new AtomicReference
<>(new HashMap
<>());
78 // Initialize the size store to not track anything -- create the real one if we're start()'ed
79 this.regionSizeStore
= NoOpRegionSizeStore
.getInstance();
82 public synchronized void start() throws IOException
{
83 if (!QuotaUtil
.isQuotaEnabled(rsServices
.getConfiguration())) {
84 LOG
.info("Quota support disabled, not starting space quota manager.");
89 LOG
.warn("RegionServerSpaceQuotaManager has already been started!");
93 this.spaceQuotaRefresher
= new SpaceQuotaRefresherChore(this, rsServices
.getClusterConnection());
94 rsServices
.getChoreService().scheduleChore(spaceQuotaRefresher
);
95 this.regionSizeReporter
= new RegionSizeReportingChore(rsServices
);
96 rsServices
.getChoreService().scheduleChore(regionSizeReporter
);
97 // Instantiate the real RegionSizeStore
98 this.regionSizeStore
= RegionSizeStoreFactory
.getInstance().createStore();
102 public synchronized void stop() {
103 if (spaceQuotaRefresher
!= null) {
104 spaceQuotaRefresher
.cancel();
105 spaceQuotaRefresher
= null;
107 if (regionSizeReporter
!= null) {
108 regionSizeReporter
.cancel();
109 regionSizeReporter
= null;
115 * @return if the {@code Chore} has been started.
117 public boolean isStarted() {
122 * Copies the last {@link SpaceQuotaSnapshot}s that were recorded. The current view
123 * of what the RegionServer thinks the table's utilization is.
125 public Map
<TableName
,SpaceQuotaSnapshot
> copyQuotaSnapshots() {
126 return new HashMap
<>(currentQuotaSnapshots
.get());
130 * Updates the current {@link SpaceQuotaSnapshot}s for the RegionServer.
132 * @param newSnapshots The space quota snapshots.
134 public void updateQuotaSnapshot(Map
<TableName
,SpaceQuotaSnapshot
> newSnapshots
) {
135 currentQuotaSnapshots
.set(Objects
.requireNonNull(newSnapshots
));
139 * Creates an object well-suited for the RegionServer to use in verifying active policies.
141 public ActivePolicyEnforcement
getActiveEnforcements() {
142 return new ActivePolicyEnforcement(copyActiveEnforcements(), copyQuotaSnapshots(), rsServices
);
146 * Converts a map of table to {@link SpaceViolationPolicyEnforcement}s into
147 * {@link SpaceViolationPolicy}s.
149 public Map
<TableName
, SpaceQuotaSnapshot
> getActivePoliciesAsMap() {
150 final Map
<TableName
, SpaceViolationPolicyEnforcement
> enforcements
=
151 copyActiveEnforcements();
152 final Map
<TableName
, SpaceQuotaSnapshot
> policies
= new HashMap
<>();
153 for (Entry
<TableName
, SpaceViolationPolicyEnforcement
> entry
: enforcements
.entrySet()) {
154 final SpaceQuotaSnapshot snapshot
= entry
.getValue().getQuotaSnapshot();
155 if (snapshot
!= null) {
156 policies
.put(entry
.getKey(), snapshot
);
163 * Enforces the given violationPolicy on the given table in this RegionServer.
165 public void enforceViolationPolicy(TableName tableName
, SpaceQuotaSnapshot snapshot
) {
166 SpaceQuotaStatus status
= snapshot
.getQuotaStatus();
167 if (!status
.isInViolation()) {
168 throw new IllegalStateException(
169 tableName
+ " is not in violation. Violation policy should not be enabled.");
171 if (LOG
.isTraceEnabled()) {
173 "Enabling violation policy enforcement on " + tableName
174 + " with policy " + status
.getPolicy());
176 // Construct this outside of the lock
177 final SpaceViolationPolicyEnforcement enforcement
= getFactory().create(
178 getRegionServerServices(), tableName
, snapshot
);
179 // "Enables" the policy
180 // HBASE-XXXX: Should this synchronize on the actual table name instead of the map? That would
181 // allow policy enable/disable on different tables to happen concurrently. As written now, only
182 // one table will be allowed to transition at a time. This is probably OK, but not sure if
183 // it would become a bottleneck at large clusters/number of tables.
184 synchronized (enforcedPolicies
) {
186 enforcement
.enable();
187 } catch (IOException e
) {
188 LOG
.error("Failed to enable space violation policy for " + tableName
189 + ". This table will not enter violation.", e
);
192 enforcedPolicies
.put(tableName
, enforcement
);
197 * Disables enforcement on any violation policy on the given <code>tableName</code>.
199 public void disableViolationPolicyEnforcement(TableName tableName
) {
200 if (LOG
.isTraceEnabled()) {
201 LOG
.trace("Disabling violation policy enforcement on " + tableName
);
203 // "Disables" the policy
204 synchronized (enforcedPolicies
) {
205 SpaceViolationPolicyEnforcement enforcement
= enforcedPolicies
.remove(tableName
);
206 if (enforcement
!= null) {
208 enforcement
.disable();
209 } catch (IOException e
) {
210 LOG
.error("Failed to disable space violation policy for " + tableName
211 + ". This table will remain in violation.", e
);
212 enforcedPolicies
.put(tableName
, enforcement
);
219 * Returns whether or not compactions should be disabled for the given <code>tableName</code> per
220 * a space quota violation policy. A convenience method.
222 * @param tableName The table to check
223 * @return True if compactions should be disabled for the table, false otherwise.
225 public boolean areCompactionsDisabled(TableName tableName
) {
226 SpaceViolationPolicyEnforcement enforcement
= this.enforcedPolicies
.get(Objects
.requireNonNull(tableName
));
227 if (enforcement
!= null) {
228 return enforcement
.areCompactionsDisabled();
234 * Returns the {@link RegionSizeStore} tracking filesystem utilization by each region.
236 * @return A {@link RegionSizeStore} implementation.
238 public RegionSizeStore
getRegionSizeStore() {
239 return regionSizeStore
;
243 * Builds the protobuf message to inform the Master of files being archived.
245 * @param tn The table the files previously belonged to.
246 * @param archivedFiles The files and their size in bytes that were archived.
247 * @return The protobuf representation
249 public RegionServerStatusProtos
.FileArchiveNotificationRequest
buildFileArchiveRequest(
250 TableName tn
, Collection
<Entry
<String
,Long
>> archivedFiles
) {
251 RegionServerStatusProtos
.FileArchiveNotificationRequest
.Builder builder
=
252 RegionServerStatusProtos
.FileArchiveNotificationRequest
.newBuilder();
253 HBaseProtos
.TableName protoTn
= ProtobufUtil
.toProtoTableName(tn
);
254 for (Entry
<String
,Long
> archivedFile
: archivedFiles
) {
255 RegionServerStatusProtos
.FileArchiveNotificationRequest
.FileWithSize fws
=
256 RegionServerStatusProtos
.FileArchiveNotificationRequest
.FileWithSize
.newBuilder()
257 .setName(archivedFile
.getKey())
258 .setSize(archivedFile
.getValue())
259 .setTableName(protoTn
)
261 builder
.addArchivedFiles(fws
);
263 final RegionServerStatusProtos
.FileArchiveNotificationRequest request
= builder
.build();
264 if (LOG
.isTraceEnabled()) {
265 LOG
.trace("Reporting file archival to Master: " + TextFormat
.shortDebugString(request
));
271 * Returns the collection of tables which have quota violation policies enforced on
274 Map
<TableName
,SpaceViolationPolicyEnforcement
> copyActiveEnforcements() {
275 // Allows reads to happen concurrently (or while the map is being updated)
276 return new HashMap
<>(this.enforcedPolicies
);
279 RegionServerServices
getRegionServerServices() {
283 Connection
getConnection() {
284 return rsServices
.getConnection();
287 SpaceViolationPolicyEnforcementFactory
getFactory() {