HBASE-26412 Handle sink failure in RegionReplicationSink (#3815)
[hbase.git] / hbase-server / src / main / java / org / apache / hadoop / hbase / regionserver / AbstractMultiFileWriter.java
blob82c3867c103c529b4c261f4a185bb71e19f31eb9
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.regionserver;
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.Collections;
24 import java.util.List;
25 import org.apache.hadoop.fs.Path;
26 import org.apache.yetus.audience.InterfaceAudience;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
30 /**
31 * Base class for cell sink that separates the provided cells into multiple files.
33 @InterfaceAudience.Private
34 public abstract class AbstractMultiFileWriter implements CellSink, ShipperListener {
36 private static final Logger LOG = LoggerFactory.getLogger(AbstractMultiFileWriter.class);
38 /** Factory that is used to produce single StoreFile.Writer-s */
39 protected WriterFactory writerFactory;
41 /** Source scanner that is tracking KV count; may be null if source is not StoreScanner */
42 protected StoreScanner sourceScanner;
44 public interface WriterFactory {
45 public StoreFileWriter createWriter() throws IOException;
46 default StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
47 throws IOException {
48 return createWriter();
52 /**
53 * Initializes multi-writer before usage.
54 * @param sourceScanner Optional store scanner to obtain the information about read progress.
55 * @param factory Factory used to produce individual file writers.
57 public void init(StoreScanner sourceScanner, WriterFactory factory) {
58 this.writerFactory = factory;
59 this.sourceScanner = sourceScanner;
62 /**
63 * Commit all writers.
64 * <p>
65 * Notice that here we use the same <code>maxSeqId</code> for all output files since we haven't
66 * find an easy to find enough sequence ids for different output files in some corner cases. See
67 * comments in HBASE-15400 for more details.
69 public List<Path> commitWriters(long maxSeqId, boolean majorCompaction) throws IOException {
70 return commitWriters(maxSeqId, majorCompaction, Collections.EMPTY_SET);
73 public List<Path> commitWriters(long maxSeqId, boolean majorCompaction,
74 Collection<HStoreFile> storeFiles) throws IOException {
75 preCommitWriters();
76 Collection<StoreFileWriter> writers = this.writers();
77 if (LOG.isDebugEnabled()) {
78 LOG.debug(
79 "Commit " + writers.size() + " writers, maxSeqId=" + maxSeqId + ", majorCompaction=" +
80 majorCompaction);
82 List<Path> paths = new ArrayList<>();
83 for (StoreFileWriter writer : writers) {
84 if (writer == null) {
85 continue;
87 writer.appendMetadata(maxSeqId, majorCompaction, storeFiles);
88 preCloseWriter(writer);
89 paths.add(writer.getPath());
90 writer.close();
92 return paths;
95 /**
96 * Close all writers without throwing any exceptions. This is used when compaction failed usually.
98 public List<Path> abortWriters() {
99 List<Path> paths = new ArrayList<>();
100 for (StoreFileWriter writer : writers()) {
101 try {
102 if (writer != null) {
103 paths.add(writer.getPath());
104 writer.close();
106 } catch (Exception ex) {
107 LOG.error("Failed to close the writer after an unfinished compaction.", ex);
110 return paths;
114 * Returns all writers. This is used to prevent deleting currently writen storefiles
115 * during cleanup.
117 public abstract Collection<StoreFileWriter> writers();
120 * Subclasses override this method to be called at the end of a successful sequence of append; all
121 * appends are processed before this method is called.
123 protected void preCommitWriters() throws IOException {
127 * Subclasses override this method to be called before we close the give writer. Usually you can
128 * append extra metadata to the writer.
130 protected void preCloseWriter(StoreFileWriter writer) throws IOException {
133 @Override
134 public void beforeShipped() throws IOException {
135 Collection<StoreFileWriter> writers = writers();
136 if (writers != null) {
137 for (StoreFileWriter writer : writers) {
138 if (writer != null) {
139 writer.beforeShipped();