2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.util
;
20 import static org
.junit
.Assert
.assertEquals
;
22 import java
.io
.IOException
;
23 import java
.util
.ArrayList
;
24 import java
.util
.Collection
;
25 import java
.util
.EnumSet
;
26 import java
.util
.List
;
27 import org
.apache
.hadoop
.conf
.Configuration
;
28 import org
.apache
.hadoop
.hbase
.ClusterMetrics
.Option
;
29 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
30 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
31 import org
.apache
.hadoop
.hbase
.HConstants
;
32 import org
.apache
.hadoop
.hbase
.TableName
;
33 import org
.apache
.hadoop
.hbase
.TableNotFoundException
;
34 import org
.apache
.hadoop
.hbase
.client
.Admin
;
35 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptor
;
36 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
37 import org
.apache
.hadoop
.hbase
.client
.TableDescriptor
;
38 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
39 import org
.apache
.hadoop
.hbase
.io
.compress
.Compression
;
40 import org
.apache
.hadoop
.hbase
.io
.encoding
.DataBlockEncoding
;
41 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
42 import org
.apache
.hadoop
.hbase
.testclassification
.MiscTests
;
43 import org
.apache
.hadoop
.hbase
.util
.test
.LoadTestDataGenerator
;
44 import org
.junit
.After
;
45 import org
.junit
.Before
;
46 import org
.junit
.ClassRule
;
47 import org
.junit
.Test
;
48 import org
.junit
.experimental
.categories
.Category
;
49 import org
.junit
.runner
.RunWith
;
50 import org
.junit
.runners
.Parameterized
;
51 import org
.junit
.runners
.Parameterized
.Parameters
;
52 import org
.slf4j
.Logger
;
53 import org
.slf4j
.LoggerFactory
;
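/**
 * A write/read/verify load test on a mini HBase cluster. Tests reading and writing keys with
 * multiple threads, parameterized by multi-put mode and data block encoding.
 */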
59 @Category({MiscTests
.class, MediumTests
.class})
60 @RunWith(Parameterized
.class)
61 public class TestMiniClusterLoadSequential
{
64 public static final HBaseClassTestRule CLASS_RULE
=
65 HBaseClassTestRule
.forClass(TestMiniClusterLoadSequential
.class);
67 private static final Logger LOG
= LoggerFactory
.getLogger(
68 TestMiniClusterLoadSequential
.class);
70 protected static final TableName TABLE
=
71 TableName
.valueOf("load_test_tbl");
72 protected static final byte[] CF
= Bytes
.toBytes("load_test_cf");
73 protected static final int NUM_THREADS
= 8;
74 protected static final int NUM_RS
= 2;
75 protected static final int TIMEOUT_MS
= 180000;
76 protected static final HBaseTestingUtil TEST_UTIL
=
77 new HBaseTestingUtil();
79 protected final Configuration conf
= TEST_UTIL
.getConfiguration();
80 protected final boolean isMultiPut
;
81 protected final DataBlockEncoding dataBlockEncoding
;
83 protected MultiThreadedWriter writerThreads
;
84 protected MultiThreadedReader readerThreads
;
85 protected int numKeys
;
87 protected Compression
.Algorithm compression
= Compression
.Algorithm
.NONE
;
89 public TestMiniClusterLoadSequential(boolean isMultiPut
,
90 DataBlockEncoding dataBlockEncoding
) {
91 this.isMultiPut
= isMultiPut
;
92 this.dataBlockEncoding
= dataBlockEncoding
;
93 conf
.setInt(HConstants
.HREGION_MEMSTORE_FLUSH_SIZE
, 1024 * 1024);
95 // We don't want any region reassignments by the load balancer during the test.
96 conf
.setFloat(HConstants
.LOAD_BALANCER_SLOP_KEY
, 10.0f
);
100 public static Collection
<Object
[]> parameters() {
101 List
<Object
[]> parameters
= new ArrayList
<>();
102 for (boolean multiPut
: new boolean[]{false, true}) {
103 for (DataBlockEncoding dataBlockEncoding
: new DataBlockEncoding
[] {
104 DataBlockEncoding
.NONE
, DataBlockEncoding
.PREFIX
}) {
105 parameters
.add(new Object
[]{multiPut
, dataBlockEncoding
});
112 public void setUp() throws Exception
{
113 LOG
.debug("Test setup: isMultiPut=" + isMultiPut
);
114 TEST_UTIL
.startMiniCluster(NUM_RS
);
118 public void tearDown() throws Exception
{
119 LOG
.debug("Test teardown: isMultiPut=" + isMultiPut
);
120 TEST_UTIL
.shutdownMiniCluster();
123 protected MultiThreadedReader
prepareReaderThreads(LoadTestDataGenerator dataGen
,
124 Configuration conf
, TableName tableName
, double verifyPercent
) throws IOException
{
125 MultiThreadedReader reader
= new MultiThreadedReader(dataGen
, conf
, tableName
, verifyPercent
);
129 protected MultiThreadedWriter
prepareWriterThreads(LoadTestDataGenerator dataGen
,
130 Configuration conf
, TableName tableName
) throws IOException
{
131 MultiThreadedWriter writer
= new MultiThreadedWriter(dataGen
, conf
, tableName
);
132 writer
.setMultiPut(isMultiPut
);
137 public void loadTest() throws Exception
{
138 prepareForLoadTest();
139 runLoadTestOnExistingTable();
142 protected void runLoadTestOnExistingTable() throws IOException
{
143 writerThreads
.start(0, numKeys
, NUM_THREADS
);
144 writerThreads
.waitForFinish();
145 assertEquals(0, writerThreads
.getNumWriteFailures());
147 readerThreads
.start(0, numKeys
, NUM_THREADS
);
148 readerThreads
.waitForFinish();
149 assertEquals(0, readerThreads
.getNumReadFailures());
150 assertEquals(0, readerThreads
.getNumReadErrors());
151 assertEquals(numKeys
, readerThreads
.getNumKeysVerified());
154 protected void createPreSplitLoadTestTable(TableDescriptor tableDescriptor
,
155 ColumnFamilyDescriptor familyDescriptor
) throws IOException
{
156 HBaseTestingUtil
.createPreSplitLoadTestTable(conf
, tableDescriptor
, familyDescriptor
);
157 TEST_UTIL
.waitUntilAllRegionsAssigned(tableDescriptor
.getTableName());
160 protected void prepareForLoadTest() throws IOException
{
161 LOG
.info("Starting load test: dataBlockEncoding=" + dataBlockEncoding
+
162 ", isMultiPut=" + isMultiPut
);
164 Admin admin
= TEST_UTIL
.getAdmin();
165 while (admin
.getClusterMetrics(EnumSet
.of(Option
.LIVE_SERVERS
))
166 .getLiveServerMetrics().size() < NUM_RS
) {
167 LOG
.info("Sleeping until " + NUM_RS
+ " RSs are online");
168 Threads
.sleepWithoutInterrupt(1000);
172 TableDescriptor tableDescriptor
= TableDescriptorBuilder
.newBuilder(TABLE
).build();
173 ColumnFamilyDescriptor familyDescriptor
= ColumnFamilyDescriptorBuilder
.newBuilder(CF
)
174 .setCompressionType(compression
).setDataBlockEncoding(dataBlockEncoding
).build();
175 createPreSplitLoadTestTable(tableDescriptor
, familyDescriptor
);
177 LoadTestDataGenerator dataGen
= new MultiThreadedAction
.DefaultDataGenerator(CF
);
178 writerThreads
= prepareWriterThreads(dataGen
, conf
, TABLE
);
179 readerThreads
= prepareReaderThreads(dataGen
, conf
, TABLE
, 100);
182 protected int numKeys() {
186 protected ColumnFamilyDescriptor
getColumnDesc(Admin admin
)
187 throws TableNotFoundException
, IOException
{
188 return admin
.getDescriptor(TABLE
).getColumnFamily(CF
);