HBASE-26921 Rewrite the counting cells part in TestMultiVersions (#4316)
[hbase.git] / hbase-server / src / main / java / org / apache / hadoop / hbase / wal / RecoveredEditsOutputSink.java
blob645af60efcb4151a0b694a57bc6f154e5e42196d
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.wal;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
39 /**
40 * Class that manages the output streams from the log splitting process.
41 * Every region only has one recovered edits file PER split WAL (if we split
42 * multiple WALs during a log-splitting session, on open, a Region may
43 * have multiple recovered.edits files to replay -- one per split WAL).
44 * @see BoundedRecoveredEditsOutputSink which is like this class but imposes upper bound on
45 * the number of writers active at one time (makes for better throughput).
47 @InterfaceAudience.Private
48 class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
49 private static final Logger LOG = LoggerFactory.getLogger(RecoveredEditsOutputSink.class);
50 private ConcurrentMap<String, RecoveredEditsWriter> writers = new ConcurrentHashMap<>();
52 public RecoveredEditsOutputSink(WALSplitter walSplitter,
53 WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
54 super(walSplitter, controller, entryBuffers, numWriters);
57 @Override
58 public void append(EntryBuffers.RegionEntryBuffer buffer)
59 throws IOException {
60 List<WAL.Entry> entries = buffer.entryBuffer;
61 if (entries.isEmpty()) {
62 LOG.warn("got an empty buffer, skipping");
63 return;
65 RecoveredEditsWriter writer =
66 getRecoveredEditsWriter(buffer.tableName, buffer.encodedRegionName,
67 entries.get(0).getKey().getSequenceId());
68 if (writer != null) {
69 writer.writeRegionEntries(entries);
73 /**
74 * Get a writer and path for a log starting at the given entry. This function is threadsafe so
75 * long as multiple threads are always acting on different regions.
76 * @return null if this region shouldn't output any logs
78 private RecoveredEditsWriter getRecoveredEditsWriter(TableName tableName, byte[] region,
79 long seqId) throws IOException {
80 RecoveredEditsWriter ret = writers.get(Bytes.toString(region));
81 if (ret != null) {
82 return ret;
84 ret = createRecoveredEditsWriter(tableName, region, seqId);
85 if (ret == null) {
86 return null;
88 LOG.trace("Created {}", ret.path);
89 writers.put(Bytes.toString(region), ret);
90 return ret;
93 @Override
94 public List<Path> close() throws IOException {
95 boolean isSuccessful = true;
96 try {
97 isSuccessful = finishWriterThreads();
98 } finally {
99 isSuccessful &= closeWriters();
101 return isSuccessful ? splits : null;
105 * Close all of the output streams.
107 * @return true when there is no error.
109 private boolean closeWriters() throws IOException {
110 List<IOException> thrown = Lists.newArrayList();
111 for (RecoveredEditsWriter writer : writers.values()) {
112 closeCompletionService.submit(() -> {
113 Path dst = closeRecoveredEditsWriter(writer, thrown);
114 LOG.trace("Closed {}", dst);
115 splits.add(dst);
116 return null;
119 boolean progressFailed = false;
120 try {
121 for (int i = 0, n = this.writers.size(); i < n; i++) {
122 Future<Void> future = closeCompletionService.take();
123 future.get();
124 if (!progressFailed && reporter != null && !reporter.progress()) {
125 progressFailed = true;
128 } catch (InterruptedException e) {
129 IOException iie = new InterruptedIOException();
130 iie.initCause(e);
131 throw iie;
132 } catch (ExecutionException e) {
133 throw new IOException(e.getCause());
134 } finally {
135 closeThreadPool.shutdownNow();
137 if (!thrown.isEmpty()) {
138 throw MultipleIOException.createIOException(thrown);
140 return !progressFailed;
143 @Override
144 public Map<String, Long> getOutputCounts() {
145 TreeMap<String, Long> ret = new TreeMap<>();
146 for (Map.Entry<String, RecoveredEditsWriter> entry : writers.entrySet()) {
147 ret.put(entry.getKey(), entry.getValue().editsWritten);
149 return ret;
152 @Override
153 public int getNumberOfRecoveredRegions() {
154 return writers.size();
157 @Override
158 int getNumOpenWriters() {
159 return writers.size();