/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;

import com.codahale.metrics.MetricRegistry;

/**
 * Utility for {@link TableMapper} and {@link TableReducer}
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableMapReduceUtil {
  private static final Log LOG = LogFactory.getLog(TableMapReduceUtil.class);

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table The table name to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job)
      throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass,
        job, true);
  }
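
  // Illustrative usage sketch (not part of this class): wiring a scan over an existing
  // table into a map-only job. "mytable" and MyTableMapper are hypothetical names.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   Job job = Job.getInstance(conf, "scan-mytable");
  //   Scan scan = new Scan();
  //   scan.setCaching(500);          // see setScannerCaching below
  //   scan.setCacheBlocks(false);    // block caching is of little use to a one-pass scan
  //   TableMapReduceUtil.initTableMapperJob("mytable", scan, MyTableMapper.class,
  //       Text.class, IntWritable.class, job);
  //   job.setNumReduceTasks(0);
  //   job.waitForCompletion(true);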

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table The table name to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(TableName table,
      Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass,
      Job job) throws IOException {
    initTableMapperJob(table.getNameAsString(),
        scan,
        mapper,
        outputKeyClass,
        outputValueClass,
        job,
        true);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table Binary representation of the table name to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job)
      throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass,
        job, true);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table The table name to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   * job classes via the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
      throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
        addDependencyJars, true, inputFormatClass);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table The table name to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   * job classes via the distributed cache (tmpjars).
   * @param initCredentials whether to initialize hbase auth credentials for the job
   * @param inputFormatClass the input format
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, boolean initCredentials,
      Class<? extends InputFormat> inputFormatClass)
      throws IOException {
    job.setInputFormatClass(inputFormatClass);
    if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
    if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    if (Put.class.equals(outputValueClass)) {
      job.setCombinerClass(PutCombiner.class);
    }
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    conf.set(TableInputFormat.INPUT_TABLE, table);
    conf.set(TableInputFormat.SCAN, convertScanToString(scan));
    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        KeyValueSerialization.class.getName());
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    if (initCredentials) {
      initCredentials(job);
    }
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table Binary representation of the table name to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   * job classes via the distributed cache (tmpjars).
   * @param inputFormatClass The class of the input format
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
      throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, inputFormatClass);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table Binary representation of the table name to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   * job classes via the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars)
      throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, TableInputFormat.class);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table The table name to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   * job classes via the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars)
      throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, TableInputFormat.class);
  }

  /**
   * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on
   * direct memory will likely cause the map tasks to OOM when opening the region. This
   * is done here instead of in TableSnapshotRegionRecordReader in case an advanced user
   * wants to override this behavior in their job.
   */
  public static void resetCacheConfig(Configuration conf) {
    conf.setFloat(
        HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
    conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f);
    conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY);
  }

  /**
   * Sets up the job for reading from one or more table snapshots, with one or more scans
   * per snapshot.
   * It bypasses hbase servers and reads directly from snapshot files.
   *
   * @param snapshotScans map of snapshot name to scans on that snapshot.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   * job classes via the distributed cache (tmpjars).
   */
  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
      Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
      Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir);

    job.setInputFormatClass(MultiTableSnapshotInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));

    if (addDependencyJars) {
      addDependencyJars(job);
      addDependencyJarsForClasses(job.getConfiguration(), MetricRegistry.class);
    }

    resetCacheConfig(job.getConfiguration());
  }

  /**
   * Sets up the job for reading from a table snapshot. It bypasses hbase servers
   * and reads directly from snapshot files.
   *
   * @param snapshotName The name of the snapshot (of a table) to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   * job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should
   * have write permissions to this directory, and this should not be a subdirectory of rootdir.
   * After the job is finished, restore directory can be deleted.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, Path tmpRestoreDir)
      throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);
    resetCacheConfig(job.getConfiguration());
  }
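
  // Illustrative usage sketch (not part of this class): reading a snapshot instead of a
  // live table. "my_snapshot", MyTableMapper and the restore path are hypothetical.
  //
  //   Job job = Job.getInstance(HBaseConfiguration.create(), "scan-snapshot");
  //   Path restoreDir = new Path("/tmp/snapshot-restore");   // not under hbase rootdir
  //   TableMapReduceUtil.initTableSnapshotMapperJob("my_snapshot", new Scan(),
  //       MyTableMapper.class, Text.class, IntWritable.class, job, true, restoreDir);
  //   job.waitForCompletion(true);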

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set
   * up the job.
   *
   * @param scans The list of {@link Scan} objects to read from.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is carrying
   * all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job) throws IOException {
    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
        true);
  }

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set
   * up the job.
   *
   * @param scans The list of {@link Scan} objects to read from.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is carrying
   * all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the
   * configured job classes via the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars) throws IOException {
    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
        addDependencyJars, true);
  }

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set
   * up the job.
   *
   * @param scans The list of {@link Scan} objects to read from.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is carrying
   * all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the
   * configured job classes via the distributed cache (tmpjars).
   * @param initCredentials whether to initialize hbase auth credentials for the job
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars,
      boolean initCredentials) throws IOException {
    job.setInputFormatClass(MultiTableInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    List<String> scanStrings = new ArrayList<>();

    for (Scan scan : scans) {
      scanStrings.add(convertScanToString(scan));
    }
    job.getConfiguration().setStrings(MultiTableInputFormat.SCANS,
        scanStrings.toArray(new String[scanStrings.size()]));

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    if (initCredentials) {
      initCredentials(job);
    }
  }
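
  // Illustrative usage sketch (not part of this class): a multi-table scan job. Table
  // names and MyTableMapper are hypothetical; each Scan is tagged with the table it
  // should run against so MultiTableInputFormat can route it.
  //
  //   List<Scan> scans = new ArrayList<>();
  //   for (String tableName : new String[] { "table1", "table2" }) {
  //     Scan scan = new Scan();
  //     scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));
  //     scans.add(scan);
  //   }
  //   TableMapReduceUtil.initTableMapperJob(scans, MyTableMapper.class,
  //       Text.class, IntWritable.class, job);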

  public static void initCredentials(Job job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.getConfiguration().set("mapreduce.job.credentials.binary",
            System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        // init credentials for remote cluster
        String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
        User user = userProvider.getCurrent();
        if (quorumAddress != null) {
          Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
              quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX);
          Connection peerConn = ConnectionFactory.createConnection(peerConf);
          try {
            TokenUtil.addTokenForJob(peerConn, user, job);
          } finally {
            peerConn.close();
          }
        }

        Connection conn = ConnectionFactory.createConnection(job.getConfiguration());
        try {
          TokenUtil.addTokenForJob(conn, user, job);
        } finally {
          conn.close();
        }
      } catch (InterruptedException ie) {
        LOG.info("Interrupted obtaining user authentication token");
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Obtain an authentication token, for the specified cluster, on behalf of the current user
   * and add it to the credentials for the given map reduce job.
   *
   * The quorumAddress is the key to the ZK ensemble, which contains:
   * hbase.zookeeper.quorum, hbase.zookeeper.client.port and
   * zookeeper.znode.parent
   *
   * @param job The job that requires the permission.
   * @param quorumAddress string that contains the 3 required configurations
   * @throws IOException When the authentication token cannot be obtained.
   * @deprecated Since 1.2.0, use {@link #initCredentialsForCluster(Job, Configuration)} instead.
   */
  @Deprecated
  public static void initCredentialsForCluster(Job job, String quorumAddress)
      throws IOException {
    Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
        quorumAddress);
    initCredentialsForCluster(job, peerConf);
  }

  /**
   * Obtain an authentication token, for the specified cluster, on behalf of the current user
   * and add it to the credentials for the given map reduce job.
   *
   * @param job The job that requires the permission.
   * @param conf The configuration to use in connecting to the peer cluster
   * @throws IOException When the authentication token cannot be obtained.
   */
  public static void initCredentialsForCluster(Job job, Configuration conf)
      throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        Connection peerConn = ConnectionFactory.createConnection(conf);
        try {
          TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job);
        } finally {
          peerConn.close();
        }
      } catch (InterruptedException e) {
        LOG.info("Interrupted obtaining user authentication token");
        Thread.interrupted();
      }
    }
  }
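
  // Illustrative usage sketch (not part of this class): obtaining delegation tokens for
  // both the local cluster and a second, remote cluster. The cluster key is hypothetical.
  //
  //   TableMapReduceUtil.initCredentials(job);                      // local cluster
  //   Configuration peerConf = HBaseConfiguration.createClusterConf(
  //       job.getConfiguration(), "zk1,zk2,zk3:2181:/hbase");       // remote cluster key
  //   TableMapReduceUtil.initCredentialsForCluster(job, peerConf);  // remote cluster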

  /**
   * Writes the given scan into a Base64 encoded string.
   *
   * @param scan The scan to write out.
   * @return The scan saved in a Base64 encoded string.
   * @throws IOException When writing the scan fails.
   */
  public static String convertScanToString(Scan scan) throws IOException {
    ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
    return Base64.encodeBytes(proto.toByteArray());
  }

  /**
   * Converts the given Base64 string back into a Scan instance.
   *
   * @param base64 The scan details.
   * @return The newly created Scan instance.
   * @throws IOException When reading the scan instance fails.
   */
  public static Scan convertStringToScan(String base64) throws IOException {
    byte[] decoded = Base64.decode(base64);
    return ProtobufUtil.toScan(ClientProtos.Scan.parseFrom(decoded));
  }
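
  // Illustrative round-trip sketch (not part of this class): the encoded string form is
  // what gets stored in the job configuration under TableInputFormat.SCAN.
  //
  //   Scan scan = new Scan();
  //   String encoded = TableMapReduceUtil.convertScanToString(scan);
  //   Scan decoded = TableMapReduceUtil.convertStringToScan(encoded);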

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table The output table.
   * @param reducer The reducer class to use.
   * @param job The current job to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job)
      throws IOException {
    initTableReducerJob(table, reducer, job, null);
  }
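
  // Illustrative usage sketch (not part of this class): writing job output back to a
  // table. "target_table" and MyTableReducer are hypothetical names.
  //
  //   TableMapReduceUtil.initTableReducerJob("target_table", MyTableReducer.class, job);
  //   // or, to cap reducers at the region count of the target table:
  //   TableMapReduceUtil.limitNumReduceTasks("target_table", job);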

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table The output table.
   * @param reducer The reducer class to use.
   * @param job The current job to adjust.
   * @param partitioner Partitioner to use. Pass <code>null</code> to use
   * default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job,
      Class partitioner) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, null, null, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table The output table.
   * @param reducer The reducer class to use.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param partitioner Partitioner to use. Pass <code>null</code> to use
   * default partitioner.
   * @param quorumAddress Distant cluster to write to; default is null for
   * output to the cluster that is designated in <code>hbase-site.xml</code>.
   * Set this String to the zookeeper ensemble of an alternate remote cluster
   * when you would have the reduce write to a cluster that is other than the
   * default; e.g. copying tables between clusters, the source would be
   * designated by <code>hbase-site.xml</code> and this param would have the
   * ensemble address of the remote cluster. The format to pass is particular.
   * Pass <code>&lt;hbase.zookeeper.quorum&gt;:&lt;
   * hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;
   * </code> such as <code>server,server2,server3:2181:/hbase</code>.
   * @param serverClass redefined hbase.regionserver.class
   * @param serverImpl redefined hbase.regionserver.impl
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job,
      Class partitioner, String quorumAddress, String serverClass,
      String serverImpl) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, quorumAddress,
        serverClass, serverImpl, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table The output table.
   * @param reducer The reducer class to use.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param partitioner Partitioner to use. Pass <code>null</code> to use
   * default partitioner.
   * @param quorumAddress Distant cluster to write to; default is null for
   * output to the cluster that is designated in <code>hbase-site.xml</code>.
   * Set this String to the zookeeper ensemble of an alternate remote cluster
   * when you would have the reduce write to a cluster that is other than the
   * default; e.g. copying tables between clusters, the source would be
   * designated by <code>hbase-site.xml</code> and this param would have the
   * ensemble address of the remote cluster. The format to pass is particular.
   * Pass <code>&lt;hbase.zookeeper.quorum&gt;:&lt;
   * hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;
   * </code> such as <code>server,server2,server3:2181:/hbase</code>.
   * @param serverClass redefined hbase.regionserver.class
   * @param serverImpl redefined hbase.regionserver.impl
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   * job classes via the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job,
      Class partitioner, String quorumAddress, String serverClass,
      String serverImpl, boolean addDependencyJars) throws IOException {

    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    job.setOutputFormatClass(TableOutputFormat.class);
    if (reducer != null) job.setReducerClass(reducer);
    conf.set(TableOutputFormat.OUTPUT_TABLE, table);
    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
    if (quorumAddress != null) {
      // Calling this will validate the format
      ZKConfig.validateClusterKey(quorumAddress);
      conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
    }
    if (serverClass != null && serverImpl != null) {
      conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
      conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
    }
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Writable.class);
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions = MetaTableAccessor.getRegionCount(conf, TableName.valueOf(table));
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    initCredentials(job);
  }

  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table The table to get the region count for.
   * @param job The current job to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumReduceTasks(String table, Job job)
      throws IOException {
    int regions =
        MetaTableAccessor.getRegionCount(job.getConfiguration(), TableName.valueOf(table));
    if (job.getNumReduceTasks() > regions) {
      job.setNumReduceTasks(regions);
    }
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table The table to get the region count for.
   * @param job The current job to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, Job job)
      throws IOException {
    job.setNumReduceTasks(MetaTableAccessor.getRegionCount(job.getConfiguration(),
        TableName.valueOf(table)));
  }

  /**
   * Sets the number of rows to return and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
   *
   * @param job The current job to adjust.
   * @param batchSize The number of rows to return in batch with each scanner
   * iteration.
   */
  public static void setScannerCaching(Job job, int batchSize) {
    job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
  }

  /**
   * Add HBase and its dependencies (only) to the job configuration.
   *
   * This is intended as a low-level API, facilitating code reuse between this
   * class and its mapred counterpart. It is also of use to external tools that
   * need to build a MapReduce job that interacts with HBase but want
   * fine-grained control over the jars shipped to the cluster.
   *
   * @param conf The Configuration object to extend with dependencies.
   * @see org.apache.hadoop.hbase.mapred.TableMapReduceUtil
   * @see <a href="https://issues.apache.org/jira/browse/PIG-3285">PIG-3285</a>
   */
  public static void addHBaseDependencyJars(Configuration conf) throws IOException {

    // PrefixTreeCodec is part of the hbase-prefix-tree module. If not included in MR jobs jar
    // dependencies, MR jobs that write encoded hfiles will fail.
    // We use reflection here to prevent a circular module dependency.
    // TODO - if we extract the MR into a module, make it depend on hbase-prefix-tree.
    Class prefixTreeCodecClass = null;
    try {
      prefixTreeCodecClass =
          Class.forName("org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeCodec");
    } catch (ClassNotFoundException e) {
      // this will show up in unit tests but should not show in real deployments
      LOG.warn("The hbase-prefix-tree module jar containing PrefixTreeCodec is not present." +
          " Continuing without it.");
    }

    addDependencyJarsForClasses(conf,
        // explicitly pull a class from each module
        org.apache.hadoop.hbase.HConstants.class,                      // hbase-common
        org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class, // hbase-protocol
        org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.class, // hbase-protocol-shaded
        org.apache.hadoop.hbase.client.Put.class,                      // hbase-client
        org.apache.hadoop.hbase.CompatibilityFactory.class,            // hbase-hadoop-compat
        org.apache.hadoop.hbase.mapreduce.TableMapper.class,           // hbase-server
        org.apache.hadoop.hbase.metrics.impl.FastLongHistogram.class,  // hbase-metrics
        prefixTreeCodecClass, // hbase-prefix-tree (if null will be skipped)
        // pull necessary dependencies
        org.apache.zookeeper.ZooKeeper.class,
        io.netty.channel.Channel.class,
        com.google.protobuf.Message.class,
        com.google.common.collect.Lists.class,
        org.apache.htrace.Trace.class,
        com.codahale.metrics.MetricRegistry.class);
  }

  /**
   * Returns a classpath string built from the content of the "tmpjars" value in {@code conf}.
   * Also exposed to shell scripts via `bin/hbase mapredcp`.
   */
  public static String buildDependencyClasspath(Configuration conf) {
    if (conf == null) {
      throw new IllegalArgumentException("Must provide a configuration object.");
    }
    Set<String> paths = new HashSet<>(conf.getStringCollection("tmpjars"));
    if (paths.isEmpty()) {
      throw new IllegalArgumentException("Configuration contains no tmpjars.");
    }
    StringBuilder sb = new StringBuilder();
    for (String s : paths) {
      // entries can take the form 'file:/path/to/file.jar'.
      int idx = s.indexOf(":");
      if (idx != -1) s = s.substring(idx + 1);
      if (sb.length() > 0) sb.append(File.pathSeparator);
      sb.append(s);
    }
    return sb.toString();
  }

  /**
   * Add the HBase dependency jars as well as jars for any of the configured
   * job classes to the job configuration, so that JobClient will ship them
   * to the cluster and add them to the DistributedCache.
   */
  public static void addDependencyJars(Job job) throws IOException {
    addHBaseDependencyJars(job.getConfiguration());
    try {
      addDependencyJarsForClasses(job.getConfiguration(),
          // when making changes here, consider also mapred.TableMapReduceUtil
          // pull job classes
          job.getMapOutputKeyClass(),
          job.getMapOutputValueClass(),
          job.getInputFormatClass(),
          job.getOutputKeyClass(),
          job.getOutputValueClass(),
          job.getOutputFormatClass(),
          job.getPartitionerClass(),
          job.getCombinerClass());
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
  }

  /**
   * Add the jars containing the given classes to the job's configuration
   * such that JobClient will ship them to the cluster and add them to
   * the DistributedCache.
   * @deprecated rely on {@link #addDependencyJars(Job)} instead.
   */
  @Deprecated
  public static void addDependencyJars(Configuration conf,
      Class<?>... classes) throws IOException {
    LOG.warn("The addDependencyJars(Configuration, Class<?>...) method has been deprecated since it"
        + " is easy to use incorrectly. Most users should rely on addDependencyJars(Job) " +
        "instead. See HBASE-8386 for more details.");
    addDependencyJarsForClasses(conf, classes);
  }

  /**
   * Add the jars containing the given classes to the job's configuration
   * such that JobClient will ship them to the cluster and add them to
   * the DistributedCache.
   *
   * N.B. that this method at most adds one jar per class given. If there is more than one
   * jar available containing a class with the same name as a given class, we don't define
   * which of those jars might be chosen.
   *
   * @param conf The Hadoop Configuration to modify
   * @param classes will add just those dependencies needed to find the given classes
   * @throws IOException if an underlying library call fails.
   */
  @InterfaceAudience.Private
  public static void addDependencyJarsForClasses(Configuration conf,
      Class<?>... classes) throws IOException {

    FileSystem localFs = FileSystem.getLocal(conf);
    Set<String> jars = new HashSet<>();
    // Add jars that are already in the tmpjars variable
    jars.addAll(conf.getStringCollection("tmpjars"));

    // add jars as we find them to a map of contents jar name so that we can avoid
    // creating new jars for classes that have already been packaged.
    Map<String, String> packagedClasses = new HashMap<>();

    // Add jars containing the specified classes
    for (Class<?> clazz : classes) {
      if (clazz == null) continue;

      Path path = findOrCreateJar(clazz, localFs, packagedClasses);
      if (path == null) {
        LOG.warn("Could not find jar for class " + clazz +
            " in order to ship it to the cluster.");
        continue;
      }
      if (!localFs.exists(path)) {
        LOG.warn("Could not validate jar file " + path + " for class " + clazz);
        continue;
      }
      jars.add(path.toString());
    }
    if (jars.isEmpty()) return;

    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
  }
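
  // Illustrative usage sketch (not part of this class): a job that also needs to ship a
  // third-party library referenced from its mapper. MyJsonLibClass is hypothetical.
  //
  //   TableMapReduceUtil.addDependencyJars(job);   // HBase + configured job classes
  //   TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
  //       MyJsonLibClass.class);                   // extra, non-HBase dependency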

  /**
   * Finds the Jar for a class or creates it if it doesn't exist. If the class is in
   * a directory in the classpath, it creates a Jar on the fly with the
   * contents of the directory and returns the path to that Jar. If a Jar is
   * created, it is created in the system temporary directory. Otherwise,
   * returns an existing jar that contains a class of the same name. Maintains
   * a mapping from jar contents to the tmp jar created.
   * @param my_class the class to find.
   * @param fs the FileSystem with which to qualify the returned path.
   * @param packagedClasses a map of class name to path.
   * @return a jar file that contains the class.
   * @throws IOException
   */
  private static Path findOrCreateJar(Class<?> my_class, FileSystem fs,
      Map<String, String> packagedClasses)
      throws IOException {
    // attempt to locate an existing jar for the class.
    String jar = findContainingJar(my_class, packagedClasses);
    if (null == jar || jar.isEmpty()) {
      jar = getJar(my_class);
      updateMap(jar, packagedClasses);
    }

    if (null == jar || jar.isEmpty()) {
      return null;
    }

    LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar));
    return new Path(jar).makeQualified(fs);
  }

  /**
   * Add entries to <code>packagedClasses</code> corresponding to class files
   * contained in <code>jar</code>.
   * @param jar The jar whose content to list.
   * @param packagedClasses map[class -> jar]
   */
  private static void updateMap(String jar, Map<String, String> packagedClasses)
      throws IOException {
    if (null == jar || jar.isEmpty()) {
      return;
    }
    ZipFile zip = null;
    try {
      zip = new ZipFile(jar);
      for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) {
        ZipEntry entry = iter.nextElement();
        if (entry.getName().endsWith("class")) {
          packagedClasses.put(entry.getName(), jar);
        }
      }
    } finally {
      if (null != zip) zip.close();
    }
  }

  /**
   * Find a jar that contains a class of the same name, if any. It will return
   * a jar file, even if that is not the first thing on the class path that
   * has a class with the same name. Looks first on the classpath and then in
   * the <code>packagedClasses</code> map.
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   * @throws IOException
   */
  private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses)
      throws IOException {
    ClassLoader loader = my_class.getClassLoader();

    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";

    if (loader != null) {
      // first search the classpath
      for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
        URL url = itr.nextElement();
        if ("jar".equals(url.getProtocol())) {
          String toReturn = url.getPath();
          if (toReturn.startsWith("file:")) {
            toReturn = toReturn.substring("file:".length());
          }
          // URLDecoder is a misnamed class, since it actually decodes
          // x-www-form-urlencoded MIME type rather than actual
          // URL encoding (which the file path has). Therefore it would
          // decode +s to ' 's which is incorrect (spaces are actually
          // either unencoded or encoded as "%20"). Replace +s first, so
          // that they are kept sacred during the decoding process.
          toReturn = toReturn.replaceAll("\\+", "%2B");
          toReturn = URLDecoder.decode(toReturn, "UTF-8");
          return toReturn.replaceAll("!.*$", "");
        }
      }
    }

    // now look in any jars we've packaged using JarFinder. Returns null when
    // no jar is found.
    return packagedClasses.get(class_file);
  }

  /**
   * Invoke 'getJar' on a custom JarFinder implementation. Useful for some job
   * configuration contexts (HBASE-8140) and also for testing on MRv2.
   * check if we have HADOOP-9426.
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   */
  private static String getJar(Class<?> my_class) {
    String ret = null;
    try {
      ret = JarFinder.getJar(my_class);
    } catch (Exception e) {
      // toss all other exceptions, related to reflection failure
      throw new RuntimeException("getJar invocation failed.", e);
    }

    return ret;
  }
}