2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.wal
;
20 import java
.io
.IOException
;
21 import java
.util
.ArrayList
;
22 import java
.util
.List
;
25 import java
.util
.TreeMap
;
26 import java
.util
.TreeSet
;
28 import org
.apache
.hadoop
.hbase
.TableName
;
29 import org
.apache
.hadoop
.hbase
.io
.HeapSize
;
30 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
31 import org
.apache
.hadoop
.hbase
.util
.ClassSize
;
32 import org
.apache
.hadoop
.hbase
.wal
.WALSplitter
.PipelineController
;
33 import org
.apache
.yetus
.audience
.InterfaceAudience
;
34 import org
.slf4j
.Logger
;
35 import org
.slf4j
.LoggerFactory
;
38 * Class which accumulates edits and separates them into a buffer per region while simultaneously
39 * accounting RAM usage. Blocks if the RAM usage crosses a predefined threshold. Writer threads then
40 * pull region-specific buffers from this class.
42 @InterfaceAudience.Private
44 private static final Logger LOG
= LoggerFactory
.getLogger(EntryBuffers
.class);
46 private final PipelineController controller
;
48 final Map
<byte[], RegionEntryBuffer
> buffers
= new TreeMap
<>(Bytes
.BYTES_COMPARATOR
);
51 * Track which regions are currently in the middle of writing. We don't allow an IO thread to pick
52 * up bytes from a region if we're already writing data for that region in a different IO thread.
54 private final Set
<byte[]> currentlyWriting
= new TreeSet
<>(Bytes
.BYTES_COMPARATOR
);
56 protected long totalBuffered
= 0;
57 protected final long maxHeapUsage
;
/**
 * Creates an instance with no buffered edits.
 *
 * @param controller   pipeline controller whose {@code dataAvailable} monitor and error state
 *                     are used when appending and when a write completes
 * @param maxHeapUsage amount of buffered heap, in bytes, above which appendEntry waits
 */
public EntryBuffers(PipelineController controller, long maxHeapUsage) {
  this.maxHeapUsage = maxHeapUsage;
  this.controller = controller;
}
65 * Append a log entry into the corresponding region buffer. Blocks if the total heap usage has
66 * crossed the specified threshold.
68 void appendEntry(WAL
.Entry entry
) throws InterruptedException
, IOException
{
69 WALKey key
= entry
.getKey();
70 RegionEntryBuffer buffer
;
73 buffer
= buffers
.get(key
.getEncodedRegionName());
75 buffer
= new RegionEntryBuffer(key
.getTableName(), key
.getEncodedRegionName());
76 buffers
.put(key
.getEncodedRegionName(), buffer
);
78 incrHeap
= buffer
.appendEntry(entry
);
81 // If we crossed the chunk threshold, wait for more space to be available
82 synchronized (controller
.dataAvailable
) {
83 totalBuffered
+= incrHeap
;
84 while (totalBuffered
> maxHeapUsage
&& controller
.thrown
.get() == null) {
85 LOG
.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered
);
86 controller
.dataAvailable
.wait(2000);
88 controller
.dataAvailable
.notifyAll();
90 controller
.checkForErrors();
94 * @return RegionEntryBuffer a buffer of edits to be written.
96 synchronized RegionEntryBuffer
getChunkToWrite() {
98 byte[] biggestBufferKey
= null;
100 for (Map
.Entry
<byte[], RegionEntryBuffer
> entry
: buffers
.entrySet()) {
101 long size
= entry
.getValue().heapSize();
102 if (size
> biggestSize
&& (!currentlyWriting
.contains(entry
.getKey()))) {
104 biggestBufferKey
= entry
.getKey();
107 if (biggestBufferKey
== null) {
111 RegionEntryBuffer buffer
= buffers
.remove(biggestBufferKey
);
112 currentlyWriting
.add(biggestBufferKey
);
116 void doneWriting(RegionEntryBuffer buffer
) {
117 synchronized (this) {
118 boolean removed
= currentlyWriting
.remove(buffer
.encodedRegionName
);
121 long size
= buffer
.heapSize();
123 synchronized (controller
.dataAvailable
) {
124 totalBuffered
-= size
;
125 // We may unblock writers
126 controller
.dataAvailable
.notifyAll();
130 synchronized boolean isRegionCurrentlyWriting(byte[] region
) {
131 return currentlyWriting
.contains(region
);
135 * A buffer of some number of edits for a given region.
136 * This accumulates edits and also provides a memory optimization in order to
137 * share a single byte array instance for the table and region name.
138 * Also tracks memory usage of the accumulated edits.
140 static class RegionEntryBuffer
implements HeapSize
{
141 private long heapInBuffer
= 0;
142 final List
<WAL
.Entry
> entryBuffer
;
143 final TableName tableName
;
144 final byte[] encodedRegionName
;
/**
 * Creates an empty buffer for one region.
 *
 * @param tableName table name stored with this buffer
 * @param region    encoded region name stored with this buffer
 */
RegionEntryBuffer(TableName tableName, byte[] region) {
  this.entryBuffer = new ArrayList<>();
  this.tableName = tableName;
  this.encodedRegionName = region;
}
152 long appendEntry(WAL
.Entry entry
) {
154 entryBuffer
.add(entry
);
155 // TODO linkedlist entry
156 long incrHeap
= entry
.getEdit().heapSize() +
157 ClassSize
.align(2 * ClassSize
.REFERENCE
); // WALKey pointers
158 heapInBuffer
+= incrHeap
;
162 private void internify(WAL
.Entry entry
) {
163 WALKeyImpl k
= entry
.getKey();
164 k
.internTableName(this.tableName
);
165 k
.internEncodedRegionName(this.encodedRegionName
);
169 public long heapSize() {
173 public byte[] getEncodedRegionName() {
174 return encodedRegionName
;
177 public TableName
getTableName() {