/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Tool used to copy a table to another one, which can be on a different cluster setup.
 * It is also configurable with a start and end time, as well as a specification
 * of the region server implementation if different from the local cluster.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class CopyTable extends Configured implements Tool {
  private static final Log LOG = LogFactory.getLog(CopyTable.class);

  final static String NAME = "copytable";
  long startTime = 0;
  long endTime = HConstants.LATEST_TIMESTAMP;
  int batch = Integer.MAX_VALUE;
  int cacheRow = -1;
  int versions = -1;
  String tableName = null;
  String startRow = null;
  String stopRow = null;
  String dstTableName = null;
  String peerAddress = null;
  String families = null;
  boolean allCells = false;
  static boolean shuffle = false;

  boolean bulkload = false;
  Path bulkloadDir = null;

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
  /**
   * Sets up the actual job.
   *
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public Job createSubmittableJob(String[] args) throws IOException {
    if (!doCommandLine(args)) {
      return null;
    }
    Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
    job.setJarByClass(CopyTable.class);
    Scan scan = new Scan();
    scan.setBatch(batch);
    scan.setCacheBlocks(false);

    if (cacheRow > 0) {
      scan.setCaching(cacheRow);
    } else {
      scan.setCaching(getConf().getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 100));
    }

    scan.setTimeRange(startTime, endTime);
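    // Note: endTime defaults to HConstants.LATEST_TIMESTAMP, so passing only
    // --starttime copies everything from that point forward.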
    if (allCells) {
      scan.setRaw(true);
    }
    if (shuffle) {
      job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
    }
    if (versions >= 0) {
      scan.setMaxVersions(versions);
    }
    if (startRow != null) {
      scan.setStartRow(Bytes.toBytesBinary(startRow));
    }

    if (stopRow != null) {
      scan.setStopRow(Bytes.toBytesBinary(stopRow));
    }
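    // Bytes.toBytesBinary understands \xNN escapes, so binary row keys can be
    // passed as --startrow/--stoprow on the command line.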
    if (families != null) {
      String[] fams = families.split(",");
      Map<String, String> cfRenameMap = new HashMap<>();
      for (String fam : fams) {
        String sourceCf;
        if (fam.contains(":")) {
          // fam looks like "sourceCfName:destCfName"
          String[] srcAndDest = fam.split(":", 2);
          sourceCf = srcAndDest[0];
          String destCf = srcAndDest[1];
          cfRenameMap.put(sourceCf, destCf);
        } else {
          // fam is just "sourceCf"
          sourceCf = fam;
        }
        scan.addFamily(Bytes.toBytes(sourceCf));
      }
      Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
    }
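    // The rename map is carried in the job configuration; the Import mapper
    // consults it when rewriting cells to the destination families.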
    job.setNumReduceTasks(0);
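    // With zero reduce tasks the mappers write output directly, either to the
    // destination table or, in the bulkload case, to HFiles.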
    if (bulkload) {
      TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.KeyValueImporter.class, null,
          null, job);

      // We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
      TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));

      FileSystem fs = FileSystem.get(getConf());
      Random rand = new Random();
      Path root = new Path(fs.getWorkingDirectory(), "copytable");
      fs.mkdirs(root);
      while (true) {
        bulkloadDir = new Path(root, "" + rand.nextLong());
        if (!fs.exists(bulkloadDir)) {
          break;
        }
      }

      System.out.println("HFiles will be stored at " + this.bulkloadDir);
      HFileOutputFormat2.setOutputPath(job, bulkloadDir);
      try (Connection conn = ConnectionFactory.createConnection(getConf());
          Admin admin = conn.getAdmin()) {
        HFileOutputFormat2.configureIncrementalLoadMap(job,
            admin.getTableDescriptor((TableName.valueOf(dstTableName))));
      }
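      // In the bulkload path the job only produces HFiles here; they are
      // loaded into the destination table afterwards, in run() below.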
    } else {
      TableMapReduceUtil.initTableMapperJob(tableName, scan,
          Import.Importer.class, null, null, job);

      TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
          null);
    }

    return job;
  }
  /*
   * @param errorMsg Error message. Can be null.
   */
  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " +
        "[--new.name=NEW] [--peer.adr=ADR] <tablename>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
    System.err.println("              specify if different from current cluster");
    System.err.println(" rs.impl      hbase.regionserver.impl of the peer cluster");
    System.err.println(" startrow     the start row");
    System.err.println(" stoprow      the stop row");
    System.err.println(" starttime    beginning of the time range (unixtime in millis)");
    System.err.println("              without endtime means from starttime to forever");
    System.err.println(" endtime      end of the time range. Ignored if no starttime specified.");
    System.err.println(" versions     number of cell versions to copy");
    System.err.println(" new.name     new table's name");
    System.err.println(" peer.adr     Address of the peer cluster given in the format");
    System.err.println("              hbase.zookeeper.quorum:hbase.zookeeper.client"
        + ".port:zookeeper.znode.parent");
    System.err.println(" families     comma-separated list of families to copy");
    System.err.println("              To copy from cf1 to cf2, give sourceCfName:destCfName.");
    System.err.println("              To keep the same name, just give \"cfName\"");
    System.err.println(" all.cells    also copy delete markers and deleted cells");
    System.err.println(" bulkload     Write input into HFiles and bulk load to the destination "
        + "table");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" tablename    Name of the table to copy");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:");
    System.err.println(" $ hbase " +
        "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " +
        "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable");
    System.err.println("For performance consider the following general options:\n"
        + "  It is recommended that you set the following to >=100. A higher value uses more memory but\n"
        + "  decreases the round trip time to the server and may increase performance.\n"
        + "    -Dhbase.client.scanner.caching=100\n"
        + "  The following should always be set to false, to prevent writing data twice, which may produce\n"
        + "  inaccurate results.\n"
        + "    -Dmapreduce.map.speculative=false");
  }
  private boolean doCommandLine(final String[] args) {
    // Process command-line args. TODO: Better cmd-line processing
    // (but hopefully something not as painful as cli options).
    if (args.length < 1) {
      printUsage(null);
      return false;
    }
    try {
      for (int i = 0; i < args.length; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String startRowArgKey = "--startrow=";
        if (cmd.startsWith(startRowArgKey)) {
          startRow = cmd.substring(startRowArgKey.length());
          continue;
        }

        final String stopRowArgKey = "--stoprow=";
        if (cmd.startsWith(stopRowArgKey)) {
          stopRow = cmd.substring(stopRowArgKey.length());
          continue;
        }

        final String startTimeArgKey = "--starttime=";
        if (cmd.startsWith(startTimeArgKey)) {
          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
          continue;
        }

        final String endTimeArgKey = "--endtime=";
        if (cmd.startsWith(endTimeArgKey)) {
          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
          continue;
        }

        final String batchArgKey = "--batch=";
        if (cmd.startsWith(batchArgKey)) {
          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
          continue;
        }

        final String cacheRowArgKey = "--cacheRow=";
        if (cmd.startsWith(cacheRowArgKey)) {
          cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length()));
          continue;
        }

        final String versionsArgKey = "--versions=";
        if (cmd.startsWith(versionsArgKey)) {
          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
          continue;
        }

        final String newNameArgKey = "--new.name=";
        if (cmd.startsWith(newNameArgKey)) {
          dstTableName = cmd.substring(newNameArgKey.length());
          continue;
        }

        final String peerAdrArgKey = "--peer.adr=";
        if (cmd.startsWith(peerAdrArgKey)) {
          peerAddress = cmd.substring(peerAdrArgKey.length());
          continue;
        }

        final String familiesArgKey = "--families=";
        if (cmd.startsWith(familiesArgKey)) {
          families = cmd.substring(familiesArgKey.length());
          continue;
        }

        if (cmd.startsWith("--all.cells")) {
          allCells = true;
          continue;
        }

        if (cmd.startsWith("--bulkload")) {
          bulkload = true;
          continue;
        }

        if (cmd.startsWith("--shuffle")) {
          shuffle = true;
          continue;
        }

        if (i == args.length - 1) {
          tableName = cmd;
        } else {
          printUsage("Invalid argument '" + cmd + "'");
          return false;
        }
      }
      if (dstTableName == null && peerAddress == null) {
        printUsage("At least a new table name or a " +
            "peer address must be specified");
        return false;
      }
      if ((endTime != 0) && (startTime > endTime)) {
        printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime);
        return false;
      }

      if (bulkload && peerAddress != null) {
        printUsage("Remote bulkload is not supported!");
        return false;
      }

      // Set dstTableName if necessary.
      if (dstTableName == null) {
        dstTableName = tableName;
      }
    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }
  /**
   * Main entry point.
   *
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(HBaseConfiguration.create(), new CopyTable(), args);
    System.exit(ret);
  }
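  // ToolRunner first strips generic Hadoop options (-D, -conf, -fs, ...) from
  // the command line, then hands the remaining arguments to run() below.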
  @Override
  public int run(String[] args) throws Exception {
    Job job = createSubmittableJob(args);
    if (job == null) return 1;
    if (!job.waitForCompletion(true)) {
      LOG.info("Map-reduce job failed!");
      if (bulkload) {
        LOG.info("Files are not bulkloaded!");
      }
      return 1;
    }
    int code = 0;
    if (bulkload) {
      code = new LoadIncrementalHFiles(this.getConf()).run(new String[]{this.bulkloadDir.toString(),
          this.dstTableName});
      if (code == 0) {
        // bulkloadDir is deleted only if LoadIncrementalHFiles was successful, so that one can
        // rerun LoadIncrementalHFiles after a failure.
        FileSystem fs = FileSystem.get(this.getConf());
        if (!fs.delete(this.bulkloadDir, true)) {
          LOG.error("Deleting folder " + bulkloadDir + " failed!");
          code = 1;
        }
      }
    }
    return code;
  }
}