/**
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
19 package org
.apache
.hadoop
.hbase
.util
;
21 import java
.io
.IOException
;
22 import java
.nio
.ByteBuffer
;
23 import java
.util
.concurrent
.Callable
;
24 import java
.util
.concurrent
.ExecutionException
;
25 import java
.util
.concurrent
.ExecutorService
;
26 import java
.util
.concurrent
.Future
;
27 import java
.util
.concurrent
.LinkedBlockingQueue
;
28 import java
.util
.concurrent
.ThreadPoolExecutor
;
29 import java
.util
.concurrent
.TimeUnit
;
31 import com
.google
.common
.annotations
.VisibleForTesting
;
32 import org
.apache
.commons
.logging
.Log
;
33 import org
.apache
.commons
.logging
.LogFactory
;
34 import org
.apache
.hadoop
.hbase
.nio
.ByteBuff
;
35 import org
.apache
.hadoop
.hbase
.nio
.MultiByteBuff
;
36 import org
.apache
.hadoop
.hbase
.nio
.SingleByteBuff
;
37 import org
.apache
.hadoop
.util
.StringUtils
;
38 import org
.apache
.yetus
.audience
.InterfaceAudience
;
/**
 * This class manages an array of ByteBuffers, each with a default size of 4MB. These
 * buffers are sequential and can be treated as one large buffer. It supports
 * reading/writing data from this large buffer with a position and offset.
 */
45 @InterfaceAudience.Private
46 public class ByteBufferArray
{
47 private static final Log LOG
= LogFactory
.getLog(ByteBufferArray
.class);
49 public static final int DEFAULT_BUFFER_SIZE
= 4 * 1024 * 1024;
52 private int bufferSize
;
/**
 * We allocate a number of byte buffers as the capacity. In order not to go out
 * of the array bounds for the last byte (see {@link ByteBufferArray#multiple}),
 * we will allocate one additional buffer with capacity 0.
 * <p>
 * The per-buffer size is {@link #DEFAULT_BUFFER_SIZE} unless that is larger than
 * {@code capacity / 16}; in that case it is {@code capacity / 16} rounded up to a
 * multiple of 32768.
 * @param capacity total size of the byte buffer array
 * @param allocator the ByteBufferAllocator that will create the buffers
 * @throws IOException throws IOException if there is an exception thrown by the allocator
 */
public ByteBufferArray(long capacity, ByteBufferAllocator allocator) throws IOException {
  this.bufferSize = DEFAULT_BUFFER_SIZE;
  if (this.bufferSize > (capacity / 16)) {
    // NOTE(review): for capacity < 16, capacity / 16 is 0 and rounds up to 0, so
    // bufferSize becomes 0 and the roundUp call below divides by zero. Behaviour is
    // left as-is here; callers are expected to pass a much larger capacity -- confirm.
    this.bufferSize = (int) roundUp(capacity / 16, 32768);
  }
  this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize);
  LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity)
      + ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count="
      + bufferCount);
  // One slot more than bufferCount, reserved for the zero-capacity buffer.
  buffers = new ByteBuffer[bufferCount + 1];
  createBuffers(allocator);
}
78 void createBuffers(ByteBufferAllocator allocator
)
80 int threadCount
= getThreadCount();
81 ExecutorService service
= new ThreadPoolExecutor(threadCount
, threadCount
, 0L,
82 TimeUnit
.MILLISECONDS
, new LinkedBlockingQueue
<Runnable
>());
83 int perThreadCount
= (int)Math
.floor((double) (bufferCount
) / threadCount
);
84 int lastThreadCount
= bufferCount
- (perThreadCount
* (threadCount
- 1));
85 Future
<ByteBuffer
[]>[] futures
= new Future
[threadCount
];
87 for (int i
= 0; i
< threadCount
; i
++) {
88 // Last thread will have to deal with a different number of buffers
89 int buffersToCreate
= (i
== threadCount
- 1) ? lastThreadCount
: perThreadCount
;
90 futures
[i
] = service
.submit(
91 new BufferCreatorCallable(bufferSize
, buffersToCreate
, allocator
));
94 for (Future
<ByteBuffer
[]> future
: futures
) {
96 ByteBuffer
[] buffers
= future
.get();
97 for (ByteBuffer buffer
: buffers
) {
98 this.buffers
[bufferIndex
++] = buffer
;
100 } catch (InterruptedException
| ExecutionException e
) {
101 LOG
.error("Buffer creation interrupted", e
);
102 throw new IOException(e
);
106 service
.shutdownNow();
108 // always create on heap empty dummy buffer at last
109 this.buffers
[bufferCount
] = ByteBuffer
.allocate(0);
113 int getThreadCount() {
114 return Runtime
.getRuntime().availableProcessors();
118 * A callable that creates buffers of the specified length either onheap/offheap using the
119 * {@link ByteBufferAllocator}
121 private static class BufferCreatorCallable
implements Callable
<ByteBuffer
[]> {
122 private final int bufferCapacity
;
123 private final int bufferCount
;
124 private final ByteBufferAllocator allocator
;
126 BufferCreatorCallable(int bufferCapacity
, int bufferCount
, ByteBufferAllocator allocator
) {
127 this.bufferCapacity
= bufferCapacity
;
128 this.bufferCount
= bufferCount
;
129 this.allocator
= allocator
;
133 public ByteBuffer
[] call() throws Exception
{
134 ByteBuffer
[] buffers
= new ByteBuffer
[this.bufferCount
];
135 for (int i
= 0; i
< this.bufferCount
; i
++) {
136 buffers
[i
] = allocator
.allocate(this.bufferCapacity
);
142 private long roundUp(long n
, long to
) {
143 return ((n
+ to
- 1) / to
) * to
;
147 * Transfers bytes from this buffer array into the given destination array
148 * @param start start position in the ByteBufferArray
149 * @param len The maximum number of bytes to be written to the given array
150 * @param dstArray The array into which bytes are to be written
151 * @return number of bytes read
153 public int getMultiple(long start
, int len
, byte[] dstArray
) {
154 return getMultiple(start
, len
, dstArray
, 0);
158 * Transfers bytes from this buffer array into the given destination array
159 * @param start start offset of this buffer array
160 * @param len The maximum number of bytes to be written to the given array
161 * @param dstArray The array into which bytes are to be written
162 * @param dstOffset The offset within the given array of the first byte to be
164 * @return number of bytes read
166 public int getMultiple(long start
, int len
, byte[] dstArray
, int dstOffset
) {
167 multiple(start
, len
, dstArray
, dstOffset
, GET_MULTIPLE_VISTOR
);
171 private final static Visitor GET_MULTIPLE_VISTOR
= new Visitor() {
173 public void visit(ByteBuffer bb
, int pos
, byte[] array
, int arrayIdx
, int len
) {
174 ByteBufferUtils
.copyFromBufferToArray(array
, bb
, pos
, arrayIdx
, len
);
179 * Transfers bytes from the given source array into this buffer array
180 * @param start start offset of this buffer array
181 * @param len The maximum number of bytes to be read from the given array
182 * @param srcArray The array from which bytes are to be read
184 public void putMultiple(long start
, int len
, byte[] srcArray
) {
185 putMultiple(start
, len
, srcArray
, 0);
189 * Transfers bytes from the given source array into this buffer array
190 * @param start start offset of this buffer array
191 * @param len The maximum number of bytes to be read from the given array
192 * @param srcArray The array from which bytes are to be read
193 * @param srcOffset The offset within the given array of the first byte to be
196 public void putMultiple(long start
, int len
, byte[] srcArray
, int srcOffset
) {
197 multiple(start
, len
, srcArray
, srcOffset
, PUT_MULTIPLE_VISITOR
);
200 private final static Visitor PUT_MULTIPLE_VISITOR
= new Visitor() {
202 public void visit(ByteBuffer bb
, int pos
, byte[] array
, int arrayIdx
, int len
) {
203 ByteBufferUtils
.copyFromArrayToBuffer(bb
, pos
, array
, arrayIdx
, len
);
207 private interface Visitor
{
209 * Visit the given byte buffer, if it is a read action, we will transfer the
210 * bytes from the buffer to the destination array, else if it is a write
211 * action, we will transfer the bytes from the source array to the buffer
212 * @param bb byte buffer
213 * @param pos Start position in ByteBuffer
214 * @param array a source or destination byte array
215 * @param arrayOffset offset of the byte array
216 * @param len read/write length
218 void visit(ByteBuffer bb
, int pos
, byte[] array
, int arrayOffset
, int len
);
222 * Access(read or write) this buffer array with a position and length as the
223 * given array. Here we will only lock one buffer even if it may be need visit
224 * several buffers. The consistency is guaranteed by the caller.
225 * @param start start offset of this buffer array
226 * @param len The maximum number of bytes to be accessed
227 * @param array The array from/to which bytes are to be read/written
228 * @param arrayOffset The offset within the given array of the first byte to
230 * @param visitor implement of how to visit the byte buffer
232 void multiple(long start
, int len
, byte[] array
, int arrayOffset
, Visitor visitor
) {
234 long end
= start
+ len
;
235 int startBuffer
= (int) (start
/ bufferSize
), startOffset
= (int) (start
% bufferSize
);
236 int endBuffer
= (int) (end
/ bufferSize
), endOffset
= (int) (end
% bufferSize
);
237 assert array
.length
>= len
+ arrayOffset
;
238 assert startBuffer
>= 0 && startBuffer
< bufferCount
;
239 assert (endBuffer
>= 0 && endBuffer
< bufferCount
)
240 || (endBuffer
== bufferCount
&& endOffset
== 0);
241 if (startBuffer
>= buffers
.length
|| startBuffer
< 0) {
242 String msg
= "Failed multiple, start=" + start
+ ",startBuffer="
243 + startBuffer
+ ",bufferSize=" + bufferSize
;
245 throw new RuntimeException(msg
);
247 int srcIndex
= 0, cnt
= -1;
248 for (int i
= startBuffer
; i
<= endBuffer
; ++i
) {
249 ByteBuffer bb
= buffers
[i
].duplicate();
251 if (i
== startBuffer
) {
252 cnt
= bufferSize
- startOffset
;
253 if (cnt
> len
) cnt
= len
;
255 } else if (i
== endBuffer
) {
260 visitor
.visit(bb
, pos
, array
, srcIndex
+ arrayOffset
, cnt
);
263 assert srcIndex
== len
;
267 * Creates a ByteBuff from a given array of ByteBuffers from the given offset to the
268 * length specified. For eg, if there are 4 buffers forming an array each with length 10 and
269 * if we call asSubBuffer(5, 10) then we will create an MBB consisting of two BBs
270 * and the first one be a BB from 'position' 5 to a 'length' 5 and the 2nd BB will be from
271 * 'position' 0 to 'length' 5.
274 * @return a ByteBuff formed from the underlying ByteBuffers
276 public ByteBuff
asSubByteBuff(long offset
, int len
) {
278 long end
= offset
+ len
;
279 int startBuffer
= (int) (offset
/ bufferSize
), startBufferOffset
= (int) (offset
% bufferSize
);
280 int endBuffer
= (int) (end
/ bufferSize
), endBufferOffset
= (int) (end
% bufferSize
);
281 // Last buffer in the array is a dummy one with 0 capacity. Avoid sending back that
282 if (endBuffer
== this.bufferCount
) {
284 endBufferOffset
= bufferSize
;
286 assert startBuffer
>= 0 && startBuffer
< bufferCount
;
287 assert (endBuffer
>= 0 && endBuffer
< bufferCount
)
288 || (endBuffer
== bufferCount
&& endBufferOffset
== 0);
289 if (startBuffer
>= buffers
.length
|| startBuffer
< 0) {
290 String msg
= "Failed subArray, start=" + offset
+ ",startBuffer=" + startBuffer
291 + ",bufferSize=" + bufferSize
;
293 throw new RuntimeException(msg
);
295 int srcIndex
= 0, cnt
= -1;
296 ByteBuffer
[] mbb
= new ByteBuffer
[endBuffer
- startBuffer
+ 1];
297 for (int i
= startBuffer
, j
= 0; i
<= endBuffer
; ++i
, j
++) {
298 ByteBuffer bb
= buffers
[i
].duplicate();
299 if (i
== startBuffer
) {
300 cnt
= bufferSize
- startBufferOffset
;
301 if (cnt
> len
) cnt
= len
;
302 bb
.limit(startBufferOffset
+ cnt
).position(startBufferOffset
);
303 } else if (i
== endBuffer
) {
304 cnt
= endBufferOffset
;
305 bb
.position(0).limit(cnt
);
308 bb
.position(0).limit(cnt
);
313 assert srcIndex
== len
;
314 if (mbb
.length
> 1) {
315 return new MultiByteBuff(mbb
);
317 return new SingleByteBuff(mbb
[0]);