HBASE-19497 Fix findbugs and error-prone warnings in hbase-common (branch-2)
[hbase.git] / hbase-common / src / main / java / org / apache / hadoop / hbase / util / ByteBufferArray.java
blobb2e5c9b751efa2388a7ebcf10da6c3ce9b2cf118
1 /**
2 * Copyright The Apache Software Foundation
4 * Licensed to the Apache Software Foundation (ASF) under one or more
5 * contributor license agreements. See the NOTICE file distributed with this
6 * work for additional information regarding copyright ownership. The ASF
7 * licenses this file to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 * License for the specific language governing permissions and limitations
17 * under the License.
19 package org.apache.hadoop.hbase.util;
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Future;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.ThreadPoolExecutor;
29 import java.util.concurrent.TimeUnit;
31 import com.google.common.annotations.VisibleForTesting;
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.hbase.nio.ByteBuff;
35 import org.apache.hadoop.hbase.nio.MultiByteBuff;
36 import org.apache.hadoop.hbase.nio.SingleByteBuff;
37 import org.apache.hadoop.util.StringUtils;
38 import org.apache.yetus.audience.InterfaceAudience;
40 /**
41 * This class manages an array of ByteBuffers, each with a default size of 4MB. These
42 * buffers are sequential and can be treated as one large buffer. It supports
43 * reading/writing data from this large buffer with a position and offset.
45 @InterfaceAudience.Private
46 public class ByteBufferArray {
47 private static final Log LOG = LogFactory.getLog(ByteBufferArray.class);
49 public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
50 @VisibleForTesting
51 ByteBuffer buffers[];
52 private int bufferSize;
53 @VisibleForTesting
54 int bufferCount;
56 /**
57 * We allocate a number of byte buffers as the capacity. In order not to out
58 * of the array bounds for the last byte(see {@link ByteBufferArray#multiple}),
59 * we will allocate one additional buffer with capacity 0;
60 * @param capacity total size of the byte buffer array
61 * @param allocator the ByteBufferAllocator that will create the buffers
62 * @throws IOException throws IOException if there is an exception thrown by the allocator
64 public ByteBufferArray(long capacity, ByteBufferAllocator allocator)
65 throws IOException {
66 this.bufferSize = DEFAULT_BUFFER_SIZE;
67 if (this.bufferSize > (capacity / 16))
68 this.bufferSize = (int) roundUp(capacity / 16, 32768);
69 this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize);
70 LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity)
71 + ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count="
72 + bufferCount);
73 buffers = new ByteBuffer[bufferCount + 1];
74 createBuffers(allocator);
77 @VisibleForTesting
78 void createBuffers(ByteBufferAllocator allocator)
79 throws IOException {
80 int threadCount = getThreadCount();
81 ExecutorService service = new ThreadPoolExecutor(threadCount, threadCount, 0L,
82 TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
83 int perThreadCount = (int)Math.floor((double) (bufferCount) / threadCount);
84 int lastThreadCount = bufferCount - (perThreadCount * (threadCount - 1));
85 Future<ByteBuffer[]>[] futures = new Future[threadCount];
86 try {
87 for (int i = 0; i < threadCount; i++) {
88 // Last thread will have to deal with a different number of buffers
89 int buffersToCreate = (i == threadCount - 1) ? lastThreadCount : perThreadCount;
90 futures[i] = service.submit(
91 new BufferCreatorCallable(bufferSize, buffersToCreate, allocator));
93 int bufferIndex = 0;
94 for (Future<ByteBuffer[]> future : futures) {
95 try {
96 ByteBuffer[] buffers = future.get();
97 for (ByteBuffer buffer : buffers) {
98 this.buffers[bufferIndex++] = buffer;
100 } catch (InterruptedException | ExecutionException e) {
101 LOG.error("Buffer creation interrupted", e);
102 throw new IOException(e);
105 } finally {
106 service.shutdownNow();
108 // always create on heap empty dummy buffer at last
109 this.buffers[bufferCount] = ByteBuffer.allocate(0);
112 @VisibleForTesting
113 int getThreadCount() {
114 return Runtime.getRuntime().availableProcessors();
118 * A callable that creates buffers of the specified length either onheap/offheap using the
119 * {@link ByteBufferAllocator}
121 private static class BufferCreatorCallable implements Callable<ByteBuffer[]> {
122 private final int bufferCapacity;
123 private final int bufferCount;
124 private final ByteBufferAllocator allocator;
126 BufferCreatorCallable(int bufferCapacity, int bufferCount, ByteBufferAllocator allocator) {
127 this.bufferCapacity = bufferCapacity;
128 this.bufferCount = bufferCount;
129 this.allocator = allocator;
132 @Override
133 public ByteBuffer[] call() throws Exception {
134 ByteBuffer[] buffers = new ByteBuffer[this.bufferCount];
135 for (int i = 0; i < this.bufferCount; i++) {
136 buffers[i] = allocator.allocate(this.bufferCapacity);
138 return buffers;
142 private long roundUp(long n, long to) {
143 return ((n + to - 1) / to) * to;
147 * Transfers bytes from this buffer array into the given destination array
148 * @param start start position in the ByteBufferArray
149 * @param len The maximum number of bytes to be written to the given array
150 * @param dstArray The array into which bytes are to be written
151 * @return number of bytes read
153 public int getMultiple(long start, int len, byte[] dstArray) {
154 return getMultiple(start, len, dstArray, 0);
158 * Transfers bytes from this buffer array into the given destination array
159 * @param start start offset of this buffer array
160 * @param len The maximum number of bytes to be written to the given array
161 * @param dstArray The array into which bytes are to be written
162 * @param dstOffset The offset within the given array of the first byte to be
163 * written
164 * @return number of bytes read
166 public int getMultiple(long start, int len, byte[] dstArray, int dstOffset) {
167 multiple(start, len, dstArray, dstOffset, GET_MULTIPLE_VISTOR);
168 return len;
171 private final static Visitor GET_MULTIPLE_VISTOR = new Visitor() {
172 @Override
173 public void visit(ByteBuffer bb, int pos, byte[] array, int arrayIdx, int len) {
174 ByteBufferUtils.copyFromBufferToArray(array, bb, pos, arrayIdx, len);
179 * Transfers bytes from the given source array into this buffer array
180 * @param start start offset of this buffer array
181 * @param len The maximum number of bytes to be read from the given array
182 * @param srcArray The array from which bytes are to be read
184 public void putMultiple(long start, int len, byte[] srcArray) {
185 putMultiple(start, len, srcArray, 0);
189 * Transfers bytes from the given source array into this buffer array
190 * @param start start offset of this buffer array
191 * @param len The maximum number of bytes to be read from the given array
192 * @param srcArray The array from which bytes are to be read
193 * @param srcOffset The offset within the given array of the first byte to be
194 * read
196 public void putMultiple(long start, int len, byte[] srcArray, int srcOffset) {
197 multiple(start, len, srcArray, srcOffset, PUT_MULTIPLE_VISITOR);
200 private final static Visitor PUT_MULTIPLE_VISITOR = new Visitor() {
201 @Override
202 public void visit(ByteBuffer bb, int pos, byte[] array, int arrayIdx, int len) {
203 ByteBufferUtils.copyFromArrayToBuffer(bb, pos, array, arrayIdx, len);
207 private interface Visitor {
209 * Visit the given byte buffer, if it is a read action, we will transfer the
210 * bytes from the buffer to the destination array, else if it is a write
211 * action, we will transfer the bytes from the source array to the buffer
212 * @param bb byte buffer
213 * @param pos Start position in ByteBuffer
214 * @param array a source or destination byte array
215 * @param arrayOffset offset of the byte array
216 * @param len read/write length
218 void visit(ByteBuffer bb, int pos, byte[] array, int arrayOffset, int len);
222 * Access(read or write) this buffer array with a position and length as the
223 * given array. Here we will only lock one buffer even if it may be need visit
224 * several buffers. The consistency is guaranteed by the caller.
225 * @param start start offset of this buffer array
226 * @param len The maximum number of bytes to be accessed
227 * @param array The array from/to which bytes are to be read/written
228 * @param arrayOffset The offset within the given array of the first byte to
229 * be read or written
230 * @param visitor implement of how to visit the byte buffer
232 void multiple(long start, int len, byte[] array, int arrayOffset, Visitor visitor) {
233 assert len >= 0;
234 long end = start + len;
235 int startBuffer = (int) (start / bufferSize), startOffset = (int) (start % bufferSize);
236 int endBuffer = (int) (end / bufferSize), endOffset = (int) (end % bufferSize);
237 assert array.length >= len + arrayOffset;
238 assert startBuffer >= 0 && startBuffer < bufferCount;
239 assert (endBuffer >= 0 && endBuffer < bufferCount)
240 || (endBuffer == bufferCount && endOffset == 0);
241 if (startBuffer >= buffers.length || startBuffer < 0) {
242 String msg = "Failed multiple, start=" + start + ",startBuffer="
243 + startBuffer + ",bufferSize=" + bufferSize;
244 LOG.error(msg);
245 throw new RuntimeException(msg);
247 int srcIndex = 0, cnt = -1;
248 for (int i = startBuffer; i <= endBuffer; ++i) {
249 ByteBuffer bb = buffers[i].duplicate();
250 int pos = 0;
251 if (i == startBuffer) {
252 cnt = bufferSize - startOffset;
253 if (cnt > len) cnt = len;
254 pos = startOffset;
255 } else if (i == endBuffer) {
256 cnt = endOffset;
257 } else {
258 cnt = bufferSize;
260 visitor.visit(bb, pos, array, srcIndex + arrayOffset, cnt);
261 srcIndex += cnt;
263 assert srcIndex == len;
267 * Creates a ByteBuff from a given array of ByteBuffers from the given offset to the
268 * length specified. For eg, if there are 4 buffers forming an array each with length 10 and
269 * if we call asSubBuffer(5, 10) then we will create an MBB consisting of two BBs
270 * and the first one be a BB from 'position' 5 to a 'length' 5 and the 2nd BB will be from
271 * 'position' 0 to 'length' 5.
272 * @param offset
273 * @param len
274 * @return a ByteBuff formed from the underlying ByteBuffers
276 public ByteBuff asSubByteBuff(long offset, int len) {
277 assert len >= 0;
278 long end = offset + len;
279 int startBuffer = (int) (offset / bufferSize), startBufferOffset = (int) (offset % bufferSize);
280 int endBuffer = (int) (end / bufferSize), endBufferOffset = (int) (end % bufferSize);
281 // Last buffer in the array is a dummy one with 0 capacity. Avoid sending back that
282 if (endBuffer == this.bufferCount) {
283 endBuffer--;
284 endBufferOffset = bufferSize;
286 assert startBuffer >= 0 && startBuffer < bufferCount;
287 assert (endBuffer >= 0 && endBuffer < bufferCount)
288 || (endBuffer == bufferCount && endBufferOffset == 0);
289 if (startBuffer >= buffers.length || startBuffer < 0) {
290 String msg = "Failed subArray, start=" + offset + ",startBuffer=" + startBuffer
291 + ",bufferSize=" + bufferSize;
292 LOG.error(msg);
293 throw new RuntimeException(msg);
295 int srcIndex = 0, cnt = -1;
296 ByteBuffer[] mbb = new ByteBuffer[endBuffer - startBuffer + 1];
297 for (int i = startBuffer, j = 0; i <= endBuffer; ++i, j++) {
298 ByteBuffer bb = buffers[i].duplicate();
299 if (i == startBuffer) {
300 cnt = bufferSize - startBufferOffset;
301 if (cnt > len) cnt = len;
302 bb.limit(startBufferOffset + cnt).position(startBufferOffset);
303 } else if (i == endBuffer) {
304 cnt = endBufferOffset;
305 bb.position(0).limit(cnt);
306 } else {
307 cnt = bufferSize;
308 bb.position(0).limit(cnt);
310 mbb[j] = bb.slice();
311 srcIndex += cnt;
313 assert srcIndex == len;
314 if (mbb.length > 1) {
315 return new MultiByteBuff(mbb);
316 } else {
317 return new SingleByteBuff(mbb[0]);