HBASE-26921 Rewrite the counting cells part in TestMultiVersions (#4316)
[hbase.git] / hbase-server / src / main / java / org / apache / hadoop / hbase / wal / EntryBuffers.java
blob0ca1219bd26f5bf3da58c1a629de731f1b036651
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.wal;
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Set;
25 import java.util.TreeMap;
26 import java.util.TreeSet;
28 import org.apache.hadoop.hbase.TableName;
29 import org.apache.hadoop.hbase.io.HeapSize;
30 import org.apache.hadoop.hbase.util.Bytes;
31 import org.apache.hadoop.hbase.util.ClassSize;
32 import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
33 import org.apache.yetus.audience.InterfaceAudience;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 /**
38 * Class which accumulates edits and separates them into a buffer per region while simultaneously
39 * accounting RAM usage. Blocks if the RAM usage crosses a predefined threshold. Writer threads then
40 * pull region-specific buffers from this class.
42 @InterfaceAudience.Private
43 class EntryBuffers {
44 private static final Logger LOG = LoggerFactory.getLogger(EntryBuffers.class);
46 private final PipelineController controller;
48 final Map<byte[], RegionEntryBuffer> buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
51 * Track which regions are currently in the middle of writing. We don't allow an IO thread to pick
52 * up bytes from a region if we're already writing data for that region in a different IO thread.
54 private final Set<byte[]> currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR);
56 protected long totalBuffered = 0;
57 protected final long maxHeapUsage;
59 public EntryBuffers(PipelineController controller, long maxHeapUsage) {
60 this.controller = controller;
61 this.maxHeapUsage = maxHeapUsage;
64 /**
65 * Append a log entry into the corresponding region buffer. Blocks if the total heap usage has
66 * crossed the specified threshold.
68 void appendEntry(WAL.Entry entry) throws InterruptedException, IOException {
69 WALKey key = entry.getKey();
70 RegionEntryBuffer buffer;
71 long incrHeap;
72 synchronized (this) {
73 buffer = buffers.get(key.getEncodedRegionName());
74 if (buffer == null) {
75 buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName());
76 buffers.put(key.getEncodedRegionName(), buffer);
78 incrHeap = buffer.appendEntry(entry);
81 // If we crossed the chunk threshold, wait for more space to be available
82 synchronized (controller.dataAvailable) {
83 totalBuffered += incrHeap;
84 while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
85 LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered);
86 controller.dataAvailable.wait(2000);
88 controller.dataAvailable.notifyAll();
90 controller.checkForErrors();
93 /**
94 * @return RegionEntryBuffer a buffer of edits to be written.
96 synchronized RegionEntryBuffer getChunkToWrite() {
97 long biggestSize = 0;
98 byte[] biggestBufferKey = null;
100 for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
101 long size = entry.getValue().heapSize();
102 if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
103 biggestSize = size;
104 biggestBufferKey = entry.getKey();
107 if (biggestBufferKey == null) {
108 return null;
111 RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
112 currentlyWriting.add(biggestBufferKey);
113 return buffer;
116 void doneWriting(RegionEntryBuffer buffer) {
117 synchronized (this) {
118 boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
119 assert removed;
121 long size = buffer.heapSize();
123 synchronized (controller.dataAvailable) {
124 totalBuffered -= size;
125 // We may unblock writers
126 controller.dataAvailable.notifyAll();
130 synchronized boolean isRegionCurrentlyWriting(byte[] region) {
131 return currentlyWriting.contains(region);
135 * A buffer of some number of edits for a given region.
136 * This accumulates edits and also provides a memory optimization in order to
137 * share a single byte array instance for the table and region name.
138 * Also tracks memory usage of the accumulated edits.
140 static class RegionEntryBuffer implements HeapSize {
141 private long heapInBuffer = 0;
142 final List<WAL.Entry> entryBuffer;
143 final TableName tableName;
144 final byte[] encodedRegionName;
146 RegionEntryBuffer(TableName tableName, byte[] region) {
147 this.tableName = tableName;
148 this.encodedRegionName = region;
149 this.entryBuffer = new ArrayList<>();
152 long appendEntry(WAL.Entry entry) {
153 internify(entry);
154 entryBuffer.add(entry);
155 // TODO linkedlist entry
156 long incrHeap = entry.getEdit().heapSize() +
157 ClassSize.align(2 * ClassSize.REFERENCE); // WALKey pointers
158 heapInBuffer += incrHeap;
159 return incrHeap;
162 private void internify(WAL.Entry entry) {
163 WALKeyImpl k = entry.getKey();
164 k.internTableName(this.tableName);
165 k.internEncodedRegionName(this.encodedRegionName);
168 @Override
169 public long heapSize() {
170 return heapInBuffer;
173 public byte[] getEncodedRegionName() {
174 return encodedRegionName;
177 public TableName getTableName() {
178 return tableName;