/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;

import com.codahale.metrics.MetricRegistry;
/**
 * Utility for {@link TableMapper} and {@link TableReducer}
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@InterfaceAudience.Public
public class TableMapReduceUtil {
  private static final Log LOG = LogFactory.getLog(TableMapReduceUtil.class);
  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table The table name to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job)
  throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass,
        job, true);
  }
  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table The table name to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(TableName table,
      Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass,
      Job job) throws IOException {
    initTableMapperJob(table.getNameAsString(),
        scan,
        mapper,
        outputKeyClass,
        outputValueClass,
        job);
  }
  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table Binary representation of the table name to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job)
  throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass,
        job, true);
  }
  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table The table name to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
  throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
        addDependencyJars, true, inputFormatClass);
  }
  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table The table name to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param initCredentials whether to initialize hbase auth credentials for the job
   * @param inputFormatClass the input format
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, boolean initCredentials,
      Class<? extends InputFormat> inputFormatClass)
  throws IOException {
    job.setInputFormatClass(inputFormatClass);
    if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
    if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    if (Put.class.equals(outputValueClass)) {
      job.setCombinerClass(PutCombiner.class);
    }
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    conf.set(TableInputFormat.INPUT_TABLE, table);
    conf.set(TableInputFormat.SCAN, convertScanToString(scan));
    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        CellSerialization.class.getName());
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    if (initCredentials) {
      initCredentials(job);
    }
  }
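
  // Illustrative driver sketch (not part of the original class): the table name, column
  // family, and the MyMapper/MyDriver classes below are hypothetical placeholders.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   Job job = Job.getInstance(conf, "scan-example");
  //   job.setJarByClass(MyDriver.class);
  //   Scan scan = new Scan();
  //   scan.addFamily(Bytes.toBytes("cf"));
  //   scan.setCaching(500);        // fewer RPC round trips for a full-table scan
  //   scan.setCacheBlocks(false);  // don't churn the region server block cache
  //   TableMapReduceUtil.initTableMapperJob("example_table", scan, MyMapper.class,
  //       ImmutableBytesWritable.class, Put.class, job);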
  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table Binary representation of the table name to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param inputFormatClass The class of the input format
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
  throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, inputFormatClass);
  }
  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table Binary representation of the table name to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars)
  throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, TableInputFormat.class);
  }
  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table The table name to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars)
  throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, TableInputFormat.class);
  }
  /**
   * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on
   * direct memory will likely cause the map tasks to OOM when opening the region. This
   * is done here instead of in TableSnapshotRegionRecordReader in case an advanced user
   * wants to override this behavior in their job.
   */
  public static void resetCacheConfig(Configuration conf) {
    conf.setFloat(
        HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
    conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f);
    conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY);
  }
  /**
   * Sets up the job for reading from one or more table snapshots, with one or more scans
   * per snapshot.
   * It bypasses hbase servers and reads directly from snapshot files.
   *
   * @param snapshotScans map of snapshot name to scans on that snapshot.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   */
  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
      Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
      Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir);

    job.setInputFormatClass(MultiTableSnapshotInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));

    if (addDependencyJars) {
      addDependencyJars(job);
      addDependencyJarsForClasses(job.getConfiguration(), MetricRegistry.class);
    }

    resetCacheConfig(job.getConfiguration());
  }
  /**
   * Sets up the job for reading from a table snapshot. It bypasses hbase servers
   * and reads directly from snapshot files.
   *
   * @param snapshotName The name of the snapshot (of a table) to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should
   * have write permissions to this directory, and this should not be a subdirectory of rootdir.
   * After the job is finished, restore directory can be deleted.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, Path tmpRestoreDir)
  throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);
    resetCacheConfig(job.getConfiguration());
  }
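
  // Illustrative sketch (the snapshot name and restore directory are hypothetical). The
  // restore directory must be writable by the submitting user and must not sit under the
  // HBase root directory:
  //
  //   TableMapReduceUtil.initTableSnapshotMapperJob("example_snapshot", new Scan(),
  //       MyMapper.class, ImmutableBytesWritable.class, Put.class, job,
  //       true, new Path("/tmp/snapshot_restore"));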
  /**
   * Sets up the job for reading from a table snapshot. It bypasses hbase servers
   * and reads directly from snapshot files.
   *
   * @param snapshotName The name of the snapshot (of a table) to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should
   * have write permissions to this directory, and this should not be a subdirectory of rootdir.
   * After the job is finished, restore directory can be deleted.
   * @param splitAlgo algorithm to split
   * @param numSplitsPerRegion how many input splits to generate per one region
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, Path tmpRestoreDir,
      RegionSplitter.SplitAlgorithm splitAlgo,
      int numSplitsPerRegion)
  throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir, splitAlgo,
        numSplitsPerRegion);
    initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);
    resetCacheConfig(job.getConfiguration());
  }
  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set
   * up the job.
   *
   * @param scans The list of {@link Scan} objects to read from.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is carrying
   * all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job) throws IOException {
    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
        true);
  }
  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set
   * up the job.
   *
   * @param scans The list of {@link Scan} objects to read from.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is carrying
   * all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the
   *          configured job classes via the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars) throws IOException {
    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
        addDependencyJars, true);
  }
  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set
   * up the job.
   *
   * @param scans The list of {@link Scan} objects to read from.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is carrying
   * all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the
   *          configured job classes via the distributed cache (tmpjars).
   * @param initCredentials whether to initialize hbase auth credentials for the job
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars,
      boolean initCredentials) throws IOException {
    job.setInputFormatClass(MultiTableInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    List<String> scanStrings = new ArrayList<>();

    for (Scan scan : scans) {
      scanStrings.add(convertScanToString(scan));
    }
    job.getConfiguration().setStrings(MultiTableInputFormat.SCANS,
        scanStrings.toArray(new String[scanStrings.size()]));

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    if (initCredentials) {
      initCredentials(job);
    }
  }
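
  // Illustrative sketch for the multi-scan variant: each Scan is tagged with the table it
  // should run against (the table names and mapper below are hypothetical placeholders).
  //
  //   List<Scan> scans = new ArrayList<>();
  //   Scan scanA = new Scan();
  //   scanA.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("table_a"));
  //   scans.add(scanA);
  //   Scan scanB = new Scan();
  //   scanB.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("table_b"));
  //   scans.add(scanB);
  //   TableMapReduceUtil.initTableMapperJob(scans, MyMapper.class,
  //       ImmutableBytesWritable.class, Put.class, job);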
  public static void initCredentials(Job job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.getConfiguration().set("mapreduce.job.credentials.binary",
            System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        // init credentials for remote cluster
        String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
        User user = userProvider.getCurrent();
        if (quorumAddress != null) {
          Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
              quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX);
          Connection peerConn = ConnectionFactory.createConnection(peerConf);
          try {
            TokenUtil.addTokenForJob(peerConn, user, job);
          } finally {
            peerConn.close();
          }
        }

        Connection conn = ConnectionFactory.createConnection(job.getConfiguration());
        try {
          TokenUtil.addTokenForJob(conn, user, job);
        } finally {
          conn.close();
        }
      } catch (InterruptedException ie) {
        LOG.info("Interrupted obtaining user authentication token");
        Thread.currentThread().interrupt();
      }
    }
  }
  /**
   * Obtain an authentication token, for the specified cluster, on behalf of the current user
   * and add it to the credentials for the given map reduce job.
   *
   * The quorumAddress is the key to the ZK ensemble, which contains:
   * hbase.zookeeper.quorum, hbase.zookeeper.client.port and
   * zookeeper.znode.parent
   *
   * @param job The job that requires the permission.
   * @param quorumAddress string that contains the 3 required configurations
   * @throws IOException When the authentication token cannot be obtained.
   * @deprecated Since 1.2.0, use {@link #initCredentialsForCluster(Job, Configuration)} instead.
   */
  @Deprecated
  public static void initCredentialsForCluster(Job job, String quorumAddress)
      throws IOException {
    Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
        quorumAddress);
    initCredentialsForCluster(job, peerConf);
  }
  /**
   * Obtain an authentication token, for the specified cluster, on behalf of the current user
   * and add it to the credentials for the given map reduce job.
   *
   * @param job The job that requires the permission.
   * @param conf The configuration to use in connecting to the peer cluster
   * @throws IOException When the authentication token cannot be obtained.
   */
  public static void initCredentialsForCluster(Job job, Configuration conf)
      throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        Connection peerConn = ConnectionFactory.createConnection(conf);
        try {
          TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job);
        } finally {
          peerConn.close();
        }
      } catch (InterruptedException e) {
        LOG.info("Interrupted obtaining user authentication token");
        Thread.interrupted();
      }
    }
  }
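
  // Illustrative sketch: obtaining a delegation token for a second (peer) cluster, e.g.
  // when the reduce phase writes to a remote cluster (the cluster key below is hypothetical).
  //
  //   Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
  //       "zk1,zk2,zk3:2181:/hbase");
  //   TableMapReduceUtil.initCredentialsForCluster(job, peerConf);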
  /**
   * Writes the given scan into a Base64 encoded string.
   *
   * @param scan The scan to write out.
   * @return The scan saved in a Base64 encoded string.
   * @throws IOException When writing the scan fails.
   */
  public static String convertScanToString(Scan scan) throws IOException {
    ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
    return Base64.encodeBytes(proto.toByteArray());
  }
  /**
   * Converts the given Base64 string back into a Scan instance.
   *
   * @param base64 The scan details.
   * @return The newly created Scan instance.
   * @throws IOException When reading the scan instance fails.
   */
  public static Scan convertStringToScan(String base64) throws IOException {
    byte [] decoded = Base64.decode(base64);
    return ProtobufUtil.toScan(ClientProtos.Scan.parseFrom(decoded));
  }
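
  // Round-trip sketch: the encoded form is what initTableMapperJob stores in the job
  // configuration under TableInputFormat.SCAN.
  //
  //   Scan scan = new Scan();
  //   String encoded = TableMapReduceUtil.convertScanToString(scan);
  //   Scan decoded = TableMapReduceUtil.convertStringToScan(encoded);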
  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table The output table.
   * @param reducer The reducer class to use.
   * @param job The current job to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job)
  throws IOException {
    initTableReducerJob(table, reducer, job, null);
  }
  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table The output table.
   * @param reducer The reducer class to use.
   * @param job The current job to adjust.
   * @param partitioner Partitioner to use. Pass <code>null</code> to use
   * default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job,
      Class partitioner) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, null, null, null);
  }
  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table The output table.
   * @param reducer The reducer class to use.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param partitioner Partitioner to use. Pass <code>null</code> to use
   * default partitioner.
   * @param quorumAddress Distant cluster to write to; default is null for
   * output to the cluster that is designated in <code>hbase-site.xml</code>.
   * Set this String to the zookeeper ensemble of an alternate remote cluster
   * when you would have the reduce write to a cluster that is other than the
   * default; e.g. copying tables between clusters, the source would be
   * designated by <code>hbase-site.xml</code> and this param would have the
   * ensemble address of the remote cluster. The format to pass is particular.
   * Pass <code>&lt;hbase.zookeeper.quorum&gt;:&lt;
   * hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;
   * </code> such as <code>server,server2,server3:2181:/hbase</code>.
   * @param serverClass redefined hbase.regionserver.class
   * @param serverImpl redefined hbase.regionserver.impl
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job,
      Class partitioner, String quorumAddress, String serverClass,
      String serverImpl) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, quorumAddress,
        serverClass, serverImpl, true);
  }
  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table The output table.
   * @param reducer The reducer class to use.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param partitioner Partitioner to use. Pass <code>null</code> to use
   * default partitioner.
   * @param quorumAddress Distant cluster to write to; default is null for
   * output to the cluster that is designated in <code>hbase-site.xml</code>.
   * Set this String to the zookeeper ensemble of an alternate remote cluster
   * when you would have the reduce write to a cluster that is other than the
   * default; e.g. copying tables between clusters, the source would be
   * designated by <code>hbase-site.xml</code> and this param would have the
   * ensemble address of the remote cluster. The format to pass is particular.
   * Pass <code>&lt;hbase.zookeeper.quorum&gt;:&lt;
   * hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;
   * </code> such as <code>server,server2,server3:2181:/hbase</code>.
   * @param serverClass redefined hbase.regionserver.class
   * @param serverImpl redefined hbase.regionserver.impl
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job,
      Class partitioner, String quorumAddress, String serverClass,
      String serverImpl, boolean addDependencyJars) throws IOException {

    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    job.setOutputFormatClass(TableOutputFormat.class);
    if (reducer != null) job.setReducerClass(reducer);
    conf.set(TableOutputFormat.OUTPUT_TABLE, table);
    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
    if (quorumAddress != null) {
      // Calling this will validate the format
      ZKConfig.validateClusterKey(quorumAddress);
      conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
    }
    if (serverClass != null && serverImpl != null) {
      conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
      conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
    }
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Writable.class);
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions = MetaTableAccessor.getRegionCount(conf, TableName.valueOf(table));
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    initCredentials(job);
  }
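
  // Illustrative sketch of a reduce-side write (the output table and reducer class are
  // hypothetical); passing HRegionPartitioner.class caps the reduce task count at the
  // table's region count.
  //
  //   TableMapReduceUtil.initTableReducerJob("example_output_table",
  //       MyTableReducer.class, job, HRegionPartitioner.class);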
  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table The table to get the region count for.
   * @param job The current job to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumReduceTasks(String table, Job job)
  throws IOException {
    int regions =
        MetaTableAccessor.getRegionCount(job.getConfiguration(), TableName.valueOf(table));
    if (job.getNumReduceTasks() > regions) {
      job.setNumReduceTasks(regions);
    }
  }
  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table The table to get the region count for.
   * @param job The current job to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, Job job)
  throws IOException {
    job.setNumReduceTasks(MetaTableAccessor.getRegionCount(job.getConfiguration(),
        TableName.valueOf(table)));
  }
  /**
   * Sets the number of rows to return and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
   *
   * @param job The current job to adjust.
   * @param batchSize The number of rows to return in batch with each scanner
   * iteration.
   */
  public static void setScannerCaching(Job job, int batchSize) {
    job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
  }
  /**
   * Add HBase and its dependencies (only) to the job configuration.
   * <p>
   * This is intended as a low-level API, facilitating code reuse between this
   * class and its mapred counterpart. It is also of use to external tools that
   * need to build a MapReduce job that interacts with HBase but want
   * fine-grained control over the jars shipped to the cluster.
   * </p>
   * @param conf The Configuration object to extend with dependencies.
   * @see org.apache.hadoop.hbase.mapred.TableMapReduceUtil
   * @see <a href="https://issues.apache.org/jira/browse/PIG-3285">PIG-3285</a>
   */
  public static void addHBaseDependencyJars(Configuration conf) throws IOException {
    addDependencyJarsForClasses(conf,
      // explicitly pull a class from each module
      org.apache.hadoop.hbase.HConstants.class,                      // hbase-common
      org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class, // hbase-protocol
      org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.class, // hbase-protocol-shaded
      org.apache.hadoop.hbase.client.Put.class,                      // hbase-client
      org.apache.hadoop.hbase.ipc.RpcServer.class,                   // hbase-server
      org.apache.hadoop.hbase.CompatibilityFactory.class,            // hbase-hadoop-compat
      org.apache.hadoop.hbase.mapreduce.JobUtil.class,               // hbase-hadoop2-compat
      org.apache.hadoop.hbase.mapreduce.TableMapper.class,           // hbase-mapreduce
      org.apache.hadoop.hbase.metrics.impl.FastLongHistogram.class,  // hbase-metrics
      org.apache.hadoop.hbase.metrics.Snapshot.class,                // hbase-metrics-api
      org.apache.zookeeper.ZooKeeper.class,
      org.apache.hadoop.hbase.shaded.io.netty.channel.Channel.class,
      com.google.protobuf.Message.class,
      org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations.class,
      org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists.class,
      org.apache.htrace.core.Tracer.class,
      com.codahale.metrics.MetricRegistry.class,
      org.apache.commons.lang3.ArrayUtils.class,
      com.fasterxml.jackson.databind.ObjectMapper.class,
      com.fasterxml.jackson.core.Versioned.class,
      com.fasterxml.jackson.annotation.JsonView.class,
      org.apache.hadoop.hbase.zookeeper.ZKWatcher.class);
  }
  /**
   * Returns a classpath string built from the content of the "tmpjars" value in {@code conf}.
   * Also exposed to shell scripts via `bin/hbase mapredcp`.
   */
  public static String buildDependencyClasspath(Configuration conf) {
    if (conf == null) {
      throw new IllegalArgumentException("Must provide a configuration object.");
    }
    Set<String> paths = new HashSet<>(conf.getStringCollection("tmpjars"));
    if (paths.isEmpty()) {
      throw new IllegalArgumentException("Configuration contains no tmpjars.");
    }
    StringBuilder sb = new StringBuilder();
    for (String s : paths) {
      // entries can take the form 'file:/path/to/file.jar'.
      int idx = s.indexOf(":");
      if (idx != -1) s = s.substring(idx + 1);
      if (sb.length() > 0) sb.append(File.pathSeparator);
      sb.append(s);
    }
    return sb.toString();
  }
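
  // Sketch: build the dependency classpath from a job's configuration once
  // addDependencyJars(job) has populated "tmpjars".
  //
  //   TableMapReduceUtil.addDependencyJars(job);
  //   String classpath = TableMapReduceUtil.buildDependencyClasspath(job.getConfiguration());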
  /**
   * Add the HBase dependency jars as well as jars for any of the configured
   * job classes to the job configuration, so that JobClient will ship them
   * to the cluster and add them to the DistributedCache.
   */
  public static void addDependencyJars(Job job) throws IOException {
    addHBaseDependencyJars(job.getConfiguration());
    try {
      addDependencyJarsForClasses(job.getConfiguration(),
          // when making changes here, consider also mapred.TableMapReduceUtil
          job.getMapOutputKeyClass(),
          job.getMapOutputValueClass(),
          job.getInputFormatClass(),
          job.getOutputKeyClass(),
          job.getOutputValueClass(),
          job.getOutputFormatClass(),
          job.getPartitionerClass(),
          job.getCombinerClass());
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
  }
  /**
   * Add the jars containing the given classes to the job's configuration
   * such that JobClient will ship them to the cluster and add them to
   * the DistributedCache.
   * @deprecated rely on {@link #addDependencyJars(Job)} instead.
   */
  @Deprecated
  public static void addDependencyJars(Configuration conf,
      Class<?>... classes) throws IOException {
    LOG.warn("The addDependencyJars(Configuration, Class<?>...) method has been deprecated since it"
        + " is easy to use incorrectly. Most users should rely on addDependencyJars(Job) " +
        "instead. See HBASE-8386 for more details.");
    addDependencyJarsForClasses(conf, classes);
  }
  /**
   * Add the jars containing the given classes to the job's configuration
   * such that JobClient will ship them to the cluster and add them to
   * the DistributedCache.
   *
   * N.B. that this method at most adds one jar per class given. If there is more than one
   * jar available containing a class with the same name as a given class, we don't define
   * which of those jars might be chosen.
   *
   * @param conf The Hadoop Configuration to modify
   * @param classes will add just those dependencies needed to find the given classes
   * @throws IOException if an underlying library call fails.
   */
  @InterfaceAudience.Private
  public static void addDependencyJarsForClasses(Configuration conf,
      Class<?>... classes) throws IOException {

    FileSystem localFs = FileSystem.getLocal(conf);
    Set<String> jars = new HashSet<>();
    // Add jars that are already in the tmpjars variable
    jars.addAll(conf.getStringCollection("tmpjars"));

    // add jars as we find them to a map of contents jar name so that we can avoid
    // creating new jars for classes that have already been packaged.
    Map<String, String> packagedClasses = new HashMap<>();

    // Add jars containing the specified classes
    for (Class<?> clazz : classes) {
      if (clazz == null) continue;

      Path path = findOrCreateJar(clazz, localFs, packagedClasses);
      if (path == null) {
        LOG.warn("Could not find jar for class " + clazz +
            " in order to ship it to the cluster.");
        continue;
      }
      if (!localFs.exists(path)) {
        LOG.warn("Could not validate jar file " + path + " for class " + clazz);
        continue;
      }
      jars.add(path.toString());
    }

    if (jars.isEmpty()) return;

    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
  }
  /**
   * Finds the Jar for a class or creates it if it doesn't exist. If the class is in
   * a directory in the classpath, it creates a Jar on the fly with the
   * contents of the directory and returns the path to that Jar. If a Jar is
   * created, it is created in the system temporary directory. Otherwise,
   * returns an existing jar that contains a class of the same name. Maintains
   * a mapping from jar contents to the tmp jar created.
   * @param my_class the class to find.
   * @param fs the FileSystem with which to qualify the returned path.
   * @param packagedClasses a map of class name to path.
   * @return a jar file that contains the class.
   * @throws IOException
   */
  private static Path findOrCreateJar(Class<?> my_class, FileSystem fs,
      Map<String, String> packagedClasses)
  throws IOException {
    // attempt to locate an existing jar for the class.
    String jar = findContainingJar(my_class, packagedClasses);
    if (null == jar || jar.isEmpty()) {
      jar = getJar(my_class);
      updateMap(jar, packagedClasses);
    }

    if (null == jar || jar.isEmpty()) {
      return null;
    }

    LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar));
    return new Path(jar).makeQualified(fs);
  }
  /**
   * Add entries to <code>packagedClasses</code> corresponding to class files
   * contained in <code>jar</code>.
   * @param jar The jar whose content to list.
   * @param packagedClasses map[class -> jar]
   */
  private static void updateMap(String jar, Map<String, String> packagedClasses) throws IOException {
    if (null == jar || jar.isEmpty()) {
      return;
    }
    ZipFile zip = null;
    try {
      zip = new ZipFile(jar);
      for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) {
        ZipEntry entry = iter.nextElement();
        if (entry.getName().endsWith("class")) {
          packagedClasses.put(entry.getName(), jar);
        }
      }
    } finally {
      if (null != zip) zip.close();
    }
  }
  /**
   * Find a jar that contains a class of the same name, if any. It will return
   * a jar file, even if that is not the first thing on the class path that
   * has a class with the same name. Looks first on the classpath and then in
   * the <code>packagedClasses</code> map.
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   * @throws IOException
   */
  private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses)
      throws IOException {
    ClassLoader loader = my_class.getClassLoader();

    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";

    if (loader != null) {
      // first search the classpath
      for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
        URL url = itr.nextElement();
        if ("jar".equals(url.getProtocol())) {
          String toReturn = url.getPath();
          if (toReturn.startsWith("file:")) {
            toReturn = toReturn.substring("file:".length());
          }
          // URLDecoder is a misnamed class, since it actually decodes
          // x-www-form-urlencoded MIME type rather than actual
          // URL encoding (which the file path has). Therefore it would
          // decode +s to ' 's which is incorrect (spaces are actually
          // either unencoded or encoded as "%20"). Replace +s first, so
          // that they are kept sacred during the decoding process.
          toReturn = toReturn.replaceAll("\\+", "%2B");
          toReturn = URLDecoder.decode(toReturn, "UTF-8");
          return toReturn.replaceAll("!.*$", "");
        }
      }
    }

    // now look in any jars we've packaged using JarFinder. Returns null when
    // no jar is found.
    return packagedClasses.get(class_file);
  }
  /**
   * Invoke 'getJar' on a custom JarFinder implementation. Useful for some job
   * configuration contexts (HBASE-8140) and also for testing on MRv2.
   * Check if we have HADOOP-9426.
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   */
  private static String getJar(Class<?> my_class) {
    String ret = null;
    try {
      ret = JarFinder.getJar(my_class);
    } catch (Exception e) {
      // toss all other exceptions, related to reflection failure
      throw new RuntimeException("getJar invocation failed.", e);