/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;

@Category({MapReduceTests.class, LargeTests.class})
public class TestTableMapReduceUtil {

  private static final Log LOG = LogFactory.getLog(TestTableMapReduceUtil.class);

  private static Table presidentsTable;
  private static final String TABLE_NAME = "People";

  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info");
  private static final byte[] COLUMN_QUALIFIER = Bytes.toBytes("name");

  private static ImmutableSet<String> presidentsRowKeys = ImmutableSet.of(
      "president1", "president2", "president3");
  private static Iterator<String> presidentNames = ImmutableSet.of(
      "John F. Kennedy", "George W. Bush", "Barack Obama").iterator();

  private static ImmutableSet<String> actorsRowKeys = ImmutableSet.of("actor1",
      "actor2");
  private static Iterator<String> actorNames = ImmutableSet.of(
      "Jack Nicholson", "Martin Freeman").iterator();

  private static String PRESIDENT_PATTERN = "president";
  private static String ACTOR_PATTERN = "actor";
  private static ImmutableMap<String, ImmutableSet<String>> relation = ImmutableMap
      .of(PRESIDENT_PATTERN, presidentsRowKeys, ACTOR_PATTERN, actorsRowKeys);

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

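  /**
   * Starts a mini cluster and creates and fills the test table once for all
   * tests in this class.
   */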
  @BeforeClass
  public static void beforeClass() throws Exception {
    UTIL.startMiniCluster();
    presidentsTable = createAndFillTable(TableName.valueOf(TABLE_NAME));
  }

  @AfterClass
  public static void afterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void before() throws IOException {
    LOG.info("before");
    UTIL.ensureSomeRegionServersAvailable(1);
    LOG.info("before done");
  }

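  /**
   * Creates the test table with the single {@code info} column family and
   * fills it with the president and actor rows defined above.
   */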
  public static Table createAndFillTable(TableName tableName) throws IOException {
    Table table = UTIL.createTable(tableName, COLUMN_FAMILY);
    createPutCommand(table);
    return table;
  }

  private static void createPutCommand(Table table) throws IOException {
    for (String president : presidentsRowKeys) {
      if (presidentNames.hasNext()) {
        Put p = new Put(Bytes.toBytes(president));
        p.addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(presidentNames.next()));
        table.put(p);
      }
    }

    for (String actor : actorsRowKeys) {
      if (actorNames.hasNext()) {
        Put p = new Put(Bytes.toBytes(actor));
        p.addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(actorNames.next()));
        table.put(p);
      }
    }
  }

  /**
   * Checks that the number of reduce tasks set on the given job configuration
   * does not exceed the number of regions of the given table, and that the
   * scanner caching value is applied to the configuration.
   */
  @Test
  public void shouldNumberOfReduceTaskNotExceedNumberOfRegionsForGivenTable()
      throws IOException {
    Assert.assertNotNull(presidentsTable);
    Configuration cfg = UTIL.getConfiguration();
    JobConf jobConf = new JobConf(cfg);
    TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
    TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
    TableMapReduceUtil.setScannerCaching(jobConf, 100);
    assertEquals(1, jobConf.getNumReduceTasks());
    assertEquals(100, jobConf.getInt("hbase.client.scanner.caching", 0));

    jobConf.setNumReduceTasks(10);
    TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
    TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
    assertEquals(1, jobConf.getNumReduceTasks());
  }

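  /**
   * Same check for map tasks: the number of map tasks set on the job
   * configuration must not exceed the number of regions of the given table.
   */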
  @Test
  public void shouldNumberOfMapTaskNotExceedNumberOfRegionsForGivenTable()
      throws IOException {
    Configuration cfg = UTIL.getConfiguration();
    JobConf jobConf = new JobConf(cfg);
    TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
    TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
    assertEquals(1, jobConf.getNumMapTasks());

    jobConf.setNumMapTasks(10);
    TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
    TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
    assertEquals(1, jobConf.getNumMapTasks());
  }

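  /**
   * Runs a full map/reduce job against the mini cluster with a single reduce
   * task and verifies that it completes successfully.
   */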
  @Test
  @SuppressWarnings("deprecation")
  public void shoudBeValidMapReduceEvaluation() throws Exception {
    Configuration cfg = UTIL.getConfiguration();
    JobConf jobConf = new JobConf(cfg);
    try {
      jobConf.setJobName("process row task");
      jobConf.setNumReduceTasks(1);
      TableMapReduceUtil.initTableMapJob(TABLE_NAME, new String(COLUMN_FAMILY),
          ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class,
          jobConf);
      TableMapReduceUtil.initTableReduceJob(TABLE_NAME,
          ClassificatorRowReduce.class, jobConf);
      RunningJob job = JobClient.runJob(jobConf);
      assertTrue(job.isSuccessful());
    } finally {
      if (jobConf != null) {
        FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
      }
    }
  }

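  /**
   * Same end-to-end run as above, but with two reduce tasks and output routed
   * to reducers through {@link HRegionPartitioner}.
   */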
  @Test
  @SuppressWarnings("deprecation")
  public void shoudBeValidMapReduceWithPartitionerEvaluation()
      throws IOException {
    Configuration cfg = UTIL.getConfiguration();
    JobConf jobConf = new JobConf(cfg);
    try {
      jobConf.setJobName("process row task");
      jobConf.setNumReduceTasks(2);
      TableMapReduceUtil.initTableMapJob(TABLE_NAME, new String(COLUMN_FAMILY),
          ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class,
          jobConf);

      TableMapReduceUtil.initTableReduceJob(TABLE_NAME,
          ClassificatorRowReduce.class, jobConf, HRegionPartitioner.class);
      RunningJob job = JobClient.runJob(jobConf);
      assertTrue(job.isSuccessful());
    } finally {
      if (jobConf != null) {
        FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
      }
    }
  }

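  /**
   * Reducer used by the jobs above: for each key ("president" or "actor") it
   * asserts that the number of collected values matches the number of row
   * keys loaded for that group.
   */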
  @SuppressWarnings("deprecation")
  static class ClassificatorRowReduce extends MapReduceBase implements
      TableReduce<ImmutableBytesWritable, Put> {

    @Override
    public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
        OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter)
        throws IOException {
      String strKey = Bytes.toString(key.get());
      List<Put> result = new ArrayList<>();
      while (values.hasNext()) {
        result.add(values.next());
      }

      if (relation.keySet().contains(strKey)) {
        Set<String> set = relation.get(strKey);
        if (set != null) {
          assertEquals(set.size(), result.size());
        } else {
          throwAccertionError("Test infrastructure error: set is null");
        }
      } else {
        throwAccertionError("Test infrastructure error: key not found in map");
      }
    }

    private void throwAccertionError(String errorMessage) throws AssertionError {
      throw new AssertionError(errorMessage);
    }
  }

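  /**
   * Mapper used by the jobs above: classifies each row as a president or an
   * actor based on its row key prefix and emits a Put keyed by that pattern.
   */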
  @SuppressWarnings("deprecation")
  static class ClassificatorMapper extends MapReduceBase implements
      TableMap<ImmutableBytesWritable, Put> {

    @Override
    public void map(ImmutableBytesWritable row, Result result,
        OutputCollector<ImmutableBytesWritable, Put> outCollector,
        Reporter reporter) throws IOException {
      String rowKey = Bytes.toString(result.getRow());
      final ImmutableBytesWritable pKey = new ImmutableBytesWritable(
          Bytes.toBytes(PRESIDENT_PATTERN));
      final ImmutableBytesWritable aKey = new ImmutableBytesWritable(
          Bytes.toBytes(ACTOR_PATTERN));
      ImmutableBytesWritable outKey = null;

      if (rowKey.startsWith(PRESIDENT_PATTERN)) {
        outKey = pKey;
      } else if (rowKey.startsWith(ACTOR_PATTERN)) {
        outKey = aKey;
      } else {
        throw new AssertionError("unexpected rowKey");
      }

      String name = Bytes.toString(result.getValue(COLUMN_FAMILY,
          COLUMN_QUALIFIER));
      outCollector.collect(outKey,
          new Put(Bytes.toBytes("rowKey2"))
              .addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(name)));
    }
  }
}