HBASE-17532 Replaced explicit type with diamond operator
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / mapred / TestTableMapReduceUtil.java
blob22dda357846a93aab6295e4a87651f905a65ca73
1 /**
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org.apache.hadoop.hbase.mapred;
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertTrue;
24 import java.io.File;
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Set;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.fs.FileUtil;
35 import org.apache.hadoop.hbase.HBaseTestingUtility;
36 import org.apache.hadoop.hbase.TableName;
37 import org.apache.hadoop.hbase.testclassification.LargeTests;
38 import org.apache.hadoop.hbase.testclassification.MapReduceTests;
39 import org.apache.hadoop.hbase.client.Put;
40 import org.apache.hadoop.hbase.client.Result;
41 import org.apache.hadoop.hbase.client.Table;
42 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
43 import org.apache.hadoop.hbase.util.Bytes;
44 import org.apache.hadoop.mapred.JobClient;
45 import org.apache.hadoop.mapred.JobConf;
46 import org.apache.hadoop.mapred.MapReduceBase;
47 import org.apache.hadoop.mapred.OutputCollector;
48 import org.apache.hadoop.mapred.Reporter;
49 import org.apache.hadoop.mapred.RunningJob;
50 import org.junit.AfterClass;
51 import org.junit.Assert;
52 import org.junit.Before;
53 import org.junit.BeforeClass;
54 import org.junit.Test;
55 import org.junit.experimental.categories.Category;
57 import com.google.common.collect.ImmutableMap;
58 import com.google.common.collect.ImmutableSet;
60 @Category({MapReduceTests.class, LargeTests.class})
61 public class TestTableMapReduceUtil {
63 private static final Log LOG = LogFactory
64 .getLog(TestTableMapReduceUtil.class);
66 private static Table presidentsTable;
67 private static final String TABLE_NAME = "People";
69 private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info");
70 private static final byte[] COLUMN_QUALIFIER = Bytes.toBytes("name");
72 private static ImmutableSet<String> presidentsRowKeys = ImmutableSet.of(
73 "president1", "president2", "president3");
74 private static Iterator<String> presidentNames = ImmutableSet.of(
75 "John F. Kennedy", "George W. Bush", "Barack Obama").iterator();
77 private static ImmutableSet<String> actorsRowKeys = ImmutableSet.of("actor1",
78 "actor2");
79 private static Iterator<String> actorNames = ImmutableSet.of(
80 "Jack Nicholson", "Martin Freeman").iterator();
82 private static String PRESIDENT_PATTERN = "president";
83 private static String ACTOR_PATTERN = "actor";
84 private static ImmutableMap<String, ImmutableSet<String>> relation = ImmutableMap
85 .of(PRESIDENT_PATTERN, presidentsRowKeys, ACTOR_PATTERN, actorsRowKeys);
87 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
89 @BeforeClass
90 public static void beforeClass() throws Exception {
91 UTIL.startMiniCluster();
92 presidentsTable = createAndFillTable(TableName.valueOf(TABLE_NAME));
95 @AfterClass
96 public static void afterClass() throws Exception {
97 UTIL.shutdownMiniCluster();
100 @Before
101 public void before() throws IOException {
102 LOG.info("before");
103 UTIL.ensureSomeRegionServersAvailable(1);
104 LOG.info("before done");
107 public static Table createAndFillTable(TableName tableName) throws IOException {
108 Table table = UTIL.createTable(tableName, COLUMN_FAMILY);
109 createPutCommand(table);
110 return table;
113 private static void createPutCommand(Table table) throws IOException {
114 for (String president : presidentsRowKeys) {
115 if (presidentNames.hasNext()) {
116 Put p = new Put(Bytes.toBytes(president));
117 p.addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(presidentNames.next()));
118 table.put(p);
122 for (String actor : actorsRowKeys) {
123 if (actorNames.hasNext()) {
124 Put p = new Put(Bytes.toBytes(actor));
125 p.addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(actorNames.next()));
126 table.put(p);
 * Checks that the number of reduce tasks configured for the given job does
 * not exceed the number of regions of the given table.
135 @Test
136 public void shouldNumberOfReduceTaskNotExceedNumberOfRegionsForGivenTable()
137 throws IOException {
138 Assert.assertNotNull(presidentsTable);
139 Configuration cfg = UTIL.getConfiguration();
140 JobConf jobConf = new JobConf(cfg);
141 TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
142 TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
143 TableMapReduceUtil.setScannerCaching(jobConf, 100);
144 assertEquals(1, jobConf.getNumReduceTasks());
145 assertEquals(100, jobConf.getInt("hbase.client.scanner.caching", 0));
147 jobConf.setNumReduceTasks(10);
148 TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
149 TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
150 assertEquals(1, jobConf.getNumReduceTasks());
153 @Test
154 public void shouldNumberOfMapTaskNotExceedNumberOfRegionsForGivenTable()
155 throws IOException {
156 Configuration cfg = UTIL.getConfiguration();
157 JobConf jobConf = new JobConf(cfg);
158 TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
159 TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
160 assertEquals(1, jobConf.getNumMapTasks());
162 jobConf.setNumMapTasks(10);
163 TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
164 TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
165 assertEquals(1, jobConf.getNumMapTasks());
168 @Test
169 @SuppressWarnings("deprecation")
170 public void shoudBeValidMapReduceEvaluation() throws Exception {
171 Configuration cfg = UTIL.getConfiguration();
172 JobConf jobConf = new JobConf(cfg);
173 try {
174 jobConf.setJobName("process row task");
175 jobConf.setNumReduceTasks(1);
176 TableMapReduceUtil.initTableMapJob(TABLE_NAME, new String(COLUMN_FAMILY),
177 ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class,
178 jobConf);
179 TableMapReduceUtil.initTableReduceJob(TABLE_NAME,
180 ClassificatorRowReduce.class, jobConf);
181 RunningJob job = JobClient.runJob(jobConf);
182 assertTrue(job.isSuccessful());
183 } finally {
184 if (jobConf != null)
185 FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
189 @Test
190 @SuppressWarnings("deprecation")
191 public void shoudBeValidMapReduceWithPartitionerEvaluation()
192 throws IOException {
193 Configuration cfg = UTIL.getConfiguration();
194 JobConf jobConf = new JobConf(cfg);
195 try {
196 jobConf.setJobName("process row task");
197 jobConf.setNumReduceTasks(2);
198 TableMapReduceUtil.initTableMapJob(TABLE_NAME, new String(COLUMN_FAMILY),
199 ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class,
200 jobConf);
202 TableMapReduceUtil.initTableReduceJob(TABLE_NAME,
203 ClassificatorRowReduce.class, jobConf, HRegionPartitioner.class);
204 RunningJob job = JobClient.runJob(jobConf);
205 assertTrue(job.isSuccessful());
206 } finally {
207 if (jobConf != null)
208 FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
212 @SuppressWarnings("deprecation")
213 static class ClassificatorRowReduce extends MapReduceBase implements
214 TableReduce<ImmutableBytesWritable, Put> {
216 @Override
217 public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
218 OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter)
219 throws IOException {
220 String strKey = Bytes.toString(key.get());
221 List<Put> result = new ArrayList<>();
222 while (values.hasNext())
223 result.add(values.next());
225 if (relation.keySet().contains(strKey)) {
226 Set<String> set = relation.get(strKey);
227 if (set != null) {
228 assertEquals(set.size(), result.size());
229 } else {
230 throwAccertionError("Test infrastructure error: set is null");
232 } else {
233 throwAccertionError("Test infrastructure error: key not found in map");
237 private void throwAccertionError(String errorMessage) throws AssertionError {
238 throw new AssertionError(errorMessage);
242 @SuppressWarnings("deprecation")
243 static class ClassificatorMapper extends MapReduceBase implements
244 TableMap<ImmutableBytesWritable, Put> {
246 @Override
247 public void map(ImmutableBytesWritable row, Result result,
248 OutputCollector<ImmutableBytesWritable, Put> outCollector,
249 Reporter reporter) throws IOException {
250 String rowKey = Bytes.toString(result.getRow());
251 final ImmutableBytesWritable pKey = new ImmutableBytesWritable(
252 Bytes.toBytes(PRESIDENT_PATTERN));
253 final ImmutableBytesWritable aKey = new ImmutableBytesWritable(
254 Bytes.toBytes(ACTOR_PATTERN));
255 ImmutableBytesWritable outKey = null;
257 if (rowKey.startsWith(PRESIDENT_PATTERN)) {
258 outKey = pKey;
259 } else if (rowKey.startsWith(ACTOR_PATTERN)) {
260 outKey = aKey;
261 } else {
262 throw new AssertionError("unexpected rowKey");
265 String name = Bytes.toString(result.getValue(COLUMN_FAMILY,
266 COLUMN_QUALIFIER));
267 outCollector.collect(outKey,
268 new Put(Bytes.toBytes("rowKey2"))
269 .addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(name)));