/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup;

import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Lists;
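
/**
 * Verifies incremental backup merge behavior when failures are injected at different phases of
 * the merge job: a failure before the merged data is committed (PHASE1-PHASE3) only requires
 * re-running the merge, while a failure after that point (PHASE4) requires the backup repair
 * tool to restore the system's integrity.
 */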
@Category(LargeTests.class)
public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {
  private static final Log LOG = LogFactory.getLog(TestIncrementalBackupMergeWithFailures.class);
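
  // Phases of the merge job run() at which a failure can be injected.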
  static enum FailurePhase {
    PHASE1, PHASE2, PHASE3, PHASE4
  }

  public final static String FAILURE_PHASE_KEY = "failurePhase";

  static class BackupMergeJobWithFailures extends MapReduceBackupMergeJob {

    FailurePhase failurePhase;

    @Override
    public void setConf(Configuration conf) {
      super.setConf(conf);
      String val = conf.get(FAILURE_PHASE_KEY);
      if (val != null) {
        failurePhase = FailurePhase.valueOf(val);
      } else {
        Assert.fail("Failure phase is not set");
      }
    }

    /**
     * This is an exact copy of the parent's run() with injections
     * of different types of failures.
     */
    @Override
    public void run(String[] backupIds) throws IOException {
      String bulkOutputConfKey;

      // TODO : run player on remote cluster
      player = new MapReduceHFileSplitterJob();
      bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
      // Player reads all files in arbitrary directory structure and creates
      // a Map task for each file
      String bids = StringUtils.join(backupIds, ",");

      if (LOG.isDebugEnabled()) {
        LOG.debug("Merge backup images " + bids);
      }

      List<Pair<TableName, Path>> processedTableList = new ArrayList<Pair<TableName, Path>>();
      boolean finishedTables = false;
      Connection conn = ConnectionFactory.createConnection(getConf());
      BackupSystemTable table = new BackupSystemTable(conn);
      FileSystem fs = FileSystem.get(getConf());

      try {
        // Start backup exclusive operation
        table.startBackupExclusiveOperation();
        // Start merge operation
        table.startMergeOperation(backupIds);

        // Select most recent backup id
        String mergedBackupId = findMostRecentBackupId(backupIds);

        TableName[] tableNames = getTableNamesInBackupImages(backupIds);
        String backupRoot = null;

        BackupInfo bInfo = table.readBackupInfo(backupIds[0]);
        backupRoot = bInfo.getBackupRootDir();
        // PHASE 1
        checkFailure(FailurePhase.PHASE1);

        for (int i = 0; i < tableNames.length; i++) {

          LOG.info("Merge backup images for " + tableNames[i]);

          // Find input directories for table
          Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
          String dirs = StringUtils.join(dirPaths, ",");
          Path bulkOutputPath =
              BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(tableNames[i]),
                getConf(), false);
          // Delete content if exists
          if (fs.exists(bulkOutputPath)) {
            if (!fs.delete(bulkOutputPath, true)) {
              LOG.warn("Can not delete: " + bulkOutputPath);
            }
          }
          Configuration conf = getConf();
          conf.set(bulkOutputConfKey, bulkOutputPath.toString());
          String[] playerArgs = { dirs, tableNames[i].getNameAsString() };

          int result = 0;
          // PHASE 2
          checkFailure(FailurePhase.PHASE2);
          player.setConf(getConf());
          result = player.run(playerArgs);
          if (succeeded(result)) {
            // Add to processed table list
            processedTableList.add(new Pair<TableName, Path>(tableNames[i], bulkOutputPath));
          } else {
            throw new IOException("Can not merge backup images for " + dirs
                + " (check Hadoop/MR and HBase logs). Player return code =" + result);
          }
          LOG.debug("Merge Job finished:" + result);
        }
        List<TableName> tableList = toTableNameList(processedTableList);
        // PHASE 3
        checkFailure(FailurePhase.PHASE3);
        table.updateProcessedTablesForMerge(tableList);
        finishedTables = true;

        // Move data
        for (Pair<TableName, Path> tn : processedTableList) {
          moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
        }
        // PHASE 4
        checkFailure(FailurePhase.PHASE4);
        // Delete old data and update manifest
        List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
        deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
        updateBackupManifest(backupRoot, mergedBackupId, backupsToDelete);
        // Finish merge session
        table.finishMergeOperation();
      } catch (RuntimeException e) {
        throw e;
      } catch (Exception e) {
        LOG.error(e);
        if (!finishedTables) {
          // cleanup bulk directories and finish merge
          // merge MUST be repeated (no need for repair)
          cleanupBulkLoadDirs(fs, toPathList(processedTableList));
          table.finishMergeOperation();
          table.finishBackupExclusiveOperation();
          throw new IOException("Backup merge operation failed, you should try it again", e);
        } else {
          // backup repair must be run
          throw new IOException(
              "Backup merge operation failed, run backup repair tool to restore system's integrity",
              e);
        }
      } finally {
        table.close();
        conn.close();
      }
    }
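
    // Injects a failure by throwing as soon as the configured failure phase is reached.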
    private void checkFailure(FailurePhase phase) throws IOException {
      if (failurePhase != null && failurePhase == phase) {
        throw new IOException(phase.toString());
      }
    }
  }
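
  /**
   * Creates a full backup and two incremental backups, merges the incremental images with a
   * failure injected at each phase and checks the expected recovery path, then merges without
   * failures and restores the merged image.
   */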
  @Test
  public void TestIncBackupMergeRestore() throws Exception {
    int ADD_ROWS = 99;
    // #1 - create full backup for all tables
    LOG.info("create full backup image for all tables");

    List<TableName> tables = Lists.newArrayList(table1, table2);
    // Set custom Merge Job implementation
    conf1.setClass(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS,
      BackupMergeJobWithFailures.class, BackupMergeJob.class);

    Connection conn = ConnectionFactory.createConnection(conf1);

    HBaseAdmin admin = null;
    admin = (HBaseAdmin) conn.getAdmin();
    BackupAdminImpl client = new BackupAdminImpl(conn);

    BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
    String backupIdFull = client.backupTables(request);

    assertTrue(checkSucceeded(backupIdFull));

    // #2 - insert some data to table1
    HTable t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);

    Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS);
    t1.close();
    LOG.debug("written " + ADD_ROWS + " rows to " + table1);

    HTable t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS);

    Assert.assertEquals(TEST_UTIL.countRows(t2), NB_ROWS_IN_BATCH + ADD_ROWS);
    t2.close();
    LOG.debug("written " + ADD_ROWS + " rows to " + table2);

    // #3 - incremental backup for multiple tables
    tables = Lists.newArrayList(table1, table2);
    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
    String backupIdIncMultiple = client.backupTables(request);

    assertTrue(checkSucceeded(backupIdIncMultiple));

    t1 = insertIntoTable(conn, table1, famName, 2, ADD_ROWS);
    t1.close();

    t2 = insertIntoTable(conn, table2, famName, 2, ADD_ROWS);
    t2.close();

    // #4 - second incremental backup for multiple tables
    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
    String backupIdIncMultiple2 = client.backupTables(request);
    assertTrue(checkSucceeded(backupIdIncMultiple2));

    // #5 - merge backup images with failures
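    // For each phase: inject the failure, expect mergeBackups() to throw, then verify whether
    // the backup system table needs repair before the next attempt.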
    for (FailurePhase phase : FailurePhase.values()) {
      Configuration conf = conn.getConfiguration();

      conf.set(FAILURE_PHASE_KEY, phase.toString());

      try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) {
        String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
        bAdmin.mergeBackups(backups);
        Assert.fail("Expected IOException");
      } catch (IOException e) {
        BackupSystemTable table = new BackupSystemTable(conn);
        if (phase.ordinal() < FailurePhase.PHASE4.ordinal()) {
          // No need to repair:
          // both merge and backup exclusive operations are finished
          assertFalse(table.isMergeInProgress());
          try {
            table.finishBackupExclusiveOperation();
            Assert.fail("IOException is expected");
          } catch (IOException ee) {
            // Expected
          }
        } else {
          // Repair is required
          assertTrue(table.isMergeInProgress());
          try {
            table.startBackupExclusiveOperation();
            Assert.fail("IOException is expected");
          } catch (IOException ee) {
            // Expected - clean up before proceeding
            table.finishMergeOperation();
            table.finishBackupExclusiveOperation();
          }
        }
        table.close();
        LOG.debug("Expected :" + e.getMessage());
      }
    }

    // Now merge w/o failures
    Configuration conf = conn.getConfiguration();
    conf.unset(FAILURE_PHASE_KEY);
    conf.unset(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS);

    try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) {
      String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
      bAdmin.mergeBackups(backups);
    }

    // #6 - restore incremental backup for multiple tables, with overwrite
    TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 };
    TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, table2_restore };
    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple2, false,
      tablesRestoreIncMultiple, tablesMapIncMultiple, true));

    Table hTable = conn.getTable(table1_restore);
    LOG.debug("After incremental restore: " + hTable.getTableDescriptor());
    LOG.debug("f1 has " + TEST_UTIL.countRows(hTable, famName) + " rows");
    Assert.assertEquals(TEST_UTIL.countRows(hTable, famName), NB_ROWS_IN_BATCH + 2 * ADD_ROWS);

    hTable.close();

    hTable = conn.getTable(table2_restore);
    Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH + 2 * ADD_ROWS);
    hTable.close();

    admin.close();
    conn.close();
  }
}