2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.mapreduce
;
20 import java
.io
.IOException
;
21 import java
.lang
.reflect
.Constructor
;
22 import java
.lang
.reflect
.InvocationTargetException
;
23 import java
.lang
.reflect
.Method
;
25 import org
.apache
.hadoop
.conf
.Configuration
;
26 import org
.apache
.hadoop
.mapred
.JobConf
;
27 import org
.apache
.hadoop
.mapred
.MiniMRCluster
;
28 import org
.apache
.hadoop
.mapreduce
.Job
;
29 import org
.apache
.hadoop
.mapreduce
.JobContext
;
30 import org
.apache
.hadoop
.mapreduce
.JobID
;
33 * This class provides shims for HBase to interact with the Hadoop 1.0.x and the
34 * Hadoop 0.23.x series.
36 * NOTE: No testing done against 0.22.x, or 0.21.x.
38 abstract public class MapreduceTestingShim
{
39 private static MapreduceTestingShim instance
;
40 private static Class
[] emptyParam
= new Class
[] {};
44 // This class exists in hadoop 0.22+ but not in Hadoop 20.x/1.x
46 .forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
47 instance
= new MapreduceV2Shim();
48 } catch (Exception e
) {
49 instance
= new MapreduceV1Shim();
53 abstract public JobContext
newJobContext(Configuration jobConf
)
56 abstract public Job
newJob(Configuration conf
) throws IOException
;
58 abstract public JobConf
obtainJobConf(MiniMRCluster cluster
);
60 abstract public String
obtainMROutputDirProp();
62 public static JobContext
createJobContext(Configuration jobConf
)
64 return instance
.newJobContext(jobConf
);
67 public static JobConf
getJobConf(MiniMRCluster cluster
) {
68 return instance
.obtainJobConf(cluster
);
71 public static Job
createJob(Configuration conf
) throws IOException
{
72 return instance
.newJob(conf
);
75 public static String
getMROutputDirProp() {
76 return instance
.obtainMROutputDirProp();
79 private static class MapreduceV1Shim
extends MapreduceTestingShim
{
81 public JobContext
newJobContext(Configuration jobConf
) throws IOException
{
83 // return new JobContext(jobConf, new JobID());
84 JobID jobId
= new JobID();
85 Constructor
<JobContext
> c
;
87 c
= JobContext
.class.getConstructor(Configuration
.class, JobID
.class);
88 return c
.newInstance(jobConf
, jobId
);
89 } catch (Exception e
) {
90 throw new IllegalStateException(
91 "Failed to instantiate new JobContext(jobConf, new JobID())", e
);
96 public Job
newJob(Configuration conf
) throws IOException
{
98 // return new Job(conf);
101 c
= Job
.class.getConstructor(Configuration
.class);
102 return c
.newInstance(conf
);
103 } catch (Exception e
) {
104 throw new IllegalStateException(
105 "Failed to instantiate new Job(conf)", e
);
110 public JobConf
obtainJobConf(MiniMRCluster cluster
) {
111 if (cluster
== null) return null;
113 Object runner
= cluster
.getJobTrackerRunner();
114 Method meth
= runner
.getClass().getDeclaredMethod("getJobTracker", emptyParam
);
115 Object tracker
= meth
.invoke(runner
, new Object
[]{});
116 Method m
= tracker
.getClass().getDeclaredMethod("getConf", emptyParam
);
117 return (JobConf
) m
.invoke(tracker
, new Object
[]{});
118 } catch (NoSuchMethodException nsme
) {
120 } catch (InvocationTargetException ite
) {
122 } catch (IllegalAccessException iae
) {
128 public String
obtainMROutputDirProp() {
129 return "mapred.output.dir";
133 private static class MapreduceV2Shim
extends MapreduceTestingShim
{
135 public JobContext
newJobContext(Configuration jobConf
) {
136 return newJob(jobConf
);
140 public Job
newJob(Configuration jobConf
) {
142 // return Job.getInstance(jobConf);
144 Method m
= Job
.class.getMethod("getInstance", Configuration
.class);
145 return (Job
) m
.invoke(null, jobConf
); // static method, then arg
146 } catch (Exception e
) {
148 throw new IllegalStateException(
149 "Failed to return from Job.getInstance(jobConf)");
154 public JobConf
obtainJobConf(MiniMRCluster cluster
) {
156 Method meth
= MiniMRCluster
.class.getMethod("getJobTrackerConf", emptyParam
);
157 return (JobConf
) meth
.invoke(cluster
, new Object
[]{});
158 } catch (NoSuchMethodException nsme
) {
160 } catch (InvocationTargetException ite
) {
162 } catch (IllegalAccessException iae
) {
168 public String
obtainMROutputDirProp() {
169 // This is a copy of o.a.h.mapreduce.lib.output.FileOutputFormat.OUTDIR
170 // from Hadoop 0.23.x. If we use the source directly we break the hadoop 1.x compile.
171 return "mapreduce.output.fileoutputformat.outputdir";