2 * Copyright The Apache Software Foundation
4 * Licensed to the Apache Software Foundation (ASF) under one or more
5 * contributor license agreements. See the NOTICE file distributed with this
6 * work for additional information regarding copyright ownership. The ASF
7 * licenses this file to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 * License for the specific language governing permissions and limitations
19 package org
.apache
.hadoop
.hbase
.util
;
21 import java
.io
.IOException
;
22 import java
.nio
.ByteBuffer
;
23 import java
.util
.ArrayList
;
24 import java
.util
.Iterator
;
25 import java
.util
.List
;
26 import java
.util
.concurrent
.ExecutorService
;
27 import java
.util
.concurrent
.Executors
;
28 import java
.util
.concurrent
.Future
;
29 import java
.util
.function
.BiConsumer
;
31 import org
.apache
.hadoop
.hbase
.nio
.ByteBuff
;
32 import org
.apache
.hadoop
.util
.StringUtils
;
33 import org
.apache
.yetus
.audience
.InterfaceAudience
;
34 import org
.slf4j
.Logger
;
35 import org
.slf4j
.LoggerFactory
;
/**
 * This class manages an array of ByteBuffers with a default size of 4MB. These buffers are
 * sequential and can be considered as one large buffer. It supports reading/writing data from
 * this large buffer with a position and offset.
 */
42 @InterfaceAudience.Private
43 public class ByteBufferArray
{
44 private static final Logger LOG
= LoggerFactory
.getLogger(ByteBufferArray
.class);
46 public static final int DEFAULT_BUFFER_SIZE
= 4 * 1024 * 1024;
47 private final int bufferSize
;
48 private final int bufferCount
;
49 final ByteBuffer
[] buffers
;
/**
 * Creates a buffer array able to hold {@code capacity} bytes. The size of each buffer and the
 * number of buffers are derived from the capacity, and the buffers are allocated using as many
 * threads as the runtime reports available processors.
 * @param capacity total size of the byte buffer array
 * @param allocator the ByteBufferAllocator that will create the buffers
 * @throws IOException if there is an exception thrown by the allocator
 */
public ByteBufferArray(long capacity, ByteBufferAllocator allocator) throws IOException {
  this(
    getBufferSize(capacity),
    getBufferCount(capacity),
    Runtime.getRuntime().availableProcessors(),
    capacity,
    allocator);
}
/**
 * Creates a buffer array with an explicit layout.
 * @param bufferSize size in bytes requested for each buffer
 * @param bufferCount number of buffers to allocate
 * @param threadCount number of threads used to allocate the buffers
 * @param capacity total capacity in bytes; only used for the log message here
 * @param alloc the ByteBufferAllocator that will create the buffers
 * @throws IOException if the buffers could not be created
 */
ByteBufferArray(int bufferSize, int bufferCount, int threadCount, long capacity,
    ByteBufferAllocator alloc) throws IOException {
  this.bufferSize = bufferSize;
  this.bufferCount = bufferCount;
  LOG.info("Allocating buffers total={}, sizePerBuffer={}, count={}",
    StringUtils.byteDesc(capacity), StringUtils.byteDesc(bufferSize), bufferCount);
  // The slots are filled in by createBuffers once the allocation tasks complete.
  this.buffers = new ByteBuffer[bufferCount];
  createBuffers(threadCount, alloc);
}
72 private void createBuffers(int threadCount
, ByteBufferAllocator alloc
) throws IOException
{
73 ExecutorService pool
= Executors
.newFixedThreadPool(threadCount
);
74 int perThreadCount
= bufferCount
/ threadCount
;
75 int reminder
= bufferCount
% threadCount
;
77 List
<Future
<ByteBuffer
[]>> futures
= new ArrayList
<>(threadCount
);
78 // Dispatch the creation task to each thread.
79 for (int i
= 0; i
< threadCount
; i
++) {
80 final int chunkSize
= perThreadCount
+ ((i
== threadCount
- 1) ? reminder
: 0);
81 futures
.add(pool
.submit(() -> {
82 ByteBuffer
[] chunk
= new ByteBuffer
[chunkSize
];
83 for (int k
= 0; k
< chunkSize
; k
++) {
84 chunk
[k
] = alloc
.allocate(bufferSize
);
89 // Append the buffers created by each thread.
92 for (Future
<ByteBuffer
[]> f
: futures
) {
93 for (ByteBuffer b
: f
.get()) {
94 this.buffers
[bufferIndex
++] = b
;
97 assert bufferIndex
== bufferCount
;
98 } catch (Exception e
) {
99 LOG
.error("Buffer creation interrupted", e
);
100 throw new IOException(e
);
107 static int getBufferSize(long capacity
) {
108 int bufferSize
= DEFAULT_BUFFER_SIZE
;
109 if (bufferSize
> (capacity
/ 16)) {
110 bufferSize
= (int) roundUp(capacity
/ 16, 32768);
115 private static int getBufferCount(long capacity
) {
116 int bufferSize
= getBufferSize(capacity
);
117 return (int) (roundUp(capacity
, bufferSize
) / bufferSize
);
120 private static long roundUp(long n
, long to
) {
121 return ((n
+ to
- 1) / to
) * to
;
125 * Transfers bytes from this buffers array into the given destination {@link ByteBuff}
126 * @param offset start position in this big logical array.
127 * @param dst the destination ByteBuff. Notice that its position will be advanced.
128 * @return number of bytes read
130 public int read(long offset
, ByteBuff dst
) {
131 return internalTransfer(offset
, dst
, READER
);
135 * Transfers bytes from the given source {@link ByteBuff} into this buffer array
136 * @param offset start offset of this big logical array.
137 * @param src the source ByteBuff. Notice that its position will be advanced.
138 * @return number of bytes write
140 public int write(long offset
, ByteBuff src
) {
141 return internalTransfer(offset
, src
, WRITER
);
145 * Transfer bytes from source {@link ByteBuff} to destination {@link ByteBuffer}. Position of both
146 * source and destination will be advanced.
148 private static final BiConsumer
<ByteBuffer
, ByteBuff
> WRITER
= (dst
, src
) -> {
149 int off
= src
.position(), len
= dst
.remaining();
150 src
.get(dst
, off
, len
);
151 src
.position(off
+ len
);
155 * Transfer bytes from source {@link ByteBuffer} to destination {@link ByteBuff}, Position of both
156 * source and destination will be advanced.
158 private static final BiConsumer
<ByteBuffer
, ByteBuff
> READER
= (src
, dst
) -> {
159 int off
= dst
.position(), len
= src
.remaining(), srcOff
= src
.position();
160 dst
.put(off
, ByteBuff
.wrap(src
), srcOff
, len
);
161 src
.position(srcOff
+ len
);
162 dst
.position(off
+ len
);
166 * Transferring all remaining bytes from b to the buffers array starting at offset, or
167 * transferring bytes from the buffers array at offset to b until b is filled. Notice that
168 * position of ByteBuff b will be advanced.
169 * @param offset where we start in the big logical array.
170 * @param b the ByteBuff to transfer from or to
171 * @param transfer the transfer interface.
172 * @return the length of bytes we transferred.
174 private int internalTransfer(long offset
, ByteBuff b
, BiConsumer
<ByteBuffer
, ByteBuff
> transfer
) {
175 int expectedTransferLen
= b
.remaining();
176 if (expectedTransferLen
== 0) {
179 BufferIterator it
= new BufferIterator(offset
, expectedTransferLen
);
180 while (it
.hasNext()) {
181 ByteBuffer a
= it
.next();
182 transfer
.accept(a
, b
);
183 assert !a
.hasRemaining();
185 assert expectedTransferLen
== it
.getSum() : "Expected transfer length (=" + expectedTransferLen
186 + ") don't match the actual transfer length(=" + it
.getSum() + ")";
187 return expectedTransferLen
;
191 * Creates a sub-array from a given array of ByteBuffers from the given offset to the length
192 * specified. For eg, if there are 4 buffers forming an array each with length 10 and if we call
193 * asSubByteBuffers(5, 10) then we will create an sub-array consisting of two BBs and the first
194 * one be a BB from 'position' 5 to a 'length' 5 and the 2nd BB will be from 'position' 0 to
196 * @param offset the position in the whole array which is composited by multiple byte buffers.
197 * @param len the length of bytes
198 * @return the underlying ByteBuffers, each ByteBuffer is a slice from the backend and will have a
201 public ByteBuffer
[] asSubByteBuffers(long offset
, final int len
) {
202 BufferIterator it
= new BufferIterator(offset
, len
);
203 ByteBuffer
[] mbb
= new ByteBuffer
[it
.getBufferCount()];
204 for (int i
= 0; i
< mbb
.length
; i
++) {
208 assert it
.getSum() == len
;
213 * Iterator to fetch ByteBuffers from offset with given length in this big logical array.
215 private class BufferIterator
implements Iterator
<ByteBuffer
> {
216 private final int len
;
217 private int startBuffer
, startOffset
, endBuffer
, endOffset
;
218 private int curIndex
, sum
= 0;
220 private int index(long pos
) {
221 return (int) (pos
/ bufferSize
);
224 private int offset(long pos
) {
225 return (int) (pos
% bufferSize
);
228 public BufferIterator(long offset
, int len
) {
229 assert len
>= 0 && offset
>= 0;
232 this.startBuffer
= index(offset
);
233 this.startOffset
= offset(offset
);
235 this.endBuffer
= index(offset
+ len
);
236 this.endOffset
= offset(offset
+ len
);
237 if (startBuffer
< endBuffer
&& endOffset
== 0) {
239 endOffset
= bufferSize
;
241 assert startBuffer
>= 0 && startBuffer
< bufferCount
;
242 assert endBuffer
>= 0 && endBuffer
< bufferCount
;
244 // initialize the index to the first buffer index.
245 this.curIndex
= startBuffer
;
249 public boolean hasNext() {
250 return this.curIndex
<= endBuffer
;
254 * The returned ByteBuffer is an sliced one, it won't affect the position or limit of the
258 public ByteBuffer
next() {
259 ByteBuffer bb
= buffers
[curIndex
].duplicate();
260 if (curIndex
== startBuffer
) {
261 bb
.position(startOffset
).limit(Math
.min(bufferSize
, startOffset
+ len
));
262 } else if (curIndex
== endBuffer
) {
263 bb
.position(0).limit(endOffset
);
265 bb
.position(0).limit(bufferSize
);
268 sum
+= bb
.remaining();
269 // Make sure that its pos is zero, it's important because MBB will count from zero for all nio
278 int getBufferCount() {
279 return this.endBuffer
- this.startBuffer
+ 1;