3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.mob
;
20 import static org
.junit
.Assert
.assertEquals
;
21 import static org
.junit
.Assert
.assertTrue
;
23 import java
.io
.IOException
;
24 import java
.util
.Arrays
;
26 import org
.apache
.hadoop
.conf
.Configuration
;
27 import org
.apache
.hadoop
.fs
.FileStatus
;
28 import org
.apache
.hadoop
.fs
.FileSystem
;
29 import org
.apache
.hadoop
.fs
.Path
;
30 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
31 import org
.apache
.hadoop
.hbase
.TableName
;
32 import org
.apache
.hadoop
.hbase
.client
.Admin
;
33 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
34 import org
.apache
.hadoop
.hbase
.client
.Connection
;
35 import org
.apache
.hadoop
.hbase
.client
.ConnectionFactory
;
36 import org
.apache
.hadoop
.hbase
.client
.Put
;
37 import org
.apache
.hadoop
.hbase
.client
.Result
;
38 import org
.apache
.hadoop
.hbase
.client
.ResultScanner
;
39 import org
.apache
.hadoop
.hbase
.client
.Table
;
40 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
41 import org
.apache
.hadoop
.hbase
.master
.cleaner
.TimeToLiveHFileCleaner
;
42 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
43 import org
.slf4j
.Logger
;
44 import org
.slf4j
.LoggerFactory
;
46 Reproduction for MOB data loss
48 1. Settings: Region Size 200 MB, Flush threshold 800 KB.
49 2. Insert 10 Million records
50 3. MOB Compaction and Archiver
51 a) Trigger MOB Compaction (every 2 minutes)
52 b) Trigger major compaction (every 2 minutes)
53 c) Trigger archive cleaner (every 3 minutes)
54 4. Validate MOB data after complete data load.
56 This class is used by MobStressTool only. This is not a unit test
59 @SuppressWarnings("deprecation")
60 public class MobStressToolRunner
{
61 private static final Logger LOG
= LoggerFactory
.getLogger(MobStressToolRunner
.class);
64 private HBaseTestingUtility HTU
;
66 private final static String famStr
= "f1";
67 private final static byte[] fam
= Bytes
.toBytes(famStr
);
68 private final static byte[] qualifier
= Bytes
.toBytes("q1");
69 private final static long mobLen
= 10;
70 private final static byte[] mobVal
= Bytes
71 .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");
73 private Configuration conf
;
74 private TableDescriptorBuilder
.ModifyableTableDescriptor tableDescriptor
;
75 private ColumnFamilyDescriptorBuilder
.ModifyableColumnFamilyDescriptor familyDescriptor
;
77 private long count
= 500000;
78 private double failureProb
= 0.1;
79 private Table table
= null;
80 private MobFileCleanerChore chore
= new MobFileCleanerChore();
82 private static volatile boolean run
= true;
84 public MobStressToolRunner() {
88 public void init(Configuration conf
, long numRows
) throws IOException
{
93 tableDescriptor
= new TableDescriptorBuilder
.ModifyableTableDescriptor(
94 TableName
.valueOf("testMobCompactTable"));
95 Connection conn
= ConnectionFactory
.createConnection(this.conf
);
96 this.admin
= conn
.getAdmin();
97 this.familyDescriptor
= new ColumnFamilyDescriptorBuilder
.ModifyableColumnFamilyDescriptor(fam
);
98 this.familyDescriptor
.setMobEnabled(true);
99 this.familyDescriptor
.setMobThreshold(mobLen
);
100 this.familyDescriptor
.setMaxVersions(1);
101 this.tableDescriptor
.setColumnFamily(familyDescriptor
);
102 if (admin
.tableExists(tableDescriptor
.getTableName())) {
103 admin
.disableTable(tableDescriptor
.getTableName());
104 admin
.deleteTable(tableDescriptor
.getTableName());
106 admin
.createTable(tableDescriptor
);
107 table
= conn
.getTable(tableDescriptor
.getTableName());
110 private void printConf() {
111 LOG
.info("Please ensure the following HBase configuration is set:");
112 LOG
.info("hfile.format.version=3");
113 LOG
.info("hbase.master.hfilecleaner.ttl=0");
114 LOG
.info("hbase.hregion.max.filesize=200000000");
115 LOG
.info("hbase.client.retries.number=100");
116 LOG
.info("hbase.hregion.memstore.flush.size=800000");
117 LOG
.info("hbase.hstore.blockingStoreFiles=150");
118 LOG
.info("hbase.hstore.compaction.throughput.lower.bound=50000000");
119 LOG
.info("hbase.hstore.compaction.throughput.higher.bound=100000000");
120 LOG
.info("hbase.master.mob.cleaner.period=0");
121 LOG
.info("hbase.mob.default.compactor=org.apache.hadoop.hbase.mob.FaultyMobStoreCompactor");
122 LOG
.warn("hbase.mob.compaction.fault.probability=x, where x is between 0. and 1.");
126 private void initConf() {
128 conf
.setInt("hfile.format.version", 3);
129 conf
.setLong(TimeToLiveHFileCleaner
.TTL_CONF_KEY
, 0);
130 conf
.setInt("hbase.client.retries.number", 100);
131 conf
.setInt("hbase.hregion.max.filesize", 200000000);
132 conf
.setInt("hbase.hregion.memstore.flush.size", 800000);
133 conf
.setInt("hbase.hstore.blockingStoreFiles", 150);
134 conf
.setInt("hbase.hstore.compaction.throughput.lower.bound", 52428800);
135 conf
.setInt("hbase.hstore.compaction.throughput.higher.bound", 2 * 52428800);
136 conf
.setDouble("hbase.mob.compaction.fault.probability", failureProb
);
137 // conf.set(MobStoreEngine.DEFAULT_MOB_COMPACTOR_CLASS_KEY,
138 // FaultyMobStoreCompactor.class.getName());
139 conf
.setLong(MobConstants
.MOB_COMPACTION_CHORE_PERIOD
, 0);
140 conf
.setLong(MobConstants
.MOB_CLEANER_PERIOD
, 0);
141 conf
.setLong(MobConstants
.MIN_AGE_TO_ARCHIVE_KEY
, 120000);
142 conf
.set(MobConstants
.MOB_COMPACTION_TYPE_KEY
,
143 MobConstants
.OPTIMIZED_MOB_COMPACTION_TYPE
);
144 conf
.setLong(MobConstants
.MOB_COMPACTION_MAX_FILE_SIZE_KEY
, 1000000);
148 class MajorCompaction
implements Runnable
{
154 admin
.majorCompact(tableDescriptor
.getTableName(), fam
);
155 Thread
.sleep(120000);
156 } catch (Exception e
) {
157 LOG
.error("MOB Stress Test FAILED", e
);
164 class CleanMobAndArchive
implements Runnable
{
170 LOG
.info("MOB cleanup chore started ...");
171 chore
.cleanupObsoleteMobFiles(conf
, table
.getName());
172 LOG
.info("MOB cleanup chore finished");
174 Thread
.sleep(130000);
175 } catch (Exception e
) {
176 LOG
.error("CleanMobAndArchive", e
);
182 class WriteData
implements Runnable
{
184 private long rows
= -1;
186 public WriteData(long rows
) {
195 for (int i
= 0; i
< rows
; i
++) {
196 byte[] key
= Bytes
.toBytes(i
);
197 Put p
= new Put(key
);
198 p
.addColumn(fam
, qualifier
, Bytes
.add(key
,mobVal
));
200 if (i
% 10000 == 0) {
201 LOG
.info("LOADED=" + i
);
204 } catch (InterruptedException ee
) {
207 if (i
% 100000 == 0) {
211 admin
.flush(table
.getName());
213 } catch (Exception e
) {
214 LOG
.error("MOB Stress Test FAILED", e
);
220 public void runStressTest() throws InterruptedException
, IOException
{
224 Thread writeData
= new Thread(new WriteData(count
));
227 Thread majorcompact
= new Thread(new MajorCompaction());
228 majorcompact
.start();
230 Thread cleaner
= new Thread(new CleanMobAndArchive());
237 getNumberOfMobFiles(conf
, table
.getName(), new String(fam
));
238 LOG
.info("Waiting for write thread to finish ...");
241 chore
.cleanupObsoleteMobFiles(conf
, table
.getName());
242 getNumberOfMobFiles(conf
, table
.getName(), new String(fam
));
245 LOG
.info("Archive cleaner started ...");
246 // Call archive cleaner again
247 HTU
.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
248 LOG
.info("Archive cleaner finished");
255 admin
.disableTable(tableDescriptor
.getTableName());
256 admin
.deleteTable(tableDescriptor
.getTableName());
258 LOG
.info("MOB Stress Test finished OK");
263 private long getNumberOfMobFiles(Configuration conf
, TableName tableName
, String family
)
265 FileSystem fs
= FileSystem
.get(conf
);
266 Path dir
= MobUtils
.getMobFamilyPath(conf
, tableName
, family
);
267 FileStatus
[] stat
= fs
.listStatus(dir
);
269 for (FileStatus st
: stat
) {
270 LOG
.debug("MOB Directory content: {} len={}", st
.getPath(), st
.getLen());
273 LOG
.debug("MOB Directory content total files: {}, total size={}", stat
.length
, size
);
278 public void printStats(long loaded
) {
279 LOG
.info("MOB Stress Test: loaded=" + loaded
+ " compactions="
280 + FaultyMobStoreCompactor
.totalCompactions
.get() + " major="
281 + FaultyMobStoreCompactor
.totalMajorCompactions
.get() + " mob="
282 + FaultyMobStoreCompactor
.mobCounter
.get() + " injected failures="
283 + FaultyMobStoreCompactor
.totalFailures
.get());
286 private void scanTable() {
290 ResultScanner scanner
= table
.getScanner(fam
);
292 while ((result
= scanner
.next()) != null) {
293 byte[] key
= result
.getRow();
294 assertTrue(Arrays
.equals(result
.getValue(fam
, qualifier
),
295 Bytes
.add(key
,mobVal
)));
296 if (counter
% 10000 == 0) {
297 LOG
.info("GET=" + counter
+" key=" + Bytes
.toInt(key
));
302 assertEquals(count
, counter
);
303 } catch (Exception e
) {
305 LOG
.error("MOB Stress Test FAILED");