/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

40 * Class that manages the output streams from the log splitting process.
41 * Every region only has one recovered edits file PER split WAL (if we split
42 * multiple WALs during a log-splitting session, on open, a Region may
43 * have multiple recovered.edits files to replay -- one per split WAL).
44 * @see BoundedRecoveredEditsOutputSink which is like this class but imposes upper bound on
45 * the number of writers active at one time (makes for better throughput).
47 @InterfaceAudience.Private
48 class RecoveredEditsOutputSink
extends AbstractRecoveredEditsOutputSink
{
49 private static final Logger LOG
= LoggerFactory
.getLogger(RecoveredEditsOutputSink
.class);
50 private ConcurrentMap
<String
, RecoveredEditsWriter
> writers
= new ConcurrentHashMap
<>();
52 public RecoveredEditsOutputSink(WALSplitter walSplitter
,
53 WALSplitter
.PipelineController controller
, EntryBuffers entryBuffers
, int numWriters
) {
54 super(walSplitter
, controller
, entryBuffers
, numWriters
);
58 public void append(EntryBuffers
.RegionEntryBuffer buffer
)
60 List
<WAL
.Entry
> entries
= buffer
.entryBuffer
;
61 if (entries
.isEmpty()) {
62 LOG
.warn("got an empty buffer, skipping");
65 RecoveredEditsWriter writer
=
66 getRecoveredEditsWriter(buffer
.tableName
, buffer
.encodedRegionName
,
67 entries
.get(0).getKey().getSequenceId());
69 writer
.writeRegionEntries(entries
);
74 * Get a writer and path for a log starting at the given entry. This function is threadsafe so
75 * long as multiple threads are always acting on different regions.
76 * @return null if this region shouldn't output any logs
78 private RecoveredEditsWriter
getRecoveredEditsWriter(TableName tableName
, byte[] region
,
79 long seqId
) throws IOException
{
80 RecoveredEditsWriter ret
= writers
.get(Bytes
.toString(region
));
84 ret
= createRecoveredEditsWriter(tableName
, region
, seqId
);
88 LOG
.trace("Created {}", ret
.path
);
89 writers
.put(Bytes
.toString(region
), ret
);
94 public List
<Path
> close() throws IOException
{
95 boolean isSuccessful
= true;
97 isSuccessful
= finishWriterThreads();
99 isSuccessful
&= closeWriters();
101 return isSuccessful ? splits
: null;
105 * Close all of the output streams.
107 * @return true when there is no error.
109 private boolean closeWriters() throws IOException
{
110 List
<IOException
> thrown
= Lists
.newArrayList();
111 for (RecoveredEditsWriter writer
: writers
.values()) {
112 closeCompletionService
.submit(() -> {
113 Path dst
= closeRecoveredEditsWriter(writer
, thrown
);
114 LOG
.trace("Closed {}", dst
);
119 boolean progressFailed
= false;
121 for (int i
= 0, n
= this.writers
.size(); i
< n
; i
++) {
122 Future
<Void
> future
= closeCompletionService
.take();
124 if (!progressFailed
&& reporter
!= null && !reporter
.progress()) {
125 progressFailed
= true;
128 } catch (InterruptedException e
) {
129 IOException iie
= new InterruptedIOException();
132 } catch (ExecutionException e
) {
133 throw new IOException(e
.getCause());
135 closeThreadPool
.shutdownNow();
137 if (!thrown
.isEmpty()) {
138 throw MultipleIOException
.createIOException(thrown
);
140 return !progressFailed
;
144 public Map
<String
, Long
> getOutputCounts() {
145 TreeMap
<String
, Long
> ret
= new TreeMap
<>();
146 for (Map
.Entry
<String
, RecoveredEditsWriter
> entry
: writers
.entrySet()) {
147 ret
.put(entry
.getKey(), entry
.getValue().editsWritten
);
153 public int getNumberOfRecoveredRegions() {
154 return writers
.size();
158 int getNumOpenWriters() {
159 return writers
.size();