/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup;

import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupCommands;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

@Category(LargeTests.class)
public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestIncrementalBackupMergeWithFailures.class);

  private static final Logger LOG =
      LoggerFactory.getLogger(TestIncrementalBackupMergeWithFailures.class);

  enum FailurePhase {
    PHASE1, PHASE2, PHASE3, PHASE4
  }

  public final static String FAILURE_PHASE_KEY = "failurePhase";
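
  /**
   * Merge job that throws at the phase named by the FAILURE_PHASE_KEY property: PHASE1 fires
   * before the per-table merge loop, PHASE2 before the HFile splitter job runs for a table,
   * PHASE3 before the processed-table list is recorded, and PHASE4 after the merged data has
   * been moved into the backup destination (i.e. after the backup file system was modified).
   */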
  static class BackupMergeJobWithFailures extends MapReduceBackupMergeJob {
    FailurePhase failurePhase;

    @Override
    public void setConf(Configuration conf) {
      super.setConf(conf);
      String val = conf.get(FAILURE_PHASE_KEY);
      if (val != null) {
        failurePhase = FailurePhase.valueOf(val);
      } else {
        Assert.fail("Failure phase is not set");
      }
    }

    /**
     * This is the exact copy of parent's run() with injections
     * of different types of failures
     */
    @Override
    public void run(String[] backupIds) throws IOException {
      String bulkOutputConfKey;

      // TODO : run player on remote cluster
      player = new MapReduceHFileSplitterJob();
      bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
      // Player reads all files in arbitrary directory structure and creates
      // a Map task for each file
      String bids = StringUtils.join(backupIds, ",");

      if (LOG.isDebugEnabled()) {
        LOG.debug("Merge backup images " + bids);
      }

      List<Pair<TableName, Path>> processedTableList = new ArrayList<>();
      boolean finishedTables = false;
      Connection conn = ConnectionFactory.createConnection(getConf());
      BackupSystemTable table = new BackupSystemTable(conn);
      FileSystem fs = FileSystem.get(getConf());

      try {
        // Start backup exclusive operation
        table.startBackupExclusiveOperation();
        // Start merge operation
        table.startMergeOperation(backupIds);
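        // The merge state recorded here in the backup system table is what
        // isMergeInProgress() and the repair command inspect later in the test.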

        // Select most recent backup id
        String mergedBackupId = BackupUtils.findMostRecentBackupId(backupIds);

        TableName[] tableNames = getTableNamesInBackupImages(backupIds);

        BackupInfo bInfo = table.readBackupInfo(backupIds[0]);
        String backupRoot = bInfo.getBackupRootDir();
        // PHASE 1
        checkFailure(FailurePhase.PHASE1);

        for (int i = 0; i < tableNames.length; i++) {
          LOG.info("Merge backup images for " + tableNames[i]);

          // Find input directories for table
          Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
          String dirs = StringUtils.join(dirPaths, ",");
          Path bulkOutputPath =
              BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(tableNames[i]),
                getConf(), false);
          // Delete content if exists
          if (fs.exists(bulkOutputPath)) {
            if (!fs.delete(bulkOutputPath, true)) {
              LOG.warn("Can not delete: " + bulkOutputPath);
            }
          }
          Configuration conf = getConf();
          conf.set(bulkOutputConfKey, bulkOutputPath.toString());
          String[] playerArgs = { dirs, tableNames[i].getNameAsString() };

          // PHASE 2
          checkFailure(FailurePhase.PHASE2);
          player.setConf(getConf());
          int result = player.run(playerArgs);
          if (succeeded(result)) {
            // Add to processed table list
            processedTableList.add(new Pair<>(tableNames[i], bulkOutputPath));
          } else {
            throw new IOException("Can not merge backup images for " + dirs
                + " (check Hadoop/MR and HBase logs). Player return code =" + result);
          }
          LOG.debug("Merge Job finished:" + result);
        }
        List<TableName> tableList = toTableNameList(processedTableList);
        // PHASE 3
        checkFailure(FailurePhase.PHASE3);
        table.updateProcessedTablesForMerge(tableList);
        finishedTables = true;
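        // From this point on the backup file system itself is modified; the catch block
        // below relies on finishedTables to decide whether the repair tool must be run.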

        // (modification of a backup file system)
        // Move existing mergedBackupId data into tmp directory
        // we will need it later in case of a failure
        Path tmpBackupDir = HBackupFileSystem.getBackupTmpDirPathForBackupId(backupRoot,
          mergedBackupId);
        Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, mergedBackupId);
        if (!fs.rename(backupDirPath, tmpBackupDir)) {
          throw new IOException("Failed to rename " + backupDirPath + " to " + tmpBackupDir);
        } else {
          LOG.debug("Renamed " + backupDirPath + " to " + tmpBackupDir);
        }
        // Move new data into backup dest
        for (Pair<TableName, Path> tn : processedTableList) {
          moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
        }
        checkFailure(FailurePhase.PHASE4);
        // Update backup manifest
        List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
        updateBackupManifest(tmpBackupDir.getParent().toString(), mergedBackupId, backupsToDelete);
        // Copy meta files back from tmp to backup dir
        copyMetaData(fs, tmpBackupDir, backupDirPath);
        // Delete tmp dir (Rename back during repair)
        if (!fs.delete(tmpBackupDir, true)) {
          // WARN and ignore
          LOG.warn("Could not delete tmp dir: " + tmpBackupDir);
        }
        // Delete old data
        deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
        // Finish merge session
        table.finishMergeOperation();
        // Release lock
        table.finishBackupExclusiveOperation();
      } catch (RuntimeException e) {
        throw e;
      } catch (Exception e) {
        LOG.error(e.toString(), e);
        if (!finishedTables) {
          // cleanup bulk directories and finish merge
          // merge MUST be repeated (no need for repair)
          cleanupBulkLoadDirs(fs, toPathList(processedTableList));
          table.finishMergeOperation();
          table.finishBackupExclusiveOperation();
          throw new IOException("Backup merge operation failed, you should try it again", e);
        } else {
          // backup repair must be run
          throw new IOException(
              "Backup merge operation failed, run backup repair tool to restore system's integrity",
              e);
        }
      } finally {
        table.close();
        conn.close();
      }
    }

    private void checkFailure(FailurePhase phase) throws IOException {
      if (failurePhase != null && failurePhase == phase) {
        throw new IOException(phase.toString());
      }
    }
  }

  @Test
  public void TestIncBackupMergeRestore() throws Exception {
    int ADD_ROWS = 99;
    // #1 - create full backup for all tables
    LOG.info("create full backup image for all tables");

    List<TableName> tables = Lists.newArrayList(table1, table2);
    // Set custom Merge Job implementation
    conf1.setClass(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS,
      BackupMergeJobWithFailures.class, BackupMergeJob.class);

    Connection conn = ConnectionFactory.createConnection(conf1);

    Admin admin = conn.getAdmin();
    BackupAdminImpl client = new BackupAdminImpl(conn);

    BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
    String backupIdFull = client.backupTables(request);

    assertTrue(checkSucceeded(backupIdFull));

    // #2 - insert some data to table1
    Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);

    Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS);
    t1.close();
    LOG.debug("written " + ADD_ROWS + " rows to " + table1);

    Table t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS);

    Assert.assertEquals(TEST_UTIL.countRows(t2), NB_ROWS_IN_BATCH + ADD_ROWS);
    t2.close();
    LOG.debug("written " + ADD_ROWS + " rows to " + table2);

    // #3 - incremental backup for multiple tables
    tables = Lists.newArrayList(table1, table2);
    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
    String backupIdIncMultiple = client.backupTables(request);

    assertTrue(checkSucceeded(backupIdIncMultiple));

    t1 = insertIntoTable(conn, table1, famName, 2, ADD_ROWS);
    t1.close();

    t2 = insertIntoTable(conn, table2, famName, 2, ADD_ROWS);
    t2.close();

    // #4 - second incremental backup for multiple tables
    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
    String backupIdIncMultiple2 = client.backupTables(request);
    assertTrue(checkSucceeded(backupIdIncMultiple2));
    // #5 - merge backup images with failures
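    // Each pass through this loop injects a failure at one phase of the merge and then
    // verifies the state left behind: for PHASE1-PHASE3 the merge job cleans up after itself
    // (no merge in progress, exclusive lock already released), while a PHASE4 failure leaves
    // the merge in progress and requires the repair tool.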
    for (FailurePhase phase : FailurePhase.values()) {
      Configuration conf = conn.getConfiguration();

      conf.set(FAILURE_PHASE_KEY, phase.toString());

      try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) {
        String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
        bAdmin.mergeBackups(backups);
        Assert.fail("Expected IOException");
      } catch (IOException e) {
        BackupSystemTable table = new BackupSystemTable(conn);
        if (phase.ordinal() < FailurePhase.PHASE4.ordinal()) {
          // No need to repair:
          // Both Merge and backup exclusive operations are finished
          assertFalse(table.isMergeInProgress());
          try {
            table.finishBackupExclusiveOperation();
            Assert.fail("IOException is expected");
          } catch (IOException ee) {
            // Expected
          }
        } else {
          // Repair is required
          assertTrue(table.isMergeInProgress());
          try {
            table.startBackupExclusiveOperation();
            Assert.fail("IOException is expected");
          } catch (IOException ee) {
            // Expected - clean up before proceeding
            // table.finishMergeOperation();
            // table.finishBackupExclusiveOperation();
          }
        }
        table.close();
        LOG.debug("Expected :" + e.getMessage());
      }
    }
    // Now merge w/o failures
    Configuration conf = conn.getConfiguration();
    conf.unset(FAILURE_PHASE_KEY);
    conf.unset(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS);
    // Now run repair
    BackupSystemTable sysTable = new BackupSystemTable(conn);
    BackupCommands.RepairCommand.repairFailedBackupMergeIfAny(conn, sysTable);
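    // The last loop iteration failed at PHASE4 and left a merge in progress; the repair
    // command restores the system's integrity so the merge can be repeated below.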
    // Now repeat merge
    try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) {
      String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
      bAdmin.mergeBackups(backups);
    }
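
    // The merge collapses both incremental images into the most recent backup id
    // (backupIdIncMultiple2), which is why the restore below uses that id.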
    // #6 - restore incremental backup for multiple tables, with overwrite
    TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 };
    TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, table2_restore };
    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple2, false,
      tablesRestoreIncMultiple, tablesMapIncMultiple, true));

    Table hTable = conn.getTable(table1_restore);
    LOG.debug("After incremental restore: " + hTable.getDescriptor());
    LOG.debug("f1 has " + TEST_UTIL.countRows(hTable, famName) + " rows");
    Assert.assertEquals(TEST_UTIL.countRows(hTable, famName), NB_ROWS_IN_BATCH + 2 * ADD_ROWS);

    hTable.close();

    hTable = conn.getTable(table2_restore);
    Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH + 2 * ADD_ROWS);
    hTable.close();

    admin.close();
    conn.close();
  }
}