HBASE-17532 Replaced explicit type with diamond operator
[hbase.git] / hbase-server / src / main / java / org / apache / hadoop / hbase / mapreduce / CopyTable.java
blob 8f0504a01923d271dafbf71832e1372c80dd1484
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Tool used to copy a table to another one which can be on a different setup.
 * It is also configurable with a start and end time as well as a specification
 * of the region server implementation if different from the local cluster.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class CopyTable extends Configured implements Tool {
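  // Illustrative invocation (assumed table names; mirrors the examples printed by printUsage()):
  //   hbase org.apache.hadoop.hbase.mapreduce.CopyTable --new.name=CopyOfTestTable TestTable
  // copies 'TestTable' into 'CopyOfTestTable' on the local cluster.
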
  private static final Log LOG = LogFactory.getLog(CopyTable.class);

  final static String NAME = "copytable";
  long startTime = 0;
  long endTime = HConstants.LATEST_TIMESTAMP;
  int batch = Integer.MAX_VALUE;
  int cacheRow = -1;
  int versions = -1;
  String tableName = null;
  String startRow = null;
  String stopRow = null;
  String dstTableName = null;
  String peerAddress = null;
  String families = null;
  boolean allCells = false;
  static boolean shuffle = false;

  boolean bulkload = false;
  Path bulkloadDir = null;

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  /**
   * Sets up the actual job.
   *
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public Job createSubmittableJob(String[] args)
  throws IOException {
    if (!doCommandLine(args)) {
      return null;
    }

    Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
    job.setJarByClass(CopyTable.class);
    Scan scan = new Scan();

    scan.setBatch(batch);
    scan.setCacheBlocks(false);

    if (cacheRow > 0) {
      scan.setCaching(cacheRow);
    } else {
      scan.setCaching(getConf().getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 100));
    }
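
    // startTime defaults to 0 and endTime to HConstants.LATEST_TIMESTAMP, so without
    // --starttime/--endtime the scan covers the whole timestamp range.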
    scan.setTimeRange(startTime, endTime);

    if (allCells) {
      scan.setRaw(true);
    }
    if (shuffle) {
      job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
    }
    if (versions >= 0) {
      scan.setMaxVersions(versions);
    }

    if (startRow != null) {
      scan.setStartRow(Bytes.toBytesBinary(startRow));
    }

    if (stopRow != null) {
      scan.setStopRow(Bytes.toBytesBinary(stopRow));
    }

    if (families != null) {
      String[] fams = families.split(",");
      Map<String,String> cfRenameMap = new HashMap<>();
      for (String fam : fams) {
        String sourceCf;
        if (fam.contains(":")) {
          // fam looks like "sourceCfName:destCfName"
          String[] srcAndDest = fam.split(":", 2);
          sourceCf = srcAndDest[0];
          String destCf = srcAndDest[1];
          cfRenameMap.put(sourceCf, destCf);
        } else {
          // fam is just "sourceCf"
          sourceCf = fam;
        }
        scan.addFamily(Bytes.toBytes(sourceCf));
      }
      Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
    }
    job.setNumReduceTasks(0);
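
    // Two map-only paths: with --bulkload the mappers emit cells into HFiles via
    // HFileOutputFormat2 for a later bulk load; otherwise the job is wired (via
    // initTableReducerJob with zero reducers) to write directly to the destination table,
    // optionally on the peer cluster given by --peer.adr.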
    if (bulkload) {
      TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.KeyValueImporter.class, null,
        null, job);

      // We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
      TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));

      FileSystem fs = FileSystem.get(getConf());
      Random rand = new Random();
      Path root = new Path(fs.getWorkingDirectory(), "copytable");
      fs.mkdirs(root);
      while (true) {
        bulkloadDir = new Path(root, "" + rand.nextLong());
        if (!fs.exists(bulkloadDir)) {
          break;
        }
      }

      System.out.println("HFiles will be stored at " + this.bulkloadDir);
      HFileOutputFormat2.setOutputPath(job, bulkloadDir);
      try (Connection conn = ConnectionFactory.createConnection(getConf());
          Admin admin = conn.getAdmin()) {
        HFileOutputFormat2.configureIncrementalLoadMap(job,
            admin.getTableDescriptor(TableName.valueOf(dstTableName)));
      }
    } else {
      TableMapReduceUtil.initTableMapperJob(tableName, scan,
        Import.Importer.class, null, null, job);

      TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
        null);
    }

    return job;
  }
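
  // A minimal programmatic sketch (hypothetical caller and table names; run() below does the
  // same thing through ToolRunner):
  //   CopyTable ct = new CopyTable();
  //   ct.setConf(HBaseConfiguration.create());
  //   Job job = ct.createSubmittableJob(new String[] { "--new.name=DestTable", "SourceTable" });
  //   if (job != null) job.waitForCompletion(true);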

  /**
   * @param errorMsg Error message.  Can be null.
   */
  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " +
        "[--new.name=NEW] [--peer.adr=ADR] <tablename>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
    System.err.println("              specify if different from current cluster");
    System.err.println(" rs.impl      hbase.regionserver.impl of the peer cluster");
    System.err.println(" startrow     the start row");
    System.err.println(" stoprow      the stop row");
    System.err.println(" starttime    beginning of the time range (unixtime in millis)");
    System.err.println("              without endtime means from starttime to forever");
    System.err.println(" endtime      end of the time range.  Ignored if no starttime specified.");
    System.err.println(" versions     number of cell versions to copy");
    System.err.println(" new.name     new table's name");
    System.err.println(" peer.adr     Address of the peer cluster given in the format");
    System.err.println("              hbase.zookeeper.quorum:hbase.zookeeper.client"
        + ".port:zookeeper.znode.parent");
    System.err.println(" families     comma-separated list of families to copy");
    System.err.println("              To copy from cf1 to cf2, give sourceCfName:destCfName. ");
    System.err.println("              To keep the same name, just give \"cfName\"");
    System.err.println(" all.cells    also copy delete markers and deleted cells");
    System.err.println(" bulkload     Write input into HFiles and bulk load to the destination "
        + "table");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" tablename    Name of the table to copy");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:");
    System.err.println(" $ hbase " +
        "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " +
        "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
    System.err.println("For performance consider the following general option:\n"
        + "  It is recommended that you set the following to >=100. A higher value uses more memory but\n"
        + "  decreases the round trip time to the server and may increase performance.\n"
        + "    -Dhbase.client.scanner.caching=100\n"
        + "  The following should always be set to false, to prevent writing data twice, which may produce \n"
        + "  inaccurate results.\n"
        + "    -Dmapreduce.map.speculative=false");
  }
  private boolean doCommandLine(final String[] args) {
    // Process command-line args. TODO: Better cmd-line processing
    // (but hopefully something not as painful as cli options).
    if (args.length < 1) {
      printUsage(null);
      return false;
    }
    try {
      for (int i = 0; i < args.length; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String startRowArgKey = "--startrow=";
        if (cmd.startsWith(startRowArgKey)) {
          startRow = cmd.substring(startRowArgKey.length());
          continue;
        }

        final String stopRowArgKey = "--stoprow=";
        if (cmd.startsWith(stopRowArgKey)) {
          stopRow = cmd.substring(stopRowArgKey.length());
          continue;
        }

        final String startTimeArgKey = "--starttime=";
        if (cmd.startsWith(startTimeArgKey)) {
          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
          continue;
        }

        final String endTimeArgKey = "--endtime=";
        if (cmd.startsWith(endTimeArgKey)) {
          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
          continue;
        }

        final String batchArgKey = "--batch=";
        if (cmd.startsWith(batchArgKey)) {
          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
          continue;
        }

        final String cacheRowArgKey = "--cacheRow=";
        if (cmd.startsWith(cacheRowArgKey)) {
          cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length()));
          continue;
        }

        final String versionsArgKey = "--versions=";
        if (cmd.startsWith(versionsArgKey)) {
          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
          continue;
        }

        final String newNameArgKey = "--new.name=";
        if (cmd.startsWith(newNameArgKey)) {
          dstTableName = cmd.substring(newNameArgKey.length());
          continue;
        }

        final String peerAdrArgKey = "--peer.adr=";
        if (cmd.startsWith(peerAdrArgKey)) {
          peerAddress = cmd.substring(peerAdrArgKey.length());
          continue;
        }

        final String familiesArgKey = "--families=";
        if (cmd.startsWith(familiesArgKey)) {
          families = cmd.substring(familiesArgKey.length());
          continue;
        }

        if (cmd.startsWith("--all.cells")) {
          allCells = true;
          continue;
        }

        if (cmd.startsWith("--bulkload")) {
          bulkload = true;
          continue;
        }

        if (cmd.startsWith("--shuffle")) {
          shuffle = true;
          continue;
        }

        if (i == args.length-1) {
          tableName = cmd;
        } else {
          printUsage("Invalid argument '" + cmd + "'");
          return false;
        }
      }
      if (dstTableName == null && peerAddress == null) {
        printUsage("At least a new table name or a " +
            "peer address must be specified");
        return false;
      }
      if ((endTime != 0) && (startTime > endTime)) {
        printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime);
        return false;
      }

      if (bulkload && peerAddress != null) {
        printUsage("Remote bulkload is not supported!");
        return false;
      }

      // set dstTableName if necessary
      if (dstTableName == null) {
        dstTableName = tableName;
      }
    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  /**
   * Main entry point.
   *
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(HBaseConfiguration.create(), new CopyTable(), args);
    System.exit(ret);
  }
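
  // Tool entry point: builds the job, waits for it to finish and, when --bulkload was given,
  // loads the generated HFiles with LoadIncrementalHFiles, removing the staging directory only
  // when the load succeeds.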
  @Override
  public int run(String[] args) throws Exception {
    Job job = createSubmittableJob(args);
    if (job == null) return 1;
    if (!job.waitForCompletion(true)) {
      LOG.info("Map-reduce job failed!");
      if (bulkload) {
        LOG.info("Files are not bulkloaded!");
      }
      return 1;
    }
    int code = 0;
    if (bulkload) {
      code = new LoadIncrementalHFiles(this.getConf()).run(new String[]{this.bulkloadDir.toString(),
          this.dstTableName});
      if (code == 0) {
        // bulkloadDir is deleted only if LoadIncrementalHFiles was successful so that one can
        // rerun LoadIncrementalHFiles.
        FileSystem fs = FileSystem.get(this.getConf());
        if (!fs.delete(this.bulkloadDir, true)) {
          LOG.error("Deleting folder " + bulkloadDir + " failed!");
          code = 1;
        }
      }
    }
    return code;
  }
}