HBASE-26921 Rewrite the counting cells part in TestMultiVersions (#4316)
[hbase.git] / hbase-common / src / main / java / org / apache / hadoop / hbase / util / ByteBufferArray.java
blob0c4c52f99fe7fab71a8cd7fca0ef7d1cd25552c4
1 /**
2 * Copyright The Apache Software Foundation
4 * Licensed to the Apache Software Foundation (ASF) under one or more
5 * contributor license agreements. See the NOTICE file distributed with this
6 * work for additional information regarding copyright ownership. The ASF
7 * licenses this file to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 * License for the specific language governing permissions and limitations
17 * under the License.
19 package org.apache.hadoop.hbase.util;
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.ArrayList;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.Future;
29 import java.util.function.BiConsumer;
31 import org.apache.hadoop.hbase.nio.ByteBuff;
32 import org.apache.hadoop.util.StringUtils;
33 import org.apache.yetus.audience.InterfaceAudience;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 /**
38 * This class manages an array of ByteBuffers with a default size 4MB. These buffers are sequential
39 * and could be considered as a large buffer.It supports reading/writing data from this large buffer
40 * with a position and offset
42 @InterfaceAudience.Private
43 public class ByteBufferArray {
44 private static final Logger LOG = LoggerFactory.getLogger(ByteBufferArray.class);
46 public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
47 private final int bufferSize;
48 private final int bufferCount;
49 final ByteBuffer[] buffers;
51 /**
52 * We allocate a number of byte buffers as the capacity.
53 * @param capacity total size of the byte buffer array
54 * @param allocator the ByteBufferAllocator that will create the buffers
55 * @throws IOException throws IOException if there is an exception thrown by the allocator
57 public ByteBufferArray(long capacity, ByteBufferAllocator allocator) throws IOException {
58 this(getBufferSize(capacity), getBufferCount(capacity),
59 Runtime.getRuntime().availableProcessors(), capacity, allocator);
62 ByteBufferArray(int bufferSize, int bufferCount, int threadCount, long capacity,
63 ByteBufferAllocator alloc) throws IOException {
64 this.bufferSize = bufferSize;
65 this.bufferCount = bufferCount;
66 LOG.info("Allocating buffers total={}, sizePerBuffer={}, count={}",
67 StringUtils.byteDesc(capacity), StringUtils.byteDesc(bufferSize), bufferCount);
68 this.buffers = new ByteBuffer[bufferCount];
69 createBuffers(threadCount, alloc);
72 private void createBuffers(int threadCount, ByteBufferAllocator alloc) throws IOException {
73 ExecutorService pool = Executors.newFixedThreadPool(threadCount);
74 int perThreadCount = bufferCount / threadCount;
75 int reminder = bufferCount % threadCount;
76 try {
77 List<Future<ByteBuffer[]>> futures = new ArrayList<>(threadCount);
78 // Dispatch the creation task to each thread.
79 for (int i = 0; i < threadCount; i++) {
80 final int chunkSize = perThreadCount + ((i == threadCount - 1) ? reminder : 0);
81 futures.add(pool.submit(() -> {
82 ByteBuffer[] chunk = new ByteBuffer[chunkSize];
83 for (int k = 0; k < chunkSize; k++) {
84 chunk[k] = alloc.allocate(bufferSize);
86 return chunk;
87 }));
89 // Append the buffers created by each thread.
90 int bufferIndex = 0;
91 try {
92 for (Future<ByteBuffer[]> f : futures) {
93 for (ByteBuffer b : f.get()) {
94 this.buffers[bufferIndex++] = b;
97 assert bufferIndex == bufferCount;
98 } catch (Exception e) {
99 LOG.error("Buffer creation interrupted", e);
100 throw new IOException(e);
102 } finally {
103 pool.shutdownNow();
107 static int getBufferSize(long capacity) {
108 int bufferSize = DEFAULT_BUFFER_SIZE;
109 if (bufferSize > (capacity / 16)) {
110 bufferSize = (int) roundUp(capacity / 16, 32768);
112 return bufferSize;
115 private static int getBufferCount(long capacity) {
116 int bufferSize = getBufferSize(capacity);
117 return (int) (roundUp(capacity, bufferSize) / bufferSize);
120 private static long roundUp(long n, long to) {
121 return ((n + to - 1) / to) * to;
125 * Transfers bytes from this buffers array into the given destination {@link ByteBuff}
126 * @param offset start position in this big logical array.
127 * @param dst the destination ByteBuff. Notice that its position will be advanced.
128 * @return number of bytes read
130 public int read(long offset, ByteBuff dst) {
131 return internalTransfer(offset, dst, READER);
135 * Transfers bytes from the given source {@link ByteBuff} into this buffer array
136 * @param offset start offset of this big logical array.
137 * @param src the source ByteBuff. Notice that its position will be advanced.
138 * @return number of bytes write
140 public int write(long offset, ByteBuff src) {
141 return internalTransfer(offset, src, WRITER);
145 * Transfer bytes from source {@link ByteBuff} to destination {@link ByteBuffer}. Position of both
146 * source and destination will be advanced.
148 private static final BiConsumer<ByteBuffer, ByteBuff> WRITER = (dst, src) -> {
149 int off = src.position(), len = dst.remaining();
150 src.get(dst, off, len);
151 src.position(off + len);
155 * Transfer bytes from source {@link ByteBuffer} to destination {@link ByteBuff}, Position of both
156 * source and destination will be advanced.
158 private static final BiConsumer<ByteBuffer, ByteBuff> READER = (src, dst) -> {
159 int off = dst.position(), len = src.remaining(), srcOff = src.position();
160 dst.put(off, ByteBuff.wrap(src), srcOff, len);
161 src.position(srcOff + len);
162 dst.position(off + len);
166 * Transferring all remaining bytes from b to the buffers array starting at offset, or
167 * transferring bytes from the buffers array at offset to b until b is filled. Notice that
168 * position of ByteBuff b will be advanced.
169 * @param offset where we start in the big logical array.
170 * @param b the ByteBuff to transfer from or to
171 * @param transfer the transfer interface.
172 * @return the length of bytes we transferred.
174 private int internalTransfer(long offset, ByteBuff b, BiConsumer<ByteBuffer, ByteBuff> transfer) {
175 int expectedTransferLen = b.remaining();
176 if (expectedTransferLen == 0) {
177 return 0;
179 BufferIterator it = new BufferIterator(offset, expectedTransferLen);
180 while (it.hasNext()) {
181 ByteBuffer a = it.next();
182 transfer.accept(a, b);
183 assert !a.hasRemaining();
185 assert expectedTransferLen == it.getSum() : "Expected transfer length (=" + expectedTransferLen
186 + ") don't match the actual transfer length(=" + it.getSum() + ")";
187 return expectedTransferLen;
191 * Creates a sub-array from a given array of ByteBuffers from the given offset to the length
192 * specified. For eg, if there are 4 buffers forming an array each with length 10 and if we call
193 * asSubByteBuffers(5, 10) then we will create an sub-array consisting of two BBs and the first
194 * one be a BB from 'position' 5 to a 'length' 5 and the 2nd BB will be from 'position' 0 to
195 * 'length' 5.
196 * @param offset the position in the whole array which is composited by multiple byte buffers.
197 * @param len the length of bytes
198 * @return the underlying ByteBuffers, each ByteBuffer is a slice from the backend and will have a
199 * zero position.
201 public ByteBuffer[] asSubByteBuffers(long offset, final int len) {
202 BufferIterator it = new BufferIterator(offset, len);
203 ByteBuffer[] mbb = new ByteBuffer[it.getBufferCount()];
204 for (int i = 0; i < mbb.length; i++) {
205 assert it.hasNext();
206 mbb[i] = it.next();
208 assert it.getSum() == len;
209 return mbb;
213 * Iterator to fetch ByteBuffers from offset with given length in this big logical array.
215 private class BufferIterator implements Iterator<ByteBuffer> {
216 private final int len;
217 private int startBuffer, startOffset, endBuffer, endOffset;
218 private int curIndex, sum = 0;
220 private int index(long pos) {
221 return (int) (pos / bufferSize);
224 private int offset(long pos) {
225 return (int) (pos % bufferSize);
228 public BufferIterator(long offset, int len) {
229 assert len >= 0 && offset >= 0;
230 this.len = len;
232 this.startBuffer = index(offset);
233 this.startOffset = offset(offset);
235 this.endBuffer = index(offset + len);
236 this.endOffset = offset(offset + len);
237 if (startBuffer < endBuffer && endOffset == 0) {
238 endBuffer--;
239 endOffset = bufferSize;
241 assert startBuffer >= 0 && startBuffer < bufferCount;
242 assert endBuffer >= 0 && endBuffer < bufferCount;
244 // initialize the index to the first buffer index.
245 this.curIndex = startBuffer;
248 @Override
249 public boolean hasNext() {
250 return this.curIndex <= endBuffer;
254 * The returned ByteBuffer is an sliced one, it won't affect the position or limit of the
255 * original one.
257 @Override
258 public ByteBuffer next() {
259 ByteBuffer bb = buffers[curIndex].duplicate();
260 if (curIndex == startBuffer) {
261 bb.position(startOffset).limit(Math.min(bufferSize, startOffset + len));
262 } else if (curIndex == endBuffer) {
263 bb.position(0).limit(endOffset);
264 } else {
265 bb.position(0).limit(bufferSize);
267 curIndex++;
268 sum += bb.remaining();
269 // Make sure that its pos is zero, it's important because MBB will count from zero for all nio
270 // ByteBuffers.
271 return bb.slice();
274 int getSum() {
275 return sum;
278 int getBufferCount() {
279 return this.endBuffer - this.startBuffer + 1;