2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.quotas
;
21 import java
.io
.IOException
;
22 import java
.util
.Collections
;
23 import java
.util
.HashMap
;
24 import java
.util
.HashSet
;
25 import java
.util
.Iterator
;
27 import java
.util
.Map
.Entry
;
29 import java
.util
.concurrent
.ConcurrentHashMap
;
31 import org
.apache
.commons
.lang3
.builder
.HashCodeBuilder
;
32 import org
.apache
.hadoop
.conf
.Configuration
;
33 import org
.apache
.hadoop
.fs
.FileSystem
;
34 import org
.apache
.hadoop
.hbase
.DoNotRetryIOException
;
35 import org
.apache
.hadoop
.hbase
.MetaTableAccessor
;
36 import org
.apache
.hadoop
.hbase
.NamespaceDescriptor
;
37 import org
.apache
.hadoop
.hbase
.RegionStateListener
;
38 import org
.apache
.hadoop
.hbase
.TableName
;
39 import org
.apache
.hadoop
.hbase
.client
.Connection
;
40 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
41 import org
.apache
.hadoop
.hbase
.master
.MasterServices
;
42 import org
.apache
.hadoop
.hbase
.master
.procedure
.ProcedurePrepareLatch
;
43 import org
.apache
.hadoop
.hbase
.master
.procedure
.SwitchRpcThrottleProcedure
;
44 import org
.apache
.hadoop
.hbase
.namespace
.NamespaceAuditor
;
45 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
46 import org
.apache
.yetus
.audience
.InterfaceAudience
;
47 import org
.apache
.yetus
.audience
.InterfaceStability
;
48 import org
.slf4j
.Logger
;
49 import org
.slf4j
.LoggerFactory
;
51 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.annotations
.VisibleForTesting
;
52 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.collect
.HashMultimap
;
53 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.collect
.Maps
;
54 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.TextFormat
;
56 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ProtobufUtil
;
57 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.MasterProtos
.IsRpcThrottleEnabledRequest
;
58 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.MasterProtos
.IsRpcThrottleEnabledResponse
;
59 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.MasterProtos
.SetQuotaRequest
;
60 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.MasterProtos
.SetQuotaResponse
;
61 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.MasterProtos
.SwitchRpcThrottleRequest
;
62 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.MasterProtos
.SwitchRpcThrottleResponse
;
63 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.FileArchiveNotificationRequest
;
64 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.FileArchiveNotificationRequest
.FileWithSize
;
67 * Master Quota Manager.
68 * It is responsible for initialize the quota table on the first-run and
69 * provide the admin operations to interact with the quota table.
71 * TODO: FUTURE: The master will be responsible to notify each RS of quota changes
72 * and it will do the "quota aggregation" when the QuotaScope is CLUSTER.
74 @InterfaceAudience.Private
75 @InterfaceStability.Evolving
76 public class MasterQuotaManager
implements RegionStateListener
{
77 private static final Logger LOG
= LoggerFactory
.getLogger(MasterQuotaManager
.class);
78 private static final Map
<RegionInfo
, Long
> EMPTY_MAP
= Collections
.unmodifiableMap(
81 private final MasterServices masterServices
;
82 private NamedLock
<String
> namespaceLocks
;
83 private NamedLock
<TableName
> tableLocks
;
84 private NamedLock
<String
> userLocks
;
85 private NamedLock
<String
> regionServerLocks
;
86 private boolean initialized
= false;
87 private NamespaceAuditor namespaceQuotaManager
;
88 private ConcurrentHashMap
<RegionInfo
, SizeSnapshotWithTimestamp
> regionSizes
;
89 // Storage for quota rpc throttle
90 private RpcThrottleStorage rpcThrottleStorage
;
92 public MasterQuotaManager(final MasterServices masterServices
) {
93 this.masterServices
= masterServices
;
96 public void start() throws IOException
{
97 // If the user doesn't want the quota support skip all the initializations.
98 if (!QuotaUtil
.isQuotaEnabled(masterServices
.getConfiguration())) {
99 LOG
.info("Quota support disabled");
103 // Create the quota table if missing
104 if (!MetaTableAccessor
.tableExists(masterServices
.getConnection(),
105 QuotaUtil
.QUOTA_TABLE_NAME
)) {
106 LOG
.info("Quota table not found. Creating...");
110 LOG
.info("Initializing quota support");
111 namespaceLocks
= new NamedLock
<>();
112 tableLocks
= new NamedLock
<>();
113 userLocks
= new NamedLock
<>();
114 regionServerLocks
= new NamedLock
<>();
115 regionSizes
= new ConcurrentHashMap
<>();
117 namespaceQuotaManager
= new NamespaceAuditor(masterServices
);
118 namespaceQuotaManager
.start();
122 new RpcThrottleStorage(masterServices
.getZooKeeper(), masterServices
.getConfiguration());
128 public boolean isQuotaInitialized() {
129 return initialized
&& namespaceQuotaManager
.isInitialized();
132 /* ==========================================================================
133 * Admin operations to manage the quota table
135 public SetQuotaResponse
setQuota(final SetQuotaRequest req
)
136 throws IOException
, InterruptedException
{
139 if (req
.hasUserName()) {
140 userLocks
.lock(req
.getUserName());
142 if (req
.hasTableName()) {
143 setUserQuota(req
.getUserName(), ProtobufUtil
.toTableName(req
.getTableName()), req
);
144 } else if (req
.hasNamespace()) {
145 setUserQuota(req
.getUserName(), req
.getNamespace(), req
);
147 setUserQuota(req
.getUserName(), req
);
150 userLocks
.unlock(req
.getUserName());
152 } else if (req
.hasTableName()) {
153 TableName table
= ProtobufUtil
.toTableName(req
.getTableName());
154 tableLocks
.lock(table
);
156 setTableQuota(table
, req
);
158 tableLocks
.unlock(table
);
160 } else if (req
.hasNamespace()) {
161 namespaceLocks
.lock(req
.getNamespace());
163 setNamespaceQuota(req
.getNamespace(), req
);
165 namespaceLocks
.unlock(req
.getNamespace());
167 } else if (req
.hasRegionServer()) {
168 regionServerLocks
.lock(req
.getRegionServer());
170 setRegionServerQuota(req
.getRegionServer(), req
);
172 regionServerLocks
.unlock(req
.getRegionServer());
175 throw new DoNotRetryIOException(new UnsupportedOperationException(
176 "a user, a table, a namespace or region server must be specified"));
178 return SetQuotaResponse
.newBuilder().build();
181 public void setUserQuota(final String userName
, final SetQuotaRequest req
)
182 throws IOException
, InterruptedException
{
183 setQuota(req
, new SetQuotaOperations() {
185 public GlobalQuotaSettingsImpl
fetch() throws IOException
{
186 return new GlobalQuotaSettingsImpl(req
.getUserName(), null, null, null,
187 QuotaUtil
.getUserQuota(masterServices
.getConnection(), userName
));
190 public void update(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
191 QuotaUtil
.addUserQuota(masterServices
.getConnection(), userName
, quotaPojo
.toQuotas());
194 public void delete() throws IOException
{
195 QuotaUtil
.deleteUserQuota(masterServices
.getConnection(), userName
);
198 public void preApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
199 masterServices
.getMasterCoprocessorHost().preSetUserQuota(userName
, quotaPojo
);
202 public void postApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
203 masterServices
.getMasterCoprocessorHost().postSetUserQuota(userName
, quotaPojo
);
208 public void setUserQuota(final String userName
, final TableName table
,
209 final SetQuotaRequest req
) throws IOException
, InterruptedException
{
210 setQuota(req
, new SetQuotaOperations() {
212 public GlobalQuotaSettingsImpl
fetch() throws IOException
{
213 return new GlobalQuotaSettingsImpl(userName
, table
, null, null,
214 QuotaUtil
.getUserQuota(masterServices
.getConnection(), userName
, table
));
217 public void update(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
218 QuotaUtil
.addUserQuota(masterServices
.getConnection(), userName
, table
,
219 quotaPojo
.toQuotas());
222 public void delete() throws IOException
{
223 QuotaUtil
.deleteUserQuota(masterServices
.getConnection(), userName
, table
);
226 public void preApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
227 masterServices
.getMasterCoprocessorHost().preSetUserQuota(userName
, table
, quotaPojo
);
230 public void postApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
231 masterServices
.getMasterCoprocessorHost().postSetUserQuota(userName
, table
, quotaPojo
);
236 public void setUserQuota(final String userName
, final String namespace
,
237 final SetQuotaRequest req
) throws IOException
, InterruptedException
{
238 setQuota(req
, new SetQuotaOperations() {
240 public GlobalQuotaSettingsImpl
fetch() throws IOException
{
241 return new GlobalQuotaSettingsImpl(userName
, null, namespace
, null,
242 QuotaUtil
.getUserQuota(masterServices
.getConnection(), userName
, namespace
));
245 public void update(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
246 QuotaUtil
.addUserQuota(masterServices
.getConnection(), userName
, namespace
,
247 quotaPojo
.toQuotas());
250 public void delete() throws IOException
{
251 QuotaUtil
.deleteUserQuota(masterServices
.getConnection(), userName
, namespace
);
254 public void preApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
255 masterServices
.getMasterCoprocessorHost().preSetUserQuota(
256 userName
, namespace
, quotaPojo
);
259 public void postApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
260 masterServices
.getMasterCoprocessorHost().postSetUserQuota(
261 userName
, namespace
, quotaPojo
);
266 public void setTableQuota(final TableName table
, final SetQuotaRequest req
)
267 throws IOException
, InterruptedException
{
268 setQuota(req
, new SetQuotaOperations() {
270 public GlobalQuotaSettingsImpl
fetch() throws IOException
{
271 return new GlobalQuotaSettingsImpl(null, table
, null, null,
272 QuotaUtil
.getTableQuota(masterServices
.getConnection(), table
));
275 public void update(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
276 QuotaUtil
.addTableQuota(masterServices
.getConnection(), table
, quotaPojo
.toQuotas());
279 public void delete() throws IOException
{
280 QuotaUtil
.deleteTableQuota(masterServices
.getConnection(), table
);
283 public void preApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
284 masterServices
.getMasterCoprocessorHost().preSetTableQuota(table
, quotaPojo
);
287 public void postApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
288 masterServices
.getMasterCoprocessorHost().postSetTableQuota(table
, quotaPojo
);
293 public void setNamespaceQuota(final String namespace
, final SetQuotaRequest req
)
294 throws IOException
, InterruptedException
{
295 setQuota(req
, new SetQuotaOperations() {
297 public GlobalQuotaSettingsImpl
fetch() throws IOException
{
298 return new GlobalQuotaSettingsImpl(null, null, namespace
, null,
299 QuotaUtil
.getNamespaceQuota(masterServices
.getConnection(), namespace
));
302 public void update(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
303 QuotaUtil
.addNamespaceQuota(masterServices
.getConnection(), namespace
,
304 ((GlobalQuotaSettingsImpl
) quotaPojo
).toQuotas());
307 public void delete() throws IOException
{
308 QuotaUtil
.deleteNamespaceQuota(masterServices
.getConnection(), namespace
);
311 public void preApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
312 masterServices
.getMasterCoprocessorHost().preSetNamespaceQuota(namespace
, quotaPojo
);
315 public void postApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
316 masterServices
.getMasterCoprocessorHost().postSetNamespaceQuota(namespace
, quotaPojo
);
321 public void setRegionServerQuota(final String regionServer
, final SetQuotaRequest req
)
322 throws IOException
, InterruptedException
{
323 setQuota(req
, new SetQuotaOperations() {
325 public GlobalQuotaSettingsImpl
fetch() throws IOException
{
326 return new GlobalQuotaSettingsImpl(null, null, null, regionServer
,
327 QuotaUtil
.getRegionServerQuota(masterServices
.getConnection(), regionServer
));
331 public void update(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
332 QuotaUtil
.addRegionServerQuota(masterServices
.getConnection(), regionServer
,
333 ((GlobalQuotaSettingsImpl
) quotaPojo
).toQuotas());
337 public void delete() throws IOException
{
338 QuotaUtil
.deleteRegionServerQuota(masterServices
.getConnection(), regionServer
);
342 public void preApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
343 masterServices
.getMasterCoprocessorHost().preSetRegionServerQuota(regionServer
, quotaPojo
);
347 public void postApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
{
348 masterServices
.getMasterCoprocessorHost().postSetRegionServerQuota(regionServer
, quotaPojo
);
353 public void setNamespaceQuota(NamespaceDescriptor desc
) throws IOException
{
355 this.namespaceQuotaManager
.addNamespace(desc
);
359 public void removeNamespaceQuota(String namespace
) throws IOException
{
361 this.namespaceQuotaManager
.deleteNamespace(namespace
);
365 public SwitchRpcThrottleResponse
switchRpcThrottle(SwitchRpcThrottleRequest request
)
367 boolean rpcThrottle
= request
.getRpcThrottleEnabled();
369 masterServices
.getMasterCoprocessorHost().preSwitchRpcThrottle(rpcThrottle
);
370 boolean oldRpcThrottle
= rpcThrottleStorage
.isRpcThrottleEnabled();
371 if (rpcThrottle
!= oldRpcThrottle
) {
372 LOG
.info("{} switch rpc throttle from {} to {}", masterServices
.getClientIdAuditPrefix(),
373 oldRpcThrottle
, rpcThrottle
);
374 ProcedurePrepareLatch latch
= ProcedurePrepareLatch
.createBlockingLatch();
375 SwitchRpcThrottleProcedure procedure
= new SwitchRpcThrottleProcedure(rpcThrottleStorage
,
376 rpcThrottle
, masterServices
.getServerName(), latch
);
377 masterServices
.getMasterProcedureExecutor().submitProcedure(procedure
);
380 LOG
.warn("Skip switch rpc throttle to {} because it's the same with old value",
383 SwitchRpcThrottleResponse response
= SwitchRpcThrottleResponse
.newBuilder()
384 .setPreviousRpcThrottleEnabled(oldRpcThrottle
).build();
385 masterServices
.getMasterCoprocessorHost().postSwitchRpcThrottle(oldRpcThrottle
, rpcThrottle
);
388 LOG
.warn("Skip switch rpc throttle to {} because rpc quota is disabled", rpcThrottle
);
389 return SwitchRpcThrottleResponse
.newBuilder().setPreviousRpcThrottleEnabled(false).build();
393 public IsRpcThrottleEnabledResponse
isRpcThrottleEnabled(IsRpcThrottleEnabledRequest request
)
396 masterServices
.getMasterCoprocessorHost().preIsRpcThrottleEnabled();
397 boolean enabled
= rpcThrottleStorage
.isRpcThrottleEnabled();
398 IsRpcThrottleEnabledResponse response
=
399 IsRpcThrottleEnabledResponse
.newBuilder().setRpcThrottleEnabled(enabled
).build();
400 masterServices
.getMasterCoprocessorHost().postIsRpcThrottleEnabled(enabled
);
403 LOG
.warn("Skip get rpc throttle because rpc quota is disabled");
404 return IsRpcThrottleEnabledResponse
.newBuilder().setRpcThrottleEnabled(false).build();
408 private void setQuota(final SetQuotaRequest req
, final SetQuotaOperations quotaOps
)
409 throws IOException
, InterruptedException
{
410 if (req
.hasRemoveAll() && req
.getRemoveAll() == true) {
411 quotaOps
.preApply(null);
413 quotaOps
.postApply(null);
417 // Apply quota changes
418 GlobalQuotaSettingsImpl currentQuota
= quotaOps
.fetch();
419 if (LOG
.isTraceEnabled()) {
421 "Current quota for request(" + TextFormat
.shortDebugString(req
)
422 + "): " + currentQuota
);
424 // Call the appropriate "pre" CP hook with the current quota value (may be null)
425 quotaOps
.preApply(currentQuota
);
426 // Translate the protobuf request back into a POJO
427 QuotaSettings newQuota
= QuotaSettings
.buildFromProto(req
);
428 if (LOG
.isTraceEnabled()) {
429 LOG
.trace("Deserialized quota from request: " + newQuota
);
432 // Merge the current quota settings with the new quota settings the user provided.
434 // NB: while SetQuotaRequest technically allows for multi types of quotas to be set in one
435 // message, the Java API (in Admin/AsyncAdmin) does not. Assume there is only one type.
436 GlobalQuotaSettingsImpl mergedQuota
= currentQuota
.merge(newQuota
);
437 if (LOG
.isTraceEnabled()) {
438 LOG
.trace("Computed merged quota from current quota and user request: " + mergedQuota
);
441 // Submit new changes
442 if (mergedQuota
== null) {
445 quotaOps
.update(mergedQuota
);
447 // Advertise the final result via the "post" CP hook
448 quotaOps
.postApply(mergedQuota
);
451 public void checkNamespaceTableAndRegionQuota(TableName tName
, int regions
) throws IOException
{
453 namespaceQuotaManager
.checkQuotaToCreateTable(tName
, regions
);
457 public void checkAndUpdateNamespaceRegionQuota(TableName tName
, int regions
) throws IOException
{
459 namespaceQuotaManager
.checkQuotaToUpdateRegion(tName
, regions
);
464 * @return cached region count, or -1 if quota manager is disabled or table status not found
466 public int getRegionCountOfTable(TableName tName
) throws IOException
{
468 return namespaceQuotaManager
.getRegionCountOfTable(tName
);
474 public void onRegionMerged(RegionInfo mergedRegion
) throws IOException
{
476 namespaceQuotaManager
.updateQuotaForRegionMerge(mergedRegion
);
481 public void onRegionSplit(RegionInfo hri
) throws IOException
{
483 namespaceQuotaManager
.checkQuotaToSplitRegion(hri
);
488 * Remove table from namespace quota.
490 * @param tName - The table name to update quota usage.
491 * @throws IOException Signals that an I/O exception has occurred.
493 public void removeTableFromNamespaceQuota(TableName tName
) throws IOException
{
495 namespaceQuotaManager
.removeFromNamespaceUsage(tName
);
499 public NamespaceAuditor
getNamespaceQuotaManager() {
500 return this.namespaceQuotaManager
;
504 * Encapsulates CRUD quota operations for some subject.
506 private static interface SetQuotaOperations
{
508 * Fetches the current quota settings for the subject.
510 GlobalQuotaSettingsImpl
fetch() throws IOException
;
512 * Deletes the quota for the subject.
514 void delete() throws IOException
;
516 * Persist the given quota for the subject.
518 void update(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
;
520 * Performs some action before {@link #update(GlobalQuotaSettingsImpl)} with the current
521 * quota for the subject.
523 void preApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
;
525 * Performs some action after {@link #update(GlobalQuotaSettingsImpl)} with the resulting
526 * quota from the request action for the subject.
528 void postApply(GlobalQuotaSettingsImpl quotaPojo
) throws IOException
;
531 /* ==========================================================================
535 private void checkQuotaSupport() throws IOException
{
536 if (!QuotaUtil
.isQuotaEnabled(masterServices
.getConfiguration())) {
537 throw new DoNotRetryIOException(
538 new UnsupportedOperationException("quota support disabled"));
541 long maxWaitTime
= masterServices
.getConfiguration().getLong(
542 "hbase.master.wait.for.quota.manager.init", 30000); // default is 30 seconds.
543 long startTime
= EnvironmentEdgeManager
.currentTime();
547 } catch (InterruptedException e
) {
548 LOG
.warn("Interrupted while waiting for Quota Manager to be initialized.");
551 } while (!initialized
&& (EnvironmentEdgeManager
.currentTime() - startTime
) < maxWaitTime
);
553 throw new IOException("Quota manager is uninitialized, please retry later.");
558 private void createQuotaTable() throws IOException
{
559 masterServices
.createSystemTable(QuotaUtil
.QUOTA_TABLE_DESC
);
562 private static class NamedLock
<T
> {
563 private final HashSet
<T
> locks
= new HashSet
<>();
565 public void lock(final T name
) throws InterruptedException
{
566 synchronized (locks
) {
567 while (locks
.contains(name
)) {
574 public void unlock(final T name
) {
575 synchronized (locks
) {
583 public void onRegionSplitReverted(RegionInfo hri
) throws IOException
{
585 this.namespaceQuotaManager
.removeRegionFromNamespaceUsage(hri
);
590 * Holds the size of a region at the given time, millis since the epoch.
592 private static class SizeSnapshotWithTimestamp
{
593 private final long size
;
594 private final long time
;
596 public SizeSnapshotWithTimestamp(long size
, long time
) {
601 public long getSize() {
605 public long getTime() {
610 public boolean equals(Object o
) {
611 if (o
instanceof SizeSnapshotWithTimestamp
) {
612 SizeSnapshotWithTimestamp other
= (SizeSnapshotWithTimestamp
) o
;
613 return size
== other
.size
&& time
== other
.time
;
619 public int hashCode() {
620 HashCodeBuilder hcb
= new HashCodeBuilder();
621 return hcb
.append(size
).append(time
).toHashCode();
625 public String
toString() {
626 StringBuilder sb
= new StringBuilder(32);
627 sb
.append("SizeSnapshotWithTimestamp={size=").append(size
).append("B, ");
628 sb
.append("time=").append(time
).append("}");
629 return sb
.toString();
634 void initializeRegionSizes() {
635 assert regionSizes
== null;
636 this.regionSizes
= new ConcurrentHashMap
<>();
639 public void addRegionSize(RegionInfo hri
, long size
, long time
) {
640 if (regionSizes
== null) {
643 regionSizes
.put(hri
, new SizeSnapshotWithTimestamp(size
, time
));
646 public Map
<RegionInfo
, Long
> snapshotRegionSizes() {
647 if (regionSizes
== null) {
651 Map
<RegionInfo
, Long
> copy
= new HashMap
<>();
652 for (Entry
<RegionInfo
, SizeSnapshotWithTimestamp
> entry
: regionSizes
.entrySet()) {
653 copy
.put(entry
.getKey(), entry
.getValue().getSize());
658 int pruneEntriesOlderThan(long timeToPruneBefore
) {
659 if (regionSizes
== null) {
662 int numEntriesRemoved
= 0;
663 Iterator
<Entry
<RegionInfo
,SizeSnapshotWithTimestamp
>> iterator
=
664 regionSizes
.entrySet().iterator();
665 while (iterator
.hasNext()) {
666 long currentEntryTime
= iterator
.next().getValue().getTime();
667 if (currentEntryTime
< timeToPruneBefore
) {
672 return numEntriesRemoved
;
675 public void processFileArchivals(FileArchiveNotificationRequest request
, Connection conn
,
676 Configuration conf
, FileSystem fs
) throws IOException
{
677 final HashMultimap
<TableName
,Entry
<String
,Long
>> archivedFilesByTable
= HashMultimap
.create();
678 // Group the archived files by table
679 for (FileWithSize fileWithSize
: request
.getArchivedFilesList()) {
680 TableName tn
= ProtobufUtil
.toTableName(fileWithSize
.getTableName());
681 archivedFilesByTable
.put(
682 tn
, Maps
.immutableEntry(fileWithSize
.getName(), fileWithSize
.getSize()));
684 if (LOG
.isTraceEnabled()) {
685 LOG
.trace("Grouped archived files by table: " + archivedFilesByTable
);
687 // Report each set of files to the appropriate object
688 for (TableName tn
: archivedFilesByTable
.keySet()) {
689 final Set
<Entry
<String
,Long
>> filesWithSize
= archivedFilesByTable
.get(tn
);
690 final FileArchiverNotifier notifier
= FileArchiverNotifierFactoryImpl
.getInstance().get(
692 notifier
.addArchivedFiles(filesWithSize
);