HBASE-17532 Replaced explicit type with diamond operator
[hbase.git] / hbase-it / src / test / java / org / apache / hadoop / hbase / IntegrationTestIngest.java
blob7b6635ed4b9f45dc520ea0cb5e5a930b623e3a32
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org.apache.hadoop.hbase;
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.Set;
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
30 import org.apache.hadoop.hbase.util.Bytes;
31 import org.apache.hadoop.hbase.util.LoadTestTool;
32 import org.apache.hadoop.hbase.util.Threads;
33 import org.apache.hadoop.util.StringUtils;
34 import org.apache.hadoop.util.ToolRunner;
35 import org.junit.Assert;
36 import org.junit.Test;
37 import org.junit.experimental.categories.Category;
39 import com.google.common.collect.Sets;
41 /**
42 * A base class for tests that do something with the cluster while running
43 * {@link LoadTestTool} to write and verify some data.
45 @Category(IntegrationTests.class)
46 public class IntegrationTestIngest extends IntegrationTestBase {
47 public static final char HIPHEN = '-';
48 private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster
49 protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;
50 protected static final long JUNIT_RUN_TIME = 10 * 60 * 1000;
52 /** A soft limit on how long we should run */
53 protected static final String RUN_TIME_KEY = "hbase.%s.runtime";
55 protected static final String NUM_KEYS_PER_SERVER_KEY = "num_keys_per_server";
56 protected static final long DEFAULT_NUM_KEYS_PER_SERVER = 2500;
58 protected static final String NUM_WRITE_THREADS_KEY = "num_write_threads";
59 protected static final int DEFAULT_NUM_WRITE_THREADS = 20;
61 protected static final String NUM_READ_THREADS_KEY = "num_read_threads";
62 protected static final int DEFAULT_NUM_READ_THREADS = 20;
64 // Log is being used in IntegrationTestIngestWithEncryption, hence it is protected
65 protected static final Log LOG = LogFactory.getLog(IntegrationTestIngest.class);
66 protected IntegrationTestingUtility util;
67 protected HBaseCluster cluster;
68 protected LoadTestTool loadTool;
70 protected String[] LOAD_TEST_TOOL_INIT_ARGS = {
71 LoadTestTool.OPT_COLUMN_FAMILIES,
72 LoadTestTool.OPT_COMPRESSION,
73 LoadTestTool.OPT_DATA_BLOCK_ENCODING,
74 LoadTestTool.OPT_INMEMORY,
75 LoadTestTool.OPT_ENCRYPTION,
76 LoadTestTool.OPT_NUM_REGIONS_PER_SERVER,
77 LoadTestTool.OPT_REGION_REPLICATION,
80 @Override
81 public void setUpCluster() throws Exception {
82 util = getTestingUtil(getConf());
83 LOG.debug("Initializing/checking cluster has " + SERVER_COUNT + " servers");
84 util.initializeCluster(getMinServerCount());
85 LOG.debug("Done initializing/checking cluster");
86 cluster = util.getHBaseClusterInterface();
87 deleteTableIfNecessary();
88 loadTool = new LoadTestTool();
89 loadTool.setConf(util.getConfiguration());
90 // Initialize load test tool before we start breaking things;
91 // LoadTestTool init, even when it is a no-op, is very fragile.
92 initTable();
95 protected int getMinServerCount() {
96 return SERVER_COUNT;
99 protected void initTable() throws IOException {
100 int ret = loadTool.run(getArgsForLoadTestToolInitTable());
101 Assert.assertEquals("Failed to initialize LoadTestTool", 0, ret);
104 @Override
105 public int runTestFromCommandLine() throws Exception {
106 internalRunIngestTest(DEFAULT_RUN_TIME);
107 return 0;
110 @Test
111 public void testIngest() throws Exception {
112 runIngestTest(JUNIT_RUN_TIME, 2500, 10, 1024, 10, 20);
115 protected void internalRunIngestTest(long runTime) throws Exception {
116 String clazz = this.getClass().getSimpleName();
117 long numKeysPerServer = conf.getLong(String.format("%s.%s", clazz, NUM_KEYS_PER_SERVER_KEY),
118 DEFAULT_NUM_KEYS_PER_SERVER);
119 int numWriteThreads = conf.getInt(
120 String.format("%s.%s", clazz, NUM_WRITE_THREADS_KEY), DEFAULT_NUM_WRITE_THREADS);
121 int numReadThreads = conf.getInt(
122 String.format("%s.%s", clazz, NUM_READ_THREADS_KEY), DEFAULT_NUM_READ_THREADS);
123 runIngestTest(runTime, numKeysPerServer, 10, 1024, numWriteThreads, numReadThreads);
126 @Override
127 public TableName getTablename() {
128 String clazz = this.getClass().getSimpleName();
129 return TableName.valueOf(
130 conf.get(String.format("%s.%s", clazz, LoadTestTool.OPT_TABLE_NAME), clazz));
133 @Override
134 protected Set<String> getColumnFamilies() {
135 Set<String> families = Sets.newHashSet();
136 String clazz = this.getClass().getSimpleName();
137 // parse conf for getting the column famly names because LTT is not initialized yet.
138 String familiesString = getConf().get(
139 String.format("%s.%s", clazz, LoadTestTool.OPT_COLUMN_FAMILIES));
140 if (familiesString == null) {
141 for (byte[] family : LoadTestTool.DEFAULT_COLUMN_FAMILIES) {
142 families.add(Bytes.toString(family));
144 } else {
145 for (String family : familiesString.split(",")) {
146 families.add(family);
150 return families;
153 private void deleteTableIfNecessary() throws IOException {
154 if (util.getAdmin().tableExists(getTablename())) {
155 util.deleteTable(getTablename());
159 protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
160 int recordSize, int writeThreads, int readThreads) throws Exception {
162 LOG.info("Running ingest");
163 LOG.info("Cluster size:" + util.getHBaseClusterInterface().getClusterStatus().getServersSize());
165 long start = System.currentTimeMillis();
166 String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
167 long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
168 long startKey = 0;
170 long numKeys = getNumKeys(keysPerServerPerIter);
171 while (System.currentTimeMillis() - start < 0.9 * runtime) {
172 LOG.info("Intended run time: " + (runtime/60000) + " min, left:" +
173 ((runtime - (System.currentTimeMillis() - start))/60000) + " min");
175 int ret = -1;
176 ret = loadTool.run(getArgsForLoadTestTool("-write",
177 String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys));
178 if (0 != ret) {
179 String errorMsg = "Load failed with error code " + ret;
180 LOG.error(errorMsg);
181 Assert.fail(errorMsg);
184 ret = loadTool.run(getArgsForLoadTestTool("-update", String.format("60:%d:1", writeThreads),
185 startKey, numKeys));
186 if (0 != ret) {
187 String errorMsg = "Update failed with error code " + ret;
188 LOG.error(errorMsg);
189 Assert.fail(errorMsg);
192 ret = loadTool.run(getArgsForLoadTestTool("-read", String.format("100:%d", readThreads)
193 , startKey, numKeys));
194 if (0 != ret) {
195 String errorMsg = "Verification failed with error code " + ret;
196 LOG.error(errorMsg + " Rerunning verification after 1 minute for debugging");
197 Threads.sleep(1000 * 60);
198 ret = loadTool.run(getArgsForLoadTestTool("-read", String.format("100:%d", readThreads)
199 , startKey, numKeys));
200 if (0 != ret) {
201 LOG.error("Rerun of Verification failed with error code " + ret);
203 Assert.fail(errorMsg);
205 startKey += numKeys;
209 protected String[] getArgsForLoadTestToolInitTable() {
210 List<String> args = new ArrayList<>();
211 args.add("-tn");
212 args.add(getTablename().getNameAsString());
213 // pass all remaining args from conf with keys <test class name>.<load test tool arg>
214 String clazz = this.getClass().getSimpleName();
215 for (String arg : LOAD_TEST_TOOL_INIT_ARGS) {
216 String val = conf.get(String.format("%s.%s", clazz, arg));
217 if (val != null) {
218 args.add("-" + arg);
219 args.add(val);
222 args.add("-init_only");
223 return args.toArray(new String[args.size()]);
226 protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
227 long numKeys) {
228 List<String> args = new ArrayList<>(11);
229 args.add("-tn");
230 args.add(getTablename().getNameAsString());
231 args.add("-families");
232 args.add(getColumnFamiliesAsString());
233 args.add(mode);
234 args.add(modeSpecificArg);
235 args.add("-start_key");
236 args.add(String.valueOf(startKey));
237 args.add("-num_keys");
238 args.add(String.valueOf(numKeys));
239 args.add("-skip_init");
241 return args.toArray(new String[args.size()]);
244 private String getColumnFamiliesAsString() {
245 return StringUtils.join(",", getColumnFamilies());
248 /** Estimates a data size based on the cluster size */
249 protected long getNumKeys(long keysPerServer)
250 throws IOException {
251 int numRegionServers = cluster.getClusterStatus().getServersSize();
252 return keysPerServer * numRegionServers;
255 public static void main(String[] args) throws Exception {
256 Configuration conf = HBaseConfiguration.create();
257 IntegrationTestingUtility.setUseDistributedCluster(conf);
258 int ret = ToolRunner.run(conf, new IntegrationTestIngest(), args);
259 System.exit(ret);