HBASE-21843 RegionGroupingProvider breaks the meta wal file name pattern which may...
[hbase.git] / hbase-server / src / main / java / org / apache / hadoop / hbase / quotas / MasterQuotaManager.java
blob389e1791dfbac934b0650058c5bc286f61153b0f
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org.apache.hadoop.hbase.quotas;
21 import java.io.IOException;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.Iterator;
26 import java.util.Map;
27 import java.util.Map.Entry;
28 import java.util.Set;
29 import java.util.concurrent.ConcurrentHashMap;
31 import org.apache.commons.lang3.builder.HashCodeBuilder;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.hbase.DoNotRetryIOException;
35 import org.apache.hadoop.hbase.MetaTableAccessor;
36 import org.apache.hadoop.hbase.NamespaceDescriptor;
37 import org.apache.hadoop.hbase.RegionStateListener;
38 import org.apache.hadoop.hbase.TableName;
39 import org.apache.hadoop.hbase.client.Connection;
40 import org.apache.hadoop.hbase.client.RegionInfo;
41 import org.apache.hadoop.hbase.master.MasterServices;
42 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
43 import org.apache.hadoop.hbase.master.procedure.SwitchRpcThrottleProcedure;
44 import org.apache.hadoop.hbase.namespace.NamespaceAuditor;
45 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
46 import org.apache.yetus.audience.InterfaceAudience;
47 import org.apache.yetus.audience.InterfaceStability;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
51 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
52 import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
53 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
54 import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
56 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
57 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
58 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
59 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
60 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
61 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
62 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
63 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest;
64 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize;
66 /**
67 * Master Quota Manager.
68 * It is responsible for initialize the quota table on the first-run and
69 * provide the admin operations to interact with the quota table.
71 * TODO: FUTURE: The master will be responsible to notify each RS of quota changes
72 * and it will do the "quota aggregation" when the QuotaScope is CLUSTER.
74 @InterfaceAudience.Private
75 @InterfaceStability.Evolving
76 public class MasterQuotaManager implements RegionStateListener {
77 private static final Logger LOG = LoggerFactory.getLogger(MasterQuotaManager.class);
78 private static final Map<RegionInfo, Long> EMPTY_MAP = Collections.unmodifiableMap(
79 new HashMap<>());
81 private final MasterServices masterServices;
82 private NamedLock<String> namespaceLocks;
83 private NamedLock<TableName> tableLocks;
84 private NamedLock<String> userLocks;
85 private NamedLock<String> regionServerLocks;
86 private boolean initialized = false;
87 private NamespaceAuditor namespaceQuotaManager;
88 private ConcurrentHashMap<RegionInfo, SizeSnapshotWithTimestamp> regionSizes;
89 // Storage for quota rpc throttle
90 private RpcThrottleStorage rpcThrottleStorage;
92 public MasterQuotaManager(final MasterServices masterServices) {
93 this.masterServices = masterServices;
96 public void start() throws IOException {
97 // If the user doesn't want the quota support skip all the initializations.
98 if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) {
99 LOG.info("Quota support disabled");
100 return;
103 // Create the quota table if missing
104 if (!MetaTableAccessor.tableExists(masterServices.getConnection(),
105 QuotaUtil.QUOTA_TABLE_NAME)) {
106 LOG.info("Quota table not found. Creating...");
107 createQuotaTable();
110 LOG.info("Initializing quota support");
111 namespaceLocks = new NamedLock<>();
112 tableLocks = new NamedLock<>();
113 userLocks = new NamedLock<>();
114 regionServerLocks = new NamedLock<>();
115 regionSizes = new ConcurrentHashMap<>();
117 namespaceQuotaManager = new NamespaceAuditor(masterServices);
118 namespaceQuotaManager.start();
119 initialized = true;
121 rpcThrottleStorage =
122 new RpcThrottleStorage(masterServices.getZooKeeper(), masterServices.getConfiguration());
125 public void stop() {
128 public boolean isQuotaInitialized() {
129 return initialized && namespaceQuotaManager.isInitialized();
132 /* ==========================================================================
133 * Admin operations to manage the quota table
135 public SetQuotaResponse setQuota(final SetQuotaRequest req)
136 throws IOException, InterruptedException {
137 checkQuotaSupport();
139 if (req.hasUserName()) {
140 userLocks.lock(req.getUserName());
141 try {
142 if (req.hasTableName()) {
143 setUserQuota(req.getUserName(), ProtobufUtil.toTableName(req.getTableName()), req);
144 } else if (req.hasNamespace()) {
145 setUserQuota(req.getUserName(), req.getNamespace(), req);
146 } else {
147 setUserQuota(req.getUserName(), req);
149 } finally {
150 userLocks.unlock(req.getUserName());
152 } else if (req.hasTableName()) {
153 TableName table = ProtobufUtil.toTableName(req.getTableName());
154 tableLocks.lock(table);
155 try {
156 setTableQuota(table, req);
157 } finally {
158 tableLocks.unlock(table);
160 } else if (req.hasNamespace()) {
161 namespaceLocks.lock(req.getNamespace());
162 try {
163 setNamespaceQuota(req.getNamespace(), req);
164 } finally {
165 namespaceLocks.unlock(req.getNamespace());
167 } else if (req.hasRegionServer()) {
168 regionServerLocks.lock(req.getRegionServer());
169 try {
170 setRegionServerQuota(req.getRegionServer(), req);
171 } finally {
172 regionServerLocks.unlock(req.getRegionServer());
174 } else {
175 throw new DoNotRetryIOException(new UnsupportedOperationException(
176 "a user, a table, a namespace or region server must be specified"));
178 return SetQuotaResponse.newBuilder().build();
181 public void setUserQuota(final String userName, final SetQuotaRequest req)
182 throws IOException, InterruptedException {
183 setQuota(req, new SetQuotaOperations() {
184 @Override
185 public GlobalQuotaSettingsImpl fetch() throws IOException {
186 return new GlobalQuotaSettingsImpl(req.getUserName(), null, null, null,
187 QuotaUtil.getUserQuota(masterServices.getConnection(), userName));
189 @Override
190 public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
191 QuotaUtil.addUserQuota(masterServices.getConnection(), userName, quotaPojo.toQuotas());
193 @Override
194 public void delete() throws IOException {
195 QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName);
197 @Override
198 public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
199 masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, quotaPojo);
201 @Override
202 public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
203 masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, quotaPojo);
208 public void setUserQuota(final String userName, final TableName table,
209 final SetQuotaRequest req) throws IOException, InterruptedException {
210 setQuota(req, new SetQuotaOperations() {
211 @Override
212 public GlobalQuotaSettingsImpl fetch() throws IOException {
213 return new GlobalQuotaSettingsImpl(userName, table, null, null,
214 QuotaUtil.getUserQuota(masterServices.getConnection(), userName, table));
216 @Override
217 public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
218 QuotaUtil.addUserQuota(masterServices.getConnection(), userName, table,
219 quotaPojo.toQuotas());
221 @Override
222 public void delete() throws IOException {
223 QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, table);
225 @Override
226 public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
227 masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, table, quotaPojo);
229 @Override
230 public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
231 masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, table, quotaPojo);
236 public void setUserQuota(final String userName, final String namespace,
237 final SetQuotaRequest req) throws IOException, InterruptedException {
238 setQuota(req, new SetQuotaOperations() {
239 @Override
240 public GlobalQuotaSettingsImpl fetch() throws IOException {
241 return new GlobalQuotaSettingsImpl(userName, null, namespace, null,
242 QuotaUtil.getUserQuota(masterServices.getConnection(), userName, namespace));
244 @Override
245 public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
246 QuotaUtil.addUserQuota(masterServices.getConnection(), userName, namespace,
247 quotaPojo.toQuotas());
249 @Override
250 public void delete() throws IOException {
251 QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, namespace);
253 @Override
254 public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
255 masterServices.getMasterCoprocessorHost().preSetUserQuota(
256 userName, namespace, quotaPojo);
258 @Override
259 public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
260 masterServices.getMasterCoprocessorHost().postSetUserQuota(
261 userName, namespace, quotaPojo);
266 public void setTableQuota(final TableName table, final SetQuotaRequest req)
267 throws IOException, InterruptedException {
268 setQuota(req, new SetQuotaOperations() {
269 @Override
270 public GlobalQuotaSettingsImpl fetch() throws IOException {
271 return new GlobalQuotaSettingsImpl(null, table, null, null,
272 QuotaUtil.getTableQuota(masterServices.getConnection(), table));
274 @Override
275 public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
276 QuotaUtil.addTableQuota(masterServices.getConnection(), table, quotaPojo.toQuotas());
278 @Override
279 public void delete() throws IOException {
280 QuotaUtil.deleteTableQuota(masterServices.getConnection(), table);
282 @Override
283 public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
284 masterServices.getMasterCoprocessorHost().preSetTableQuota(table, quotaPojo);
286 @Override
287 public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
288 masterServices.getMasterCoprocessorHost().postSetTableQuota(table, quotaPojo);
293 public void setNamespaceQuota(final String namespace, final SetQuotaRequest req)
294 throws IOException, InterruptedException {
295 setQuota(req, new SetQuotaOperations() {
296 @Override
297 public GlobalQuotaSettingsImpl fetch() throws IOException {
298 return new GlobalQuotaSettingsImpl(null, null, namespace, null,
299 QuotaUtil.getNamespaceQuota(masterServices.getConnection(), namespace));
301 @Override
302 public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
303 QuotaUtil.addNamespaceQuota(masterServices.getConnection(), namespace,
304 ((GlobalQuotaSettingsImpl) quotaPojo).toQuotas());
306 @Override
307 public void delete() throws IOException {
308 QuotaUtil.deleteNamespaceQuota(masterServices.getConnection(), namespace);
310 @Override
311 public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
312 masterServices.getMasterCoprocessorHost().preSetNamespaceQuota(namespace, quotaPojo);
314 @Override
315 public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
316 masterServices.getMasterCoprocessorHost().postSetNamespaceQuota(namespace, quotaPojo);
321 public void setRegionServerQuota(final String regionServer, final SetQuotaRequest req)
322 throws IOException, InterruptedException {
323 setQuota(req, new SetQuotaOperations() {
324 @Override
325 public GlobalQuotaSettingsImpl fetch() throws IOException {
326 return new GlobalQuotaSettingsImpl(null, null, null, regionServer,
327 QuotaUtil.getRegionServerQuota(masterServices.getConnection(), regionServer));
330 @Override
331 public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
332 QuotaUtil.addRegionServerQuota(masterServices.getConnection(), regionServer,
333 ((GlobalQuotaSettingsImpl) quotaPojo).toQuotas());
336 @Override
337 public void delete() throws IOException {
338 QuotaUtil.deleteRegionServerQuota(masterServices.getConnection(), regionServer);
341 @Override
342 public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
343 masterServices.getMasterCoprocessorHost().preSetRegionServerQuota(regionServer, quotaPojo);
346 @Override
347 public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
348 masterServices.getMasterCoprocessorHost().postSetRegionServerQuota(regionServer, quotaPojo);
353 public void setNamespaceQuota(NamespaceDescriptor desc) throws IOException {
354 if (initialized) {
355 this.namespaceQuotaManager.addNamespace(desc);
359 public void removeNamespaceQuota(String namespace) throws IOException {
360 if (initialized) {
361 this.namespaceQuotaManager.deleteNamespace(namespace);
365 public SwitchRpcThrottleResponse switchRpcThrottle(SwitchRpcThrottleRequest request)
366 throws IOException {
367 boolean rpcThrottle = request.getRpcThrottleEnabled();
368 if (initialized) {
369 masterServices.getMasterCoprocessorHost().preSwitchRpcThrottle(rpcThrottle);
370 boolean oldRpcThrottle = rpcThrottleStorage.isRpcThrottleEnabled();
371 if (rpcThrottle != oldRpcThrottle) {
372 LOG.info("{} switch rpc throttle from {} to {}", masterServices.getClientIdAuditPrefix(),
373 oldRpcThrottle, rpcThrottle);
374 ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
375 SwitchRpcThrottleProcedure procedure = new SwitchRpcThrottleProcedure(rpcThrottleStorage,
376 rpcThrottle, masterServices.getServerName(), latch);
377 masterServices.getMasterProcedureExecutor().submitProcedure(procedure);
378 latch.await();
379 } else {
380 LOG.warn("Skip switch rpc throttle to {} because it's the same with old value",
381 rpcThrottle);
383 SwitchRpcThrottleResponse response = SwitchRpcThrottleResponse.newBuilder()
384 .setPreviousRpcThrottleEnabled(oldRpcThrottle).build();
385 masterServices.getMasterCoprocessorHost().postSwitchRpcThrottle(oldRpcThrottle, rpcThrottle);
386 return response;
387 } else {
388 LOG.warn("Skip switch rpc throttle to {} because rpc quota is disabled", rpcThrottle);
389 return SwitchRpcThrottleResponse.newBuilder().setPreviousRpcThrottleEnabled(false).build();
393 public IsRpcThrottleEnabledResponse isRpcThrottleEnabled(IsRpcThrottleEnabledRequest request)
394 throws IOException {
395 if (initialized) {
396 masterServices.getMasterCoprocessorHost().preIsRpcThrottleEnabled();
397 boolean enabled = rpcThrottleStorage.isRpcThrottleEnabled();
398 IsRpcThrottleEnabledResponse response =
399 IsRpcThrottleEnabledResponse.newBuilder().setRpcThrottleEnabled(enabled).build();
400 masterServices.getMasterCoprocessorHost().postIsRpcThrottleEnabled(enabled);
401 return response;
402 } else {
403 LOG.warn("Skip get rpc throttle because rpc quota is disabled");
404 return IsRpcThrottleEnabledResponse.newBuilder().setRpcThrottleEnabled(false).build();
408 private void setQuota(final SetQuotaRequest req, final SetQuotaOperations quotaOps)
409 throws IOException, InterruptedException {
410 if (req.hasRemoveAll() && req.getRemoveAll() == true) {
411 quotaOps.preApply(null);
412 quotaOps.delete();
413 quotaOps.postApply(null);
414 return;
417 // Apply quota changes
418 GlobalQuotaSettingsImpl currentQuota = quotaOps.fetch();
419 if (LOG.isTraceEnabled()) {
420 LOG.trace(
421 "Current quota for request(" + TextFormat.shortDebugString(req)
422 + "): " + currentQuota);
424 // Call the appropriate "pre" CP hook with the current quota value (may be null)
425 quotaOps.preApply(currentQuota);
426 // Translate the protobuf request back into a POJO
427 QuotaSettings newQuota = QuotaSettings.buildFromProto(req);
428 if (LOG.isTraceEnabled()) {
429 LOG.trace("Deserialized quota from request: " + newQuota);
432 // Merge the current quota settings with the new quota settings the user provided.
434 // NB: while SetQuotaRequest technically allows for multi types of quotas to be set in one
435 // message, the Java API (in Admin/AsyncAdmin) does not. Assume there is only one type.
436 GlobalQuotaSettingsImpl mergedQuota = currentQuota.merge(newQuota);
437 if (LOG.isTraceEnabled()) {
438 LOG.trace("Computed merged quota from current quota and user request: " + mergedQuota);
441 // Submit new changes
442 if (mergedQuota == null) {
443 quotaOps.delete();
444 } else {
445 quotaOps.update(mergedQuota);
447 // Advertise the final result via the "post" CP hook
448 quotaOps.postApply(mergedQuota);
451 public void checkNamespaceTableAndRegionQuota(TableName tName, int regions) throws IOException {
452 if (initialized) {
453 namespaceQuotaManager.checkQuotaToCreateTable(tName, regions);
457 public void checkAndUpdateNamespaceRegionQuota(TableName tName, int regions) throws IOException {
458 if (initialized) {
459 namespaceQuotaManager.checkQuotaToUpdateRegion(tName, regions);
464 * @return cached region count, or -1 if quota manager is disabled or table status not found
466 public int getRegionCountOfTable(TableName tName) throws IOException {
467 if (initialized) {
468 return namespaceQuotaManager.getRegionCountOfTable(tName);
470 return -1;
473 @Override
474 public void onRegionMerged(RegionInfo mergedRegion) throws IOException {
475 if (initialized) {
476 namespaceQuotaManager.updateQuotaForRegionMerge(mergedRegion);
480 @Override
481 public void onRegionSplit(RegionInfo hri) throws IOException {
482 if (initialized) {
483 namespaceQuotaManager.checkQuotaToSplitRegion(hri);
488 * Remove table from namespace quota.
490 * @param tName - The table name to update quota usage.
491 * @throws IOException Signals that an I/O exception has occurred.
493 public void removeTableFromNamespaceQuota(TableName tName) throws IOException {
494 if (initialized) {
495 namespaceQuotaManager.removeFromNamespaceUsage(tName);
499 public NamespaceAuditor getNamespaceQuotaManager() {
500 return this.namespaceQuotaManager;
504 * Encapsulates CRUD quota operations for some subject.
506 private static interface SetQuotaOperations {
508 * Fetches the current quota settings for the subject.
510 GlobalQuotaSettingsImpl fetch() throws IOException;
512 * Deletes the quota for the subject.
514 void delete() throws IOException;
516 * Persist the given quota for the subject.
518 void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException;
520 * Performs some action before {@link #update(GlobalQuotaSettingsImpl)} with the current
521 * quota for the subject.
523 void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException;
525 * Performs some action after {@link #update(GlobalQuotaSettingsImpl)} with the resulting
526 * quota from the request action for the subject.
528 void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException;
531 /* ==========================================================================
532 * Helpers
535 private void checkQuotaSupport() throws IOException {
536 if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) {
537 throw new DoNotRetryIOException(
538 new UnsupportedOperationException("quota support disabled"));
540 if (!initialized) {
541 long maxWaitTime = masterServices.getConfiguration().getLong(
542 "hbase.master.wait.for.quota.manager.init", 30000); // default is 30 seconds.
543 long startTime = EnvironmentEdgeManager.currentTime();
544 do {
545 try {
546 Thread.sleep(100);
547 } catch (InterruptedException e) {
548 LOG.warn("Interrupted while waiting for Quota Manager to be initialized.");
549 break;
551 } while (!initialized && (EnvironmentEdgeManager.currentTime() - startTime) < maxWaitTime);
552 if (!initialized) {
553 throw new IOException("Quota manager is uninitialized, please retry later.");
558 private void createQuotaTable() throws IOException {
559 masterServices.createSystemTable(QuotaUtil.QUOTA_TABLE_DESC);
562 private static class NamedLock<T> {
563 private final HashSet<T> locks = new HashSet<>();
565 public void lock(final T name) throws InterruptedException {
566 synchronized (locks) {
567 while (locks.contains(name)) {
568 locks.wait();
570 locks.add(name);
574 public void unlock(final T name) {
575 synchronized (locks) {
576 locks.remove(name);
577 locks.notifyAll();
582 @Override
583 public void onRegionSplitReverted(RegionInfo hri) throws IOException {
584 if (initialized) {
585 this.namespaceQuotaManager.removeRegionFromNamespaceUsage(hri);
590 * Holds the size of a region at the given time, millis since the epoch.
592 private static class SizeSnapshotWithTimestamp {
593 private final long size;
594 private final long time;
596 public SizeSnapshotWithTimestamp(long size, long time) {
597 this.size = size;
598 this.time = time;
601 public long getSize() {
602 return size;
605 public long getTime() {
606 return time;
609 @Override
610 public boolean equals(Object o) {
611 if (o instanceof SizeSnapshotWithTimestamp) {
612 SizeSnapshotWithTimestamp other = (SizeSnapshotWithTimestamp) o;
613 return size == other.size && time == other.time;
615 return false;
618 @Override
619 public int hashCode() {
620 HashCodeBuilder hcb = new HashCodeBuilder();
621 return hcb.append(size).append(time).toHashCode();
624 @Override
625 public String toString() {
626 StringBuilder sb = new StringBuilder(32);
627 sb.append("SizeSnapshotWithTimestamp={size=").append(size).append("B, ");
628 sb.append("time=").append(time).append("}");
629 return sb.toString();
633 @VisibleForTesting
634 void initializeRegionSizes() {
635 assert regionSizes == null;
636 this.regionSizes = new ConcurrentHashMap<>();
639 public void addRegionSize(RegionInfo hri, long size, long time) {
640 if (regionSizes == null) {
641 return;
643 regionSizes.put(hri, new SizeSnapshotWithTimestamp(size, time));
646 public Map<RegionInfo, Long> snapshotRegionSizes() {
647 if (regionSizes == null) {
648 return EMPTY_MAP;
651 Map<RegionInfo, Long> copy = new HashMap<>();
652 for (Entry<RegionInfo, SizeSnapshotWithTimestamp> entry : regionSizes.entrySet()) {
653 copy.put(entry.getKey(), entry.getValue().getSize());
655 return copy;
658 int pruneEntriesOlderThan(long timeToPruneBefore) {
659 if (regionSizes == null) {
660 return 0;
662 int numEntriesRemoved = 0;
663 Iterator<Entry<RegionInfo,SizeSnapshotWithTimestamp>> iterator =
664 regionSizes.entrySet().iterator();
665 while (iterator.hasNext()) {
666 long currentEntryTime = iterator.next().getValue().getTime();
667 if (currentEntryTime < timeToPruneBefore) {
668 iterator.remove();
669 numEntriesRemoved++;
672 return numEntriesRemoved;
675 public void processFileArchivals(FileArchiveNotificationRequest request, Connection conn,
676 Configuration conf, FileSystem fs) throws IOException {
677 final HashMultimap<TableName,Entry<String,Long>> archivedFilesByTable = HashMultimap.create();
678 // Group the archived files by table
679 for (FileWithSize fileWithSize : request.getArchivedFilesList()) {
680 TableName tn = ProtobufUtil.toTableName(fileWithSize.getTableName());
681 archivedFilesByTable.put(
682 tn, Maps.immutableEntry(fileWithSize.getName(), fileWithSize.getSize()));
684 if (LOG.isTraceEnabled()) {
685 LOG.trace("Grouped archived files by table: " + archivedFilesByTable);
687 // Report each set of files to the appropriate object
688 for (TableName tn : archivedFilesByTable.keySet()) {
689 final Set<Entry<String,Long>> filesWithSize = archivedFilesByTable.get(tn);
690 final FileArchiverNotifier notifier = FileArchiverNotifierFactoryImpl.getInstance().get(
691 conn, conf, fs, tn);
692 notifier.addArchivedFiles(filesWithSize);