2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
;
21 import java
.io
.IOException
;
22 import java
.util
.ArrayList
;
23 import java
.util
.List
;
26 import org
.apache
.commons
.logging
.Log
;
27 import org
.apache
.commons
.logging
.LogFactory
;
28 import org
.apache
.hadoop
.conf
.Configuration
;
29 import org
.apache
.hadoop
.hbase
.testclassification
.IntegrationTests
;
30 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
31 import org
.apache
.hadoop
.hbase
.util
.LoadTestTool
;
32 import org
.apache
.hadoop
.hbase
.util
.Threads
;
33 import org
.apache
.hadoop
.util
.StringUtils
;
34 import org
.apache
.hadoop
.util
.ToolRunner
;
35 import org
.junit
.Assert
;
36 import org
.junit
.Test
;
37 import org
.junit
.experimental
.categories
.Category
;
39 import com
.google
.common
.collect
.Sets
;
42 * A base class for tests that do something with the cluster while running
43 * {@link LoadTestTool} to write and verify some data.
45 @Category(IntegrationTests
.class)
46 public class IntegrationTestIngest
extends IntegrationTestBase
{
47 public static final char HIPHEN
= '-';
48 private static final int SERVER_COUNT
= 1; // number of slaves for the smallest cluster
49 protected static final long DEFAULT_RUN_TIME
= 20 * 60 * 1000;
50 protected static final long JUNIT_RUN_TIME
= 10 * 60 * 1000;
52 /** A soft limit on how long we should run */
53 protected static final String RUN_TIME_KEY
= "hbase.%s.runtime";
55 protected static final String NUM_KEYS_PER_SERVER_KEY
= "num_keys_per_server";
56 protected static final long DEFAULT_NUM_KEYS_PER_SERVER
= 2500;
58 protected static final String NUM_WRITE_THREADS_KEY
= "num_write_threads";
59 protected static final int DEFAULT_NUM_WRITE_THREADS
= 20;
61 protected static final String NUM_READ_THREADS_KEY
= "num_read_threads";
62 protected static final int DEFAULT_NUM_READ_THREADS
= 20;
64 // Log is being used in IntegrationTestIngestWithEncryption, hence it is protected
65 protected static final Log LOG
= LogFactory
.getLog(IntegrationTestIngest
.class);
66 protected IntegrationTestingUtility util
;
67 protected HBaseCluster cluster
;
68 protected LoadTestTool loadTool
;
70 protected String
[] LOAD_TEST_TOOL_INIT_ARGS
= {
71 LoadTestTool
.OPT_COLUMN_FAMILIES
,
72 LoadTestTool
.OPT_COMPRESSION
,
73 LoadTestTool
.OPT_DATA_BLOCK_ENCODING
,
74 LoadTestTool
.OPT_INMEMORY
,
75 LoadTestTool
.OPT_ENCRYPTION
,
76 LoadTestTool
.OPT_NUM_REGIONS_PER_SERVER
,
77 LoadTestTool
.OPT_REGION_REPLICATION
,
81 public void setUpCluster() throws Exception
{
82 util
= getTestingUtil(getConf());
83 LOG
.debug("Initializing/checking cluster has " + SERVER_COUNT
+ " servers");
84 util
.initializeCluster(getMinServerCount());
85 LOG
.debug("Done initializing/checking cluster");
86 cluster
= util
.getHBaseClusterInterface();
87 deleteTableIfNecessary();
88 loadTool
= new LoadTestTool();
89 loadTool
.setConf(util
.getConfiguration());
90 // Initialize load test tool before we start breaking things;
91 // LoadTestTool init, even when it is a no-op, is very fragile.
95 protected int getMinServerCount() {
99 protected void initTable() throws IOException
{
100 int ret
= loadTool
.run(getArgsForLoadTestToolInitTable());
101 Assert
.assertEquals("Failed to initialize LoadTestTool", 0, ret
);
105 public int runTestFromCommandLine() throws Exception
{
106 internalRunIngestTest(DEFAULT_RUN_TIME
);
111 public void testIngest() throws Exception
{
112 runIngestTest(JUNIT_RUN_TIME
, 2500, 10, 1024, 10, 20);
115 protected void internalRunIngestTest(long runTime
) throws Exception
{
116 String clazz
= this.getClass().getSimpleName();
117 long numKeysPerServer
= conf
.getLong(String
.format("%s.%s", clazz
, NUM_KEYS_PER_SERVER_KEY
),
118 DEFAULT_NUM_KEYS_PER_SERVER
);
119 int numWriteThreads
= conf
.getInt(
120 String
.format("%s.%s", clazz
, NUM_WRITE_THREADS_KEY
), DEFAULT_NUM_WRITE_THREADS
);
121 int numReadThreads
= conf
.getInt(
122 String
.format("%s.%s", clazz
, NUM_READ_THREADS_KEY
), DEFAULT_NUM_READ_THREADS
);
123 runIngestTest(runTime
, numKeysPerServer
, 10, 1024, numWriteThreads
, numReadThreads
);
127 public TableName
getTablename() {
128 String clazz
= this.getClass().getSimpleName();
129 return TableName
.valueOf(
130 conf
.get(String
.format("%s.%s", clazz
, LoadTestTool
.OPT_TABLE_NAME
), clazz
));
134 protected Set
<String
> getColumnFamilies() {
135 Set
<String
> families
= Sets
.newHashSet();
136 String clazz
= this.getClass().getSimpleName();
137 // parse conf for getting the column famly names because LTT is not initialized yet.
138 String familiesString
= getConf().get(
139 String
.format("%s.%s", clazz
, LoadTestTool
.OPT_COLUMN_FAMILIES
));
140 if (familiesString
== null) {
141 for (byte[] family
: LoadTestTool
.DEFAULT_COLUMN_FAMILIES
) {
142 families
.add(Bytes
.toString(family
));
145 for (String family
: familiesString
.split(",")) {
146 families
.add(family
);
153 private void deleteTableIfNecessary() throws IOException
{
154 if (util
.getAdmin().tableExists(getTablename())) {
155 util
.deleteTable(getTablename());
159 protected void runIngestTest(long defaultRunTime
, long keysPerServerPerIter
, int colsPerKey
,
160 int recordSize
, int writeThreads
, int readThreads
) throws Exception
{
162 LOG
.info("Running ingest");
163 LOG
.info("Cluster size:" + util
.getHBaseClusterInterface().getClusterStatus().getServersSize());
165 long start
= System
.currentTimeMillis();
166 String runtimeKey
= String
.format(RUN_TIME_KEY
, this.getClass().getSimpleName());
167 long runtime
= util
.getConfiguration().getLong(runtimeKey
, defaultRunTime
);
170 long numKeys
= getNumKeys(keysPerServerPerIter
);
171 while (System
.currentTimeMillis() - start
< 0.9 * runtime
) {
172 LOG
.info("Intended run time: " + (runtime
/60000) + " min, left:" +
173 ((runtime
- (System
.currentTimeMillis() - start
))/60000) + " min");
176 ret
= loadTool
.run(getArgsForLoadTestTool("-write",
177 String
.format("%d:%d:%d", colsPerKey
, recordSize
, writeThreads
), startKey
, numKeys
));
179 String errorMsg
= "Load failed with error code " + ret
;
181 Assert
.fail(errorMsg
);
184 ret
= loadTool
.run(getArgsForLoadTestTool("-update", String
.format("60:%d:1", writeThreads
),
187 String errorMsg
= "Update failed with error code " + ret
;
189 Assert
.fail(errorMsg
);
192 ret
= loadTool
.run(getArgsForLoadTestTool("-read", String
.format("100:%d", readThreads
)
193 , startKey
, numKeys
));
195 String errorMsg
= "Verification failed with error code " + ret
;
196 LOG
.error(errorMsg
+ " Rerunning verification after 1 minute for debugging");
197 Threads
.sleep(1000 * 60);
198 ret
= loadTool
.run(getArgsForLoadTestTool("-read", String
.format("100:%d", readThreads
)
199 , startKey
, numKeys
));
201 LOG
.error("Rerun of Verification failed with error code " + ret
);
203 Assert
.fail(errorMsg
);
209 protected String
[] getArgsForLoadTestToolInitTable() {
210 List
<String
> args
= new ArrayList
<>();
212 args
.add(getTablename().getNameAsString());
213 // pass all remaining args from conf with keys <test class name>.<load test tool arg>
214 String clazz
= this.getClass().getSimpleName();
215 for (String arg
: LOAD_TEST_TOOL_INIT_ARGS
) {
216 String val
= conf
.get(String
.format("%s.%s", clazz
, arg
));
222 args
.add("-init_only");
223 return args
.toArray(new String
[args
.size()]);
226 protected String
[] getArgsForLoadTestTool(String mode
, String modeSpecificArg
, long startKey
,
228 List
<String
> args
= new ArrayList
<>(11);
230 args
.add(getTablename().getNameAsString());
231 args
.add("-families");
232 args
.add(getColumnFamiliesAsString());
234 args
.add(modeSpecificArg
);
235 args
.add("-start_key");
236 args
.add(String
.valueOf(startKey
));
237 args
.add("-num_keys");
238 args
.add(String
.valueOf(numKeys
));
239 args
.add("-skip_init");
241 return args
.toArray(new String
[args
.size()]);
244 private String
getColumnFamiliesAsString() {
245 return StringUtils
.join(",", getColumnFamilies());
248 /** Estimates a data size based on the cluster size */
249 protected long getNumKeys(long keysPerServer
)
251 int numRegionServers
= cluster
.getClusterStatus().getServersSize();
252 return keysPerServer
* numRegionServers
;
255 public static void main(String
[] args
) throws Exception
{
256 Configuration conf
= HBaseConfiguration
.create();
257 IntegrationTestingUtility
.setUseDistributedCluster(conf
);
258 int ret
= ToolRunner
.run(conf
, new IntegrationTestIngest(), args
);