From b9c415ac6113b7a07373c0ccfbc5cb2f340410c1 Mon Sep 17 00:00:00 2001 From: Sean Busbey Date: Tue, 11 Aug 2020 22:07:46 -0500 Subject: [PATCH] Revert "BackPort HBASE-11554 Remove Reusable poolmap Rpc client type. (#2208)" incorrect commit message and author This reverts commit c645cb54e61e2f5d2b61a407cfa03783b129e313. --- .../java/org/apache/hadoop/hbase/util/PoolMap.java | 49 +++++++++++- .../hadoop/hbase/util/TestReusablePoolMap.java | 90 ++++++++++++++++++++++ 2 files changed, 138 insertions(+), 1 deletion(-) create mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestReusablePoolMap.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java index 3cd38f3416..f174c964ad 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java @@ -228,7 +228,7 @@ public class PoolMap implements Map { } public enum PoolType { - ThreadLocal, RoundRobin; + Reusable, ThreadLocal, RoundRobin; public static PoolType valueOf(String poolTypeName, PoolType defaultPoolType, PoolType... allowedPoolTypes) { @@ -270,6 +270,8 @@ public class PoolMap implements Map { protected Pool createPool() { switch (poolType) { + case Reusable: + return new ReusablePool<>(poolMaxSize); case RoundRobin: return new RoundRobinPool<>(poolMaxSize); case ThreadLocal: @@ -279,6 +281,51 @@ public class PoolMap implements Map { } /** + * The ReusablePool represents a {@link PoolMap.Pool} that builds + * on the {@link java.util.LinkedList} class. It essentially allows resources to be + * checked out, at which point it is removed from this pool. When the resource + * is no longer required, it should be returned to the pool in order to be + * reused. + * + *

+ * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of + * the pool is unbounded. Otherwise, it caps the number of consumers that can + * check out a resource from this pool to the (non-zero positive) value + * specified in {@link #maxSize}. + *

+ * + * @param + * the type of the resource + */ + @SuppressWarnings("serial") + public static class ReusablePool extends ConcurrentLinkedQueue implements Pool { + private int maxSize; + + public ReusablePool(int maxSize) { + this.maxSize = maxSize; + + } + + @Override + public R get() { + return poll(); + } + + @Override + public R put(R resource) { + if (super.size() < maxSize) { + add(resource); + } + return null; + } + + @Override + public Collection values() { + return this; + } + } + + /** * The RoundRobinPool represents a {@link PoolMap.Pool}, which * stores its resources in an {@link ArrayList}. It load-balances access to * its resources by returning a different resource every time a given key is diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestReusablePoolMap.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestReusablePoolMap.java new file mode 100644 index 0000000000..3fcaebb5fe --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestReusablePoolMap.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.PoolMap.PoolType; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MiscTests.class, SmallTests.class }) +public class TestReusablePoolMap extends PoolMapTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReusablePoolMap.class); + + @Override + protected PoolType getPoolType() { + return PoolType.Reusable; + } + + @Test + public void testSingleThreadedClient() throws InterruptedException, ExecutionException { + Random rand = ThreadLocalRandom.current(); + String randomKey = String.valueOf(rand.nextInt()); + String randomValue = String.valueOf(rand.nextInt()); + // As long as we poll values we put, the pool size should remain zero + runThread(randomKey, randomValue, randomValue); + assertEquals(0, poolMap.size(randomKey)); + } + + @Test + public void testMultiThreadedClients() throws InterruptedException, ExecutionException { + Random rand = ThreadLocalRandom.current(); + // As long as we poll values we put, the pool size should remain zero + for (int i = 0; i < POOL_SIZE; i++) { + String randomKey = String.valueOf(rand.nextInt()); + String randomValue = String.valueOf(rand.nextInt()); + runThread(randomKey, randomValue, randomValue); + assertEquals(0, poolMap.size(randomKey)); + } + poolMap.clear(); + String randomKey = String.valueOf(rand.nextInt()); + for (int i = 0; i < POOL_SIZE - 1; i++) { + String randomValue = String.valueOf(rand.nextInt()); + runThread(randomKey, randomValue, randomValue); + assertEquals(0, poolMap.size(randomKey)); + } + assertEquals(0, poolMap.size(randomKey)); + } + + @Test + public void testPoolCap() throws InterruptedException, ExecutionException { + Random rand = ThreadLocalRandom.current(); + // As long as we poll values we put, the pool size should remain zero + String randomKey = String.valueOf(rand.nextInt()); + List randomValues = new ArrayList<>(); + for (int i = 0; i < POOL_SIZE * 2; i++) { + String randomValue = String.valueOf(rand.nextInt()); + randomValues.add(randomValue); + runThread(randomKey, randomValue, randomValue); + } + assertEquals(0, poolMap.size(randomKey)); + } +} -- 2.11.4.GIT