 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup;
import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Lists;
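
/**
 * Tests the incremental backup merge path when the merge job fails at each {@link FailurePhase},
 * and verifies that the backup system table is left in a recoverable state after every failure.
 */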
@Category(LargeTests.class)
public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {

  private static final Log LOG =
      LogFactory.getLog(TestIncrementalBackupMergeWithFailures.class);
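
  /**
   * Points in {@link BackupMergeJobWithFailures#run(String[])} where a failure is injected:
   * PHASE1 - after the merge operation starts, before any table is processed;
   * PHASE2 - before the HFile splitter job runs for a table;
   * PHASE3 - before the processed table list is recorded;
   * PHASE4 - before old backup images are deleted and the manifest is updated.
   */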
  static enum FailurePhase {
    PHASE1, PHASE2, PHASE3, PHASE4
  }

  public final static String FAILURE_PHASE_KEY = "failurePhase";
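
  /**
   * Copy of {@link MapReduceBackupMergeJob} whose run() throws an IOException at the phase
   * named by the {@code failurePhase} configuration property.
   */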
  static class BackupMergeJobWithFailures extends MapReduceBackupMergeJob {

    FailurePhase failurePhase;

    @Override
    public void setConf(Configuration conf) {
      super.setConf(conf);
      String val = conf.get(FAILURE_PHASE_KEY);
      if (val != null) {
        failurePhase = FailurePhase.valueOf(val);
      } else {
        Assert.fail("Failure phase is not set");
      }
    }
    /**
     * This is an exact copy of the parent's run() with injections
     * of different types of failures.
     */
    @Override
    public void run(String[] backupIds) throws IOException {
      String bulkOutputConfKey;

      // TODO: run player on remote cluster
      player = new MapReduceHFileSplitterJob();
      bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
      // Player reads all files in arbitrary directory structure and creates
      // a Map task for each file
      String bids = StringUtils.join(backupIds, ",");

      if (LOG.isDebugEnabled()) {
        LOG.debug("Merge backup images " + bids);
      }
      List<Pair<TableName, Path>> processedTableList = new ArrayList<Pair<TableName, Path>>();
      boolean finishedTables = false;
      Connection conn = ConnectionFactory.createConnection(getConf());
      BackupSystemTable table = new BackupSystemTable(conn);
      FileSystem fs = FileSystem.get(getConf());
      try {
        // Start backup exclusive operation
        table.startBackupExclusiveOperation();
        // Start merge operation
        table.startMergeOperation(backupIds);

        // Select most recent backup id
        String mergedBackupId = findMostRecentBackupId(backupIds);

        TableName[] tableNames = getTableNamesInBackupImages(backupIds);
        String backupRoot = null;

        BackupInfo bInfo = table.readBackupInfo(backupIds[0]);
        backupRoot = bInfo.getBackupRootDir();
        // Failure injection point #1
        checkFailure(FailurePhase.PHASE1);
        for (int i = 0; i < tableNames.length; i++) {

          LOG.info("Merge backup images for " + tableNames[i]);

          // Find input directories for table
          Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
          String dirs = StringUtils.join(dirPaths, ",");
          Path bulkOutputPath = BackupUtils.getBulkOutputDir(
            BackupUtils.getFileNameCompatibleString(tableNames[i]), getConf(), false);
          // Delete content if it exists
          if (fs.exists(bulkOutputPath)) {
            if (!fs.delete(bulkOutputPath, true)) {
              LOG.warn("Can not delete: " + bulkOutputPath);
            }
          }
          Configuration conf = getConf();
          conf.set(bulkOutputConfKey, bulkOutputPath.toString());
          String[] playerArgs = { dirs, tableNames[i].getNameAsString() };

          int result;
          // Failure injection point #2
          checkFailure(FailurePhase.PHASE2);
          player.setConf(getConf());
          result = player.run(playerArgs);
          if (succeeded(result)) {
            // Add to processed table list
            processedTableList.add(new Pair<TableName, Path>(tableNames[i], bulkOutputPath));
          } else {
            throw new IOException("Can not merge backup images for " + dirs
                + " (check Hadoop/MR and HBase logs). Player return code =" + result);
          }
          LOG.debug("Merge Job finished:" + result);
        }
        List<TableName> tableList = toTableNameList(processedTableList);
        // Failure injection point #3
        checkFailure(FailurePhase.PHASE3);
        table.updateProcessedTablesForMerge(tableList);
        finishedTables = true;
        // Move merged data for each processed table into the merged backup image
        for (Pair<TableName, Path> tn : processedTableList) {
          moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
        }
        // Failure injection point #4
        checkFailure(FailurePhase.PHASE4);
        // Delete old data and update manifest
        List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
        deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
        updateBackupManifest(backupRoot, mergedBackupId, backupsToDelete);
        // Finish merge session
        table.finishMergeOperation();
      } catch (RuntimeException e) {
        throw e;
      } catch (Exception e) {
        if (!finishedTables) {
          // Cleanup bulk directories and finish merge;
          // merge MUST be repeated (no need for repair)
          cleanupBulkLoadDirs(fs, toPathList(processedTableList));
          table.finishMergeOperation();
          table.finishBackupExclusiveOperation();
          throw new IOException("Backup merge operation failed, you should try it again", e);
        } else {
          // Backup repair must be run
          throw new IOException(
              "Backup merge operation failed, run backup repair tool to restore system's integrity",
              e);
        }
      } finally {
        table.close();
        conn.close();
      }
    }
    private void checkFailure(FailurePhase phase) throws IOException {
      if (failurePhase != null && failurePhase == phase) {
        throw new IOException(phase.toString());
      }
    }
  }
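
  /**
   * Runs full and incremental backups, then merges the incremental images while injecting a
   * failure at each {@link FailurePhase}, verifying the backup system table state after every
   * failure before performing a clean merge and restore.
   */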
  @Test
  public void TestIncBackupMergeRestore() throws Exception {
    // #1 - create full backup for all tables
    LOG.info("create full backup image for all tables");

    List<TableName> tables = Lists.newArrayList(table1, table2);
    // Set custom Merge Job implementation
    conf1.setClass(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS,
      BackupMergeJobWithFailures.class, BackupMergeJob.class);

    Connection conn = ConnectionFactory.createConnection(conf1);

    HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
    BackupAdminImpl client = new BackupAdminImpl(conn);

    BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
    String backupIdFull = client.backupTables(request);

    assertTrue(checkSucceeded(backupIdFull));
    // #2 - insert some data to table1 and table2
    HTable t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);

    Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS);
    t1.close();
    LOG.debug("written " + ADD_ROWS + " rows to " + table1);

    HTable t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS);
    Assert.assertEquals(TEST_UTIL.countRows(t2), NB_ROWS_IN_BATCH + ADD_ROWS);
    t2.close();
    LOG.debug("written " + ADD_ROWS + " rows to " + table2);
    // #3 - incremental backup for multiple tables
    tables = Lists.newArrayList(table1, table2);
    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
    String backupIdIncMultiple = client.backupTables(request);

    assertTrue(checkSucceeded(backupIdIncMultiple));
    t1 = insertIntoTable(conn, table1, famName, 2, ADD_ROWS);
    t1.close();

    t2 = insertIntoTable(conn, table2, famName, 2, ADD_ROWS);
    t2.close();
    // #3.1 - second incremental backup for multiple tables
    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
    String backupIdIncMultiple2 = client.backupTables(request);
    assertTrue(checkSucceeded(backupIdIncMultiple2));
    // #4 - merge backup images with failures
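    // For failures in PHASE1-PHASE3 the merge job cleans up after itself and the merge only
    // needs to be retried; a PHASE4 failure leaves the merge in progress and requires the
    // backup repair tool.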
    for (FailurePhase phase : FailurePhase.values()) {
      Configuration conf = conn.getConfiguration();
      conf.set(FAILURE_PHASE_KEY, phase.toString());

      try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) {
        String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
        bAdmin.mergeBackups(backups);
        Assert.fail("Expected IOException");
      } catch (IOException e) {
        BackupSystemTable table = new BackupSystemTable(conn);
        if (phase.ordinal() < FailurePhase.PHASE4.ordinal()) {
          // No need to repair:
          // both merge and backup exclusive operations are finished
          assertFalse(table.isMergeInProgress());
          try {
            table.finishBackupExclusiveOperation();
            Assert.fail("IOException is expected");
          } catch (IOException ee) {
            // Expected
          }
        } else {
          // Repair is required
          assertTrue(table.isMergeInProgress());
          try {
            table.startBackupExclusiveOperation();
            Assert.fail("IOException is expected");
          } catch (IOException ee) {
            // Expected - clean up before proceeding
          }
          table.finishMergeOperation();
          table.finishBackupExclusiveOperation();
        }
        LOG.debug("Expected :" + e.getMessage());
      }
    }
    // #5 - now merge w/o failures
    Configuration conf = conn.getConfiguration();
    conf.unset(FAILURE_PHASE_KEY);
    conf.unset(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS);
    try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) {
      String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
      bAdmin.mergeBackups(backups);
    }
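
    // The merged image keeps the id of the most recent source backup, so backupIdIncMultiple2
    // still identifies a restorable image below.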
    // #6 - restore incremental backup for multiple tables, with overwrite
    TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 };
    TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, table2_restore };
    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple2, false,
      tablesRestoreIncMultiple, tablesMapIncMultiple, true));
    Table hTable = conn.getTable(table1_restore);
    LOG.debug("After incremental restore: " + hTable.getTableDescriptor());
    LOG.debug("f1 has " + TEST_UTIL.countRows(hTable, famName) + " rows");
    Assert.assertEquals(TEST_UTIL.countRows(hTable, famName), NB_ROWS_IN_BATCH + 2 * ADD_ROWS);
    hTable.close();
    hTable = conn.getTable(table2_restore);
    Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH + 2 * ADD_ROWS);