2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.io
;
20 import java
.nio
.ByteBuffer
;
21 import java
.util
.Queue
;
22 import java
.util
.concurrent
.ConcurrentLinkedQueue
;
23 import java
.util
.concurrent
.atomic
.AtomicInteger
;
25 import com
.google
.common
.annotations
.VisibleForTesting
;
27 import org
.apache
.commons
.logging
.Log
;
28 import org
.apache
.commons
.logging
.LogFactory
;
29 import org
.apache
.hadoop
.hbase
.classification
.InterfaceAudience
;
32 * Like Hadoops' ByteBufferPool only you do not specify desired size when getting a ByteBuffer. This
33 * pool keeps an upper bound on the count of ByteBuffers in the pool and a fixed size of ByteBuffer
34 * that it will create. When requested, if a free ByteBuffer is already present, it will return
35 * that. And when no free ByteBuffer available and we are below the max count, it will create a new
36 * one and return that.
39 * Note: This pool returns off heap ByteBuffers by default. If on heap ByteBuffers to be pooled,
40 * pass 'directByteBuffer' as false while construction of the pool.
42 * This class is thread safe.
44 * @see ByteBufferListOutputStream
46 @InterfaceAudience.Private
47 public class ByteBufferPool
{
48 private static final Log LOG
= LogFactory
.getLog(ByteBufferPool
.class);
49 // TODO better config names?
50 // hbase.ipc.server.reservoir.initial.max -> hbase.ipc.server.reservoir.max.buffer.count
51 // hbase.ipc.server.reservoir.initial.buffer.size -> hbase.ipc.server.reservoir.buffer.size
52 public static final String MAX_POOL_SIZE_KEY
= "hbase.ipc.server.reservoir.initial.max";
53 public static final String BUFFER_SIZE_KEY
= "hbase.ipc.server.reservoir.initial.buffer.size";
54 public static final int DEFAULT_BUFFER_SIZE
= 64 * 1024;// 64 KB. Making it same as the chunk size
55 // what we will write/read to/from the
57 private final Queue
<ByteBuffer
> buffers
= new ConcurrentLinkedQueue
<>();
59 private final int bufferSize
;
60 private final int maxPoolSize
;
61 private AtomicInteger count
; // Count of the BBs created already for this pool.
62 private final boolean directByteBuffer
; //Whether this pool should return DirectByteBuffers
63 private boolean maxPoolSizeInfoLevelLogged
= false;
66 * @param bufferSize Size of each buffer created by this pool.
67 * @param maxPoolSize Max number of buffers to keep in this pool.
69 public ByteBufferPool(int bufferSize
, int maxPoolSize
) {
70 this(bufferSize
, maxPoolSize
, true);
74 * @param bufferSize Size of each buffer created by this pool.
75 * @param maxPoolSize Max number of buffers to keep in this pool.
76 * @param directByteBuffer Whether to create direct ByteBuffer or on heap ByteBuffer.
78 public ByteBufferPool(int bufferSize
, int maxPoolSize
, boolean directByteBuffer
) {
79 this.bufferSize
= bufferSize
;
80 this.maxPoolSize
= maxPoolSize
;
81 this.directByteBuffer
= directByteBuffer
;
82 // TODO can add initialPoolSize config also and make those many BBs ready for use.
83 LOG
.info("Created ByteBufferPool with bufferSize : " + bufferSize
+ " and maxPoolSize : "
85 this.count
= new AtomicInteger(0);
89 * @return One free ByteBuffer from the pool. If no free ByteBuffer and we have not reached the
90 * maximum pool size, it will create a new one and return. In case of max pool size also
91 * reached, will return null. When pool returned a ByteBuffer, make sure to return it back
93 * @see #putbackBuffer(ByteBuffer)
95 public ByteBuffer
getBuffer() {
96 ByteBuffer bb
= buffers
.poll();
98 // Clear sets limit == capacity. Position == 0.
103 int c
= this.count
.intValue();
104 if (c
>= this.maxPoolSize
) {
105 if (maxPoolSizeInfoLevelLogged
) {
106 if (LOG
.isDebugEnabled()) {
107 LOG
.debug("Pool already reached its max capacity : " + this.maxPoolSize
108 + " and no free buffers now. Consider increasing the value for '"
109 + MAX_POOL_SIZE_KEY
+ "' ?");
112 LOG
.info("Pool already reached its max capacity : " + this.maxPoolSize
113 + " and no free buffers now. Consider increasing the value for '" + MAX_POOL_SIZE_KEY
115 maxPoolSizeInfoLevelLogged
= true;
119 if (!this.count
.compareAndSet(c
, c
+ 1)) {
122 if (LOG
.isTraceEnabled()) {
123 LOG
.trace("Creating a new offheap ByteBuffer of size: " + this.bufferSize
);
125 return this.directByteBuffer ? ByteBuffer
.allocateDirect(this.bufferSize
)
126 : ByteBuffer
.allocate(this.bufferSize
);
131 * Return back a ByteBuffer after its use. Do not try to return put back a ByteBuffer, not
132 * obtained from this pool.
133 * @param buf ByteBuffer to return.
135 public void putbackBuffer(ByteBuffer buf
) {
136 if (buf
.capacity() != this.bufferSize
|| (this.directByteBuffer ^ buf
.isDirect())) {
137 LOG
.warn("Trying to put a buffer, not created by this pool! Will be just ignored");
143 public int getBufferSize() {
144 return this.bufferSize
;
148 * @return Number of free buffers
151 public int getQueueSize() {
152 return buffers
.size();