/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup;

import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupCommands;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

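/**
 * Verifies that an incremental backup merge which fails part-way through leaves the backup
 * system table either already consistent (failure before the destination is modified) or in a
 * state that the backup repair tool can recover (failure after the destination is modified).
 */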
@Category(LargeTests.class)
public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestIncrementalBackupMergeWithFailures.class);

  private static final Logger LOG =
      LoggerFactory.getLogger(TestIncrementalBackupMergeWithFailures.class);

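  // Points in the merge at which a failure is injected: PHASE1 - before any table is processed,
  // PHASE2 - before the HFile splitter job runs, PHASE3 - before the processed tables are
  // recorded in the backup system table, PHASE4 - after data has been moved into the
  // destination backup directory.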
  enum FailurePhase {
    PHASE1, PHASE2, PHASE3, PHASE4
  }

  public final static String FAILURE_PHASE_KEY = "failurePhase";

  static class BackupMergeJobWithFailures extends MapReduceBackupMergeJob {
    FailurePhase failurePhase;

    @Override
    public void setConf(Configuration conf) {
      super.setConf(conf);
      String val = conf.get(FAILURE_PHASE_KEY);
      if (val != null) {
        failurePhase = FailurePhase.valueOf(val);
      } else {
        Assert.fail("Failure phase is not set");
      }
    }

    /**
     * This is an exact copy of the parent's run() method with injections
     * of different types of failures.
     */
    @Override
    public void run(String[] backupIds) throws IOException {
      String bulkOutputConfKey;

      // TODO : run player on remote cluster
      player = new MapReduceHFileSplitterJob();
      bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
      // Player reads all files in arbitrary directory structure and creates
      // a Map task for each file
      String bids = StringUtils.join(backupIds, ",");

      if (LOG.isDebugEnabled()) {
        LOG.debug("Merge backup images " + bids);
      }

      List<Pair<TableName, Path>> processedTableList = new ArrayList<>();
      boolean finishedTables = false;
      Connection conn = ConnectionFactory.createConnection(getConf());
      BackupSystemTable table = new BackupSystemTable(conn);
      FileSystem fs = FileSystem.get(getConf());

      try {
        // Start backup exclusive operation
        table.startBackupExclusiveOperation();
        // Start merge operation
        table.startMergeOperation(backupIds);

        // Select most recent backup id
        String mergedBackupId = BackupUtils.findMostRecentBackupId(backupIds);

        TableName[] tableNames = getTableNamesInBackupImages(backupIds);

        BackupInfo bInfo = table.readBackupInfo(backupIds[0]);
        String backupRoot = bInfo.getBackupRootDir();

        checkFailure(FailurePhase.PHASE1);

        for (int i = 0; i < tableNames.length; i++) {
          LOG.info("Merge backup images for " + tableNames[i]);

          // Find input directories for table
          Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
          String dirs = StringUtils.join(dirPaths, ",");
          Path bulkOutputPath =
              BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(tableNames[i]),
                getConf(), false);
          // Delete content if exists
          if (fs.exists(bulkOutputPath)) {
            if (!fs.delete(bulkOutputPath, true)) {
              LOG.warn("Can not delete: " + bulkOutputPath);
            }
          }
          Configuration conf = getConf();
          conf.set(bulkOutputConfKey, bulkOutputPath.toString());
          String[] playerArgs = { dirs, tableNames[i].getNameAsString() };

          checkFailure(FailurePhase.PHASE2);
          player.setConf(getConf());
          int result = player.run(playerArgs);
          if (succeeded(result)) {
            // Add to processed table list
            processedTableList.add(new Pair<>(tableNames[i], bulkOutputPath));
          } else {
            throw new IOException("Can not merge backup images for " + dirs
                + " (check Hadoop/MR and HBase logs). Player return code =" + result);
          }
          LOG.debug("Merge Job finished:" + result);
        }
        List<TableName> tableList = toTableNameList(processedTableList);

        checkFailure(FailurePhase.PHASE3);
        table.updateProcessedTablesForMerge(tableList);
        finishedTables = true;

        // (modification of a backup file system)
        // Move existing mergedBackupId data into tmp directory
        // we will need it later in case of a failure
        Path tmpBackupDir = HBackupFileSystem.getBackupTmpDirPathForBackupId(backupRoot,
          mergedBackupId);
        Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, mergedBackupId);
        if (!fs.rename(backupDirPath, tmpBackupDir)) {
          throw new IOException("Failed to rename " + backupDirPath + " to " + tmpBackupDir);
        } else {
          LOG.debug("Renamed " + backupDirPath + " to " + tmpBackupDir);
        }
        // Move new data into backup dest
        for (Pair<TableName, Path> tn : processedTableList) {
          moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
        }
        checkFailure(FailurePhase.PHASE4);
        // Update backup manifest
        List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
        updateBackupManifest(tmpBackupDir.getParent().toString(), mergedBackupId, backupsToDelete);
        // Copy meta files back from tmp to backup dir
        copyMetaData(fs, tmpBackupDir, backupDirPath);
        // Delete tmp dir (Rename back during repair)
        if (!fs.delete(tmpBackupDir, true)) {
          LOG.warn("Could not delete tmp dir: " + tmpBackupDir);
        }
        // Delete old data
        deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
        // Finish merge session
        table.finishMergeOperation();
        // Release lock
        table.finishBackupExclusiveOperation();
      } catch (RuntimeException e) {
        throw e;
      } catch (Exception e) {
        LOG.error(e.toString(), e);
        if (!finishedTables) {
          // cleanup bulk directories and finish merge
          // merge MUST be repeated (no need for repair)
          cleanupBulkLoadDirs(fs, toPathList(processedTableList));
          table.finishMergeOperation();
          table.finishBackupExclusiveOperation();
          throw new IOException("Backup merge operation failed, you should try it again", e);
        } else {
          // backup repair must be run
          throw new IOException(
            "Backup merge operation failed, run backup repair tool to restore system's integrity",
            e);
        }
      } finally {
        table.close();
        conn.close();
      }
    }

    private void checkFailure(FailurePhase phase) throws IOException {
      if (failurePhase != null && failurePhase == phase) {
        throw new IOException(phase.toString());
      }
    }
  }

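  // End-to-end scenario: full backup, two incremental backups, then one merge attempt per
  // failure phase, followed by repair, a clean merge and a restore that verifies the data.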
  @Test
  public void TestIncBackupMergeRestore() throws Exception {
    int ADD_ROWS = 99;
    // #1 - create full backup for all tables
    LOG.info("create full backup image for all tables");

    List<TableName> tables = Lists.newArrayList(table1, table2);
    // Set custom Merge Job implementation
    conf1.setClass(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS,
      BackupMergeJobWithFailures.class, BackupMergeJob.class);

    Connection conn = ConnectionFactory.createConnection(conf1);

    Admin admin = conn.getAdmin();
    BackupAdminImpl client = new BackupAdminImpl(conn);

    BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
    String backupIdFull = client.backupTables(request);

    assertTrue(checkSucceeded(backupIdFull));

    // #2 - insert some data to table1
    Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);

    Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS);
    t1.close();
    LOG.debug("written " + ADD_ROWS + " rows to " + table1);

    Table t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS);

    Assert.assertEquals(TEST_UTIL.countRows(t2), NB_ROWS_IN_BATCH + ADD_ROWS);
    t2.close();
    LOG.debug("written " + ADD_ROWS + " rows to " + table2);

    // #3 - incremental backup for multiple tables
    tables = Lists.newArrayList(table1, table2);
    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
    String backupIdIncMultiple = client.backupTables(request);

    assertTrue(checkSucceeded(backupIdIncMultiple));

    t1 = insertIntoTable(conn, table1, famName, 2, ADD_ROWS);
    t1.close();

    t2 = insertIntoTable(conn, table2, famName, 2, ADD_ROWS);
    t2.close();

    // #3 - incremental backup for multiple tables
    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
    String backupIdIncMultiple2 = client.backupTables(request);
    assertTrue(checkSucceeded(backupIdIncMultiple2));
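    // A failure in PHASE1-PHASE3 must leave no merge in progress (the job rolls back and
    // releases the exclusive lock itself); a failure in PHASE4 leaves the merge in progress
    // and the backup repair tool has to be run afterwards.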
    // #4 Merge backup images with failures
    for (FailurePhase phase : FailurePhase.values()) {
      Configuration conf = conn.getConfiguration();

      conf.set(FAILURE_PHASE_KEY, phase.toString());

      try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) {
        String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
        bAdmin.mergeBackups(backups);
        Assert.fail("Expected IOException");
      } catch (IOException e) {
        BackupSystemTable table = new BackupSystemTable(conn);
        if (phase.ordinal() < FailurePhase.PHASE4.ordinal()) {
          // No need to repair:
          // Both Merge and backup exclusive operations are finished
          assertFalse(table.isMergeInProgress());
          try {
            table.finishBackupExclusiveOperation();
            Assert.fail("IOException is expected");
          } catch (IOException ee) {
            // Expected
          }
        } else {
          // Repair is required
          assertTrue(table.isMergeInProgress());
          try {
            table.startBackupExclusiveOperation();
            Assert.fail("IOException is expected");
          } catch (IOException ee) {
            // Expected - clean up before proceeding
            // table.finishMergeOperation();
            // table.finishBackupExclusiveOperation();
          }
        }
        table.close();
        LOG.debug("Expected :" + e.getMessage());
      }
    }
    // Now merge w/o failures
    Configuration conf = conn.getConfiguration();
    conf.unset(FAILURE_PHASE_KEY);
    conf.unset(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS);

    BackupSystemTable sysTable = new BackupSystemTable(conn);
    BackupCommands.RepairCommand.repairFailedBackupMergeIfAny(conn, sysTable);
    sysTable.close();

    try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) {
      String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
      bAdmin.mergeBackups(backups);
    }

    // #6 - restore incremental backup for multiple tables, with overwrite
    TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 };
    TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, table2_restore };
    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple2, false,
      tablesRestoreIncMultiple, tablesMapIncMultiple, true));

    Table hTable = conn.getTable(table1_restore);
    LOG.debug("After incremental restore: " + hTable.getDescriptor());
    LOG.debug("f1 has " + TEST_UTIL.countRows(hTable, famName) + " rows");
    Assert.assertEquals(TEST_UTIL.countRows(hTable, famName), NB_ROWS_IN_BATCH + 2 * ADD_ROWS);

    hTable.close();

    hTable = conn.getTable(table2_restore);
    Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH + 2 * ADD_ROWS);