HBASE-26286: Add support for specifying store file tracker when restoring or cloning...
[hbase.git] / hbase-server / src / main / java / org / apache / hadoop / hbase / quotas / FileArchiverNotifierImpl.java
blobd81c1f348b87d39abf6852ba2ddaab0f277ebaa9
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to you under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package org.apache.hadoop.hbase.quotas;
19 import java.io.IOException;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Map.Entry;
29 import java.util.Objects;
30 import java.util.Set;
31 import java.util.concurrent.locks.ReentrantReadWriteLock;
32 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
33 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
34 import java.util.function.Predicate;
35 import java.util.stream.Collectors;
36 import org.apache.commons.lang3.builder.HashCodeBuilder;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.fs.FileStatus;
39 import org.apache.hadoop.fs.FileSystem;
40 import org.apache.hadoop.fs.Path;
41 import org.apache.hadoop.hbase.TableName;
42 import org.apache.hadoop.hbase.client.Connection;
43 import org.apache.hadoop.hbase.client.Get;
44 import org.apache.hadoop.hbase.client.Put;
45 import org.apache.hadoop.hbase.client.Result;
46 import org.apache.hadoop.hbase.client.Table;
47 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
48 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
49 import org.apache.hadoop.hbase.util.CommonFSUtils;
50 import org.apache.hadoop.hbase.util.FSUtils;
51 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
52 import org.apache.hadoop.util.StringUtils;
53 import org.apache.yetus.audience.InterfaceAudience;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
57 import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
58 import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
59 import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
61 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
62 import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
63 import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
64 import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest.FamilyFiles;
65 import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest.StoreFile;
67 /**
68 * Tracks file archiving and updates the hbase quota table.
70 @InterfaceAudience.Private
71 public class FileArchiverNotifierImpl implements FileArchiverNotifier {
72 private static final Logger LOG = LoggerFactory.getLogger(FileArchiverNotifierImpl.class);
73 private final Connection conn;
74 private final Configuration conf;
75 private final FileSystem fs;
76 private final TableName tn;
77 private final ReadLock readLock;
78 private final WriteLock writeLock;
79 private volatile long lastFullCompute = Long.MIN_VALUE;
80 private List<String> currentSnapshots = Collections.emptyList();
81 private static final Map<String,Object> NAMESPACE_LOCKS = new HashMap<>();
83 /**
84 * An Exception thrown when SnapshotSize updates to hbase:quota fail to be written.
86 @InterfaceAudience.Private
87 public static class QuotaSnapshotSizeSerializationException extends IOException {
88 private static final long serialVersionUID = 1L;
90 public QuotaSnapshotSizeSerializationException(String msg) {
91 super(msg);
95 public FileArchiverNotifierImpl(
96 Connection conn, Configuration conf, FileSystem fs, TableName tn) {
97 this.conn = conn;
98 this.conf = conf;
99 this.fs = fs;
100 this.tn = tn;
101 ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
102 readLock = lock.readLock();
103 writeLock = lock.writeLock();
106 static synchronized Object getLockForNamespace(String namespace) {
107 return NAMESPACE_LOCKS.computeIfAbsent(namespace, (ns) -> new Object());
111 * Returns a strictly-increasing measure of time extracted by {@link System#nanoTime()}.
113 long getLastFullCompute() {
114 return lastFullCompute;
117 @Override
118 public void addArchivedFiles(Set<Entry<String, Long>> fileSizes) throws IOException {
119 long start = System.nanoTime();
120 readLock.lock();
121 try {
122 // We want to catch the case where we got an archival request, but there was a full
123 // re-computation in progress that was blocking us. Most likely, the full computation is going
124 // to already include the changes we were going to make.
126 // Same as "start < lastFullCompute" but avoiding numeric overflow per the
127 // System.nanoTime() javadoc
128 if (lastFullCompute != Long.MIN_VALUE && start - lastFullCompute < 0) {
129 if (LOG.isTraceEnabled()) {
130 LOG.trace("A full computation was performed after this request was received."
131 + " Ignoring requested updates: " + fileSizes);
133 return;
136 if (LOG.isTraceEnabled()) {
137 LOG.trace("currentSnapshots: " + currentSnapshots + " fileSize: "+ fileSizes);
140 // Write increment to quota table for the correct snapshot. Only do this if we have snapshots
141 // and some files that were archived.
142 if (!currentSnapshots.isEmpty() && !fileSizes.isEmpty()) {
143 // We get back the files which no snapshot referenced (the files which will be deleted soon)
144 groupArchivedFiledBySnapshotAndRecordSize(currentSnapshots, fileSizes);
146 } finally {
147 readLock.unlock();
152 * For each file in the map, this updates the first snapshot (lexicographic snapshot name) that
153 * references this file. The result of this computation is serialized to the quota table.
155 * @param snapshots A collection of HBase snapshots to group the files into
156 * @param fileSizes A map of file names to their sizes
158 void groupArchivedFiledBySnapshotAndRecordSize(
159 List<String> snapshots, Set<Entry<String, Long>> fileSizes) throws IOException {
160 // Make a copy as we'll modify it.
161 final Map<String,Long> filesToUpdate = new HashMap<>(fileSizes.size());
162 for (Entry<String,Long> entry : fileSizes) {
163 filesToUpdate.put(entry.getKey(), entry.getValue());
165 // Track the change in size to each snapshot
166 final Map<String,Long> snapshotSizeChanges = new HashMap<>();
167 for (String snapshot : snapshots) {
168 // For each file in `filesToUpdate`, check if `snapshot` refers to it.
169 // If `snapshot` does, remove it from `filesToUpdate` and add it to `snapshotSizeChanges`.
170 bucketFilesToSnapshot(snapshot, filesToUpdate, snapshotSizeChanges);
171 if (filesToUpdate.isEmpty()) {
172 // If we have no more files recently archived, we have nothing more to check
173 break;
176 // We have computed changes to the snapshot size, we need to record them.
177 if (!snapshotSizeChanges.isEmpty()) {
178 if (LOG.isTraceEnabled()) {
179 LOG.trace("Writing snapshot size changes for: " + snapshotSizeChanges);
181 persistSnapshotSizeChanges(snapshotSizeChanges);
186 * For the given snapshot, find all files which this {@code snapshotName} references. After a file
187 * is found to be referenced by the snapshot, it is removed from {@code filesToUpdate} and
188 * {@code snapshotSizeChanges} is updated in concert.
190 * @param snapshotName The snapshot to check
191 * @param filesToUpdate A mapping of archived files to their size
192 * @param snapshotSizeChanges A mapping of snapshots and their change in size
194 void bucketFilesToSnapshot(
195 String snapshotName, Map<String,Long> filesToUpdate, Map<String,Long> snapshotSizeChanges)
196 throws IOException {
197 // A quick check to avoid doing work if the caller unnecessarily invoked this method.
198 if (filesToUpdate.isEmpty()) {
199 return;
202 Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(
203 snapshotName, CommonFSUtils.getRootDir(conf));
204 SnapshotDescription sd = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
205 SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, sd);
206 // For each region referenced by the snapshot
207 for (SnapshotRegionManifest rm : manifest.getRegionManifests()) {
208 // For each column family in this region
209 for (FamilyFiles ff : rm.getFamilyFilesList()) {
210 // And each store file in that family
211 for (StoreFile sf : ff.getStoreFilesList()) {
212 Long valueOrNull = filesToUpdate.remove(sf.getName());
213 if (valueOrNull != null) {
214 // This storefile was recently archived, we should update this snapshot with its size
215 snapshotSizeChanges.merge(snapshotName, valueOrNull, Long::sum);
217 // Short-circuit, if we have no more files that were archived, we don't need to iterate
218 // over the rest of the snapshot.
219 if (filesToUpdate.isEmpty()) {
220 return;
228 * Reads the current size for each snapshot to update, generates a new update based on that value,
229 * and then writes the new update.
231 * @param snapshotSizeChanges A map of snapshot name to size change
233 void persistSnapshotSizeChanges(Map<String,Long> snapshotSizeChanges) throws IOException {
234 try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
235 // Create a list (with a more typical ordering implied)
236 final List<Entry<String,Long>> snapshotSizeEntries = new ArrayList<>(
237 snapshotSizeChanges.entrySet());
238 // Create the Gets for each snapshot we need to update
239 final List<Get> snapshotSizeGets = snapshotSizeEntries.stream()
240 .map((e) -> QuotaTableUtil.makeGetForSnapshotSize(tn, e.getKey()))
241 .collect(Collectors.toList());
242 final Iterator<Entry<String,Long>> iterator = snapshotSizeEntries.iterator();
243 // A List to store each Put we'll create from the Get's we retrieve
244 final List<Put> updates = new ArrayList<>(snapshotSizeEntries.size());
246 // TODO Push this down to the RegionServer with a coprocessor:
248 // We would really like to piggy-back on the row-lock already being grabbed
249 // to handle the update of the row in the quota table. However, because the value
250 // is a serialized protobuf, the standard Increment API doesn't work for us. With a CP, we
251 // can just send the size deltas to the RS and atomically update the serialized PB object
252 // while relying on the row-lock for synchronization.
254 // Synchronizing on the namespace string is a "minor smell" but passable as this is
255 // only invoked via a single caller (the active Master). Using the namespace name lets us
256 // have some parallelism without worry of on caller seeing stale data from the quota table.
257 synchronized (getLockForNamespace(tn.getNamespaceAsString())) {
258 final Result[] existingSnapshotSizes = quotaTable.get(snapshotSizeGets);
259 long totalSizeChange = 0;
260 // Read the current size values (if they exist) to generate the new value
261 for (Result result : existingSnapshotSizes) {
262 Entry<String,Long> entry = iterator.next();
263 String snapshot = entry.getKey();
264 Long size = entry.getValue();
265 // Track the total size change for the namespace this table belongs in
266 totalSizeChange += size;
267 // Get the size of the previous value (or zero)
268 long previousSize = getSnapshotSizeFromResult(result);
269 // Create an update. A file was archived from the table, so the table's size goes
270 // down, but the snapshot's size goes up.
271 updates.add(QuotaTableUtil.createPutForSnapshotSize(tn, snapshot, previousSize + size));
274 // Create an update for the summation of all snapshots in the namespace
275 if (totalSizeChange != 0) {
276 long previousSize = getPreviousNamespaceSnapshotSize(
277 quotaTable, tn.getNamespaceAsString());
278 updates.add(QuotaTableUtil.createPutForNamespaceSnapshotSize(
279 tn.getNamespaceAsString(), previousSize + totalSizeChange));
282 // Send all of the quota table updates in one batch.
283 List<Object> failures = new ArrayList<>();
284 final Object[] results = new Object[updates.size()];
285 quotaTable.batch(updates, results);
286 for (Object result : results) {
287 // A null result is an error condition (all RPC attempts failed)
288 if (!(result instanceof Result)) {
289 failures.add(result);
292 // Propagate a failure if any updates failed
293 if (!failures.isEmpty()) {
294 throw new QuotaSnapshotSizeSerializationException(
295 "Failed to write some snapshot size updates: " + failures);
298 } catch (InterruptedException e) {
299 Thread.currentThread().interrupt();
300 return;
305 * Fetches the current size of all snapshots in the given {@code namespace}.
307 * @param quotaTable The HBase quota table
308 * @param namespace Namespace to fetch the sum of snapshot sizes for
309 * @return The size of all snapshot sizes for the namespace in bytes.
311 long getPreviousNamespaceSnapshotSize(Table quotaTable, String namespace) throws IOException {
312 // Update the size of each snapshot for all snapshots in a namespace.
313 Result r = quotaTable.get(
314 QuotaTableUtil.createGetNamespaceSnapshotSize(namespace));
315 return getSnapshotSizeFromResult(r);
319 * Extracts the size component from a serialized {@link SpaceQuotaSnapshot} protobuf.
321 * @param r A Result containing one cell with a SpaceQuotaSnapshot protobuf
322 * @return The size in bytes of the snapshot.
324 long getSnapshotSizeFromResult(Result r) throws InvalidProtocolBufferException {
325 // Per javadoc, Result should only be null if an exception was thrown. So, if we're here,
326 // we should be non-null. If we can't advance to the first cell, same as "no cell".
327 if (!r.isEmpty() && r.advance()) {
328 return QuotaTableUtil.parseSnapshotSize(r.current());
330 return 0L;
333 @Override
334 public long computeAndStoreSnapshotSizes(
335 Collection<String> currentSnapshots) throws IOException {
336 // Record what the current snapshots are
337 this.currentSnapshots = new ArrayList<>(currentSnapshots);
338 Collections.sort(this.currentSnapshots);
340 // compute new size for table + snapshots for that table
341 List<SnapshotWithSize> snapshotSizes = computeSnapshotSizes(this.currentSnapshots);
342 if (LOG.isTraceEnabled()) {
343 LOG.trace("Computed snapshot sizes for " + tn + " of " + snapshotSizes);
346 // Compute the total size of all snapshots against our table
347 final long totalSnapshotSize = snapshotSizes.stream().mapToLong((sws) -> sws.getSize()).sum();
349 writeLock.lock();
350 try {
351 // Persist the size of each snapshot
352 try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
353 persistSnapshotSizes(quotaTable, snapshotSizes);
356 // Report the last time we did a recomputation
357 lastFullCompute = System.nanoTime();
359 return totalSnapshotSize;
360 } finally {
361 writeLock.unlock();
365 @Override
366 public String toString() {
367 StringBuilder sb = new StringBuilder();
368 sb.append(getClass().getSimpleName()).append("[");
369 sb.append("tableName=").append(tn).append(", currentSnapshots=");
370 sb.append(currentSnapshots).append(", lastFullCompute=").append(lastFullCompute);
371 return sb.append("]").toString();
375 * Computes the size of each snapshot against the table referenced by {@code this}.
377 * @param snapshots A sorted list of snapshots against {@code tn}.
378 * @return A list of the size for each snapshot against {@code tn}.
380 List<SnapshotWithSize> computeSnapshotSizes(List<String> snapshots) throws IOException {
381 final List<SnapshotWithSize> snapshotSizes = new ArrayList<>(snapshots.size());
382 final Path rootDir = CommonFSUtils.getRootDir(conf);
384 // Get the map of store file names to store file path for this table
385 final Set<String> tableReferencedStoreFiles;
386 try {
387 tableReferencedStoreFiles = FSUtils.getTableStoreFilePathMap(fs, rootDir).keySet();
388 } catch (InterruptedException e) {
389 Thread.currentThread().interrupt();
390 return null;
393 if (LOG.isTraceEnabled()) {
394 LOG.trace("Paths for " + tn + ": " + tableReferencedStoreFiles);
397 // For each snapshot on this table, get the files which the snapshot references which
398 // the table does not.
399 Set<String> snapshotReferencedFiles = new HashSet<>();
400 for (String snapshotName : snapshots) {
401 Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
402 SnapshotDescription sd = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
403 SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, sd);
405 if (LOG.isTraceEnabled()) {
406 LOG.trace("Files referenced by other snapshots: " + snapshotReferencedFiles);
409 // Get the set of files from the manifest that this snapshot references which are not also
410 // referenced by the originating table.
411 Set<StoreFileReference> unreferencedStoreFileNames = getStoreFilesFromSnapshot(
412 manifest, (sfn) -> !tableReferencedStoreFiles.contains(sfn)
413 && !snapshotReferencedFiles.contains(sfn));
415 if (LOG.isTraceEnabled()) {
416 LOG.trace("Snapshot " + snapshotName + " solely references the files: "
417 + unreferencedStoreFileNames);
420 // Compute the size of the store files for this snapshot
421 long size = getSizeOfStoreFiles(tn, unreferencedStoreFileNames);
422 if (LOG.isTraceEnabled()) {
423 LOG.trace("Computed size of " + snapshotName + " to be " + size);
426 // Persist this snapshot's size into the map
427 snapshotSizes.add(new SnapshotWithSize(snapshotName, size));
429 // Make sure that we don't double-count the same file
430 for (StoreFileReference ref : unreferencedStoreFileNames) {
431 for (String fileNames : ref.getFamilyToFilesMapping().values()) {
432 snapshotReferencedFiles.add(fileNames);
437 return snapshotSizes;
441 * Computes the size of each store file in {@code storeFileNames}
443 long getSizeOfStoreFiles(TableName tn, Set<StoreFileReference> storeFileNames) {
444 return storeFileNames.stream()
445 .collect(Collectors.summingLong((sfr) -> getSizeOfStoreFile(tn, sfr)));
449 * Computes the size of the store files for a single region.
451 long getSizeOfStoreFile(TableName tn, StoreFileReference storeFileName) {
452 String regionName = storeFileName.getRegionName();
453 return storeFileName.getFamilyToFilesMapping()
454 .entries().stream()
455 .collect(Collectors.summingLong((e) ->
456 getSizeOfStoreFile(tn, regionName, e.getKey(), e.getValue())));
460 * Computes the size of the store file given its name, region and family name in
461 * the archive directory.
463 long getSizeOfStoreFile(
464 TableName tn, String regionName, String family, String storeFile) {
465 Path familyArchivePath;
466 try {
467 familyArchivePath = HFileArchiveUtil.getStoreArchivePath(conf, tn, regionName, family);
468 } catch (IOException e) {
469 LOG.warn("Could not compute path for the archive directory for the region", e);
470 return 0L;
472 Path fileArchivePath = new Path(familyArchivePath, storeFile);
473 try {
474 if (fs.exists(fileArchivePath)) {
475 FileStatus[] status = fs.listStatus(fileArchivePath);
476 if (1 != status.length) {
477 LOG.warn("Expected " + fileArchivePath +
478 " to be a file but was a directory, ignoring reference");
479 return 0L;
481 return status[0].getLen();
483 } catch (IOException e) {
484 LOG.warn("Could not obtain the status of " + fileArchivePath, e);
485 return 0L;
487 LOG.warn("Expected " + fileArchivePath + " to exist but does not, ignoring reference.");
488 return 0L;
492 * Extracts the names of the store files referenced by this snapshot which satisfy the given
493 * predicate (the predicate returns {@code true}).
495 Set<StoreFileReference> getStoreFilesFromSnapshot(
496 SnapshotManifest manifest, Predicate<String> filter) {
497 Set<StoreFileReference> references = new HashSet<>();
498 // For each region referenced by the snapshot
499 for (SnapshotRegionManifest rm : manifest.getRegionManifests()) {
500 StoreFileReference regionReference = new StoreFileReference(
501 ProtobufUtil.toRegionInfo(rm.getRegionInfo()).getEncodedName());
503 // For each column family in this region
504 for (FamilyFiles ff : rm.getFamilyFilesList()) {
505 final String familyName = ff.getFamilyName().toStringUtf8();
506 // And each store file in that family
507 for (StoreFile sf : ff.getStoreFilesList()) {
508 String storeFileName = sf.getName();
509 // A snapshot only "inherits" a files size if it uniquely refers to it (no table
510 // and no other snapshot references it).
511 if (filter.test(storeFileName)) {
512 regionReference.addFamilyStoreFile(familyName, storeFileName);
516 // Only add this Region reference if we retained any files.
517 if (!regionReference.getFamilyToFilesMapping().isEmpty()) {
518 references.add(regionReference);
521 return references;
525 * Writes the snapshot sizes to the provided {@code table}.
527 void persistSnapshotSizes(
528 Table table, List<SnapshotWithSize> snapshotSizes) throws IOException {
529 // Convert each entry in the map to a Put and write them to the quota table
530 table.put(snapshotSizes
531 .stream()
532 .map(sws -> QuotaTableUtil.createPutForSnapshotSize(
533 tn, sws.getName(), sws.getSize()))
534 .collect(Collectors.toList()));
538 * A struct encapsulating the name of a snapshot and its "size" on the filesystem. This size is
539 * defined as the amount of filesystem space taken by the files the snapshot refers to which
540 * the originating table no longer refers to.
542 static class SnapshotWithSize {
543 private final String name;
544 private final long size;
546 SnapshotWithSize(String name, long size) {
547 this.name = Objects.requireNonNull(name);
548 this.size = size;
551 String getName() {
552 return name;
555 long getSize() {
556 return size;
559 @Override
560 public int hashCode() {
561 return new HashCodeBuilder().append(name).append(size).toHashCode();
564 @Override
565 public boolean equals(Object o) {
566 if (this == o) {
567 return true;
570 if (!(o instanceof SnapshotWithSize)) {
571 return false;
574 SnapshotWithSize other = (SnapshotWithSize) o;
575 return name.equals(other.name) && size == other.size;
578 @Override
579 public String toString() {
580 StringBuilder sb = new StringBuilder(32);
581 return sb.append("SnapshotWithSize:[").append(name).append(" ")
582 .append(StringUtils.byteDesc(size)).append("]").toString();
587 * A reference to a collection of files in the archive directory for a single region.
589 static class StoreFileReference {
590 private final String regionName;
591 private final Multimap<String,String> familyToFiles;
593 StoreFileReference(String regionName) {
594 this.regionName = Objects.requireNonNull(regionName);
595 familyToFiles = HashMultimap.create();
598 String getRegionName() {
599 return regionName;
602 Multimap<String,String> getFamilyToFilesMapping() {
603 return familyToFiles;
606 void addFamilyStoreFile(String family, String storeFileName) {
607 familyToFiles.put(family, storeFileName);
610 @Override
611 public int hashCode() {
612 return new HashCodeBuilder().append(regionName).append(familyToFiles).toHashCode();
615 @Override
616 public boolean equals(Object o) {
617 if (this == o) {
618 return true;
620 if (!(o instanceof StoreFileReference)) {
621 return false;
623 StoreFileReference other = (StoreFileReference) o;
624 return regionName.equals(other.regionName) && familyToFiles.equals(other.familyToFiles);
627 @Override
628 public String toString() {
629 StringBuilder sb = new StringBuilder();
630 return sb.append("StoreFileReference[region=").append(regionName).append(", files=")
631 .append(familyToFiles).append("]").toString();