2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.wal
;
20 import static org
.junit
.Assert
.assertEquals
;
21 import static org
.junit
.Assert
.assertFalse
;
22 import static org
.junit
.Assert
.assertNull
;
23 import static org
.junit
.Assert
.assertTrue
;
25 import java
.io
.IOException
;
26 import java
.util
.HashSet
;
27 import java
.util
.NavigableMap
;
29 import java
.util
.TreeMap
;
30 import java
.util
.concurrent
.ThreadLocalRandom
;
31 import org
.apache
.hadoop
.conf
.Configuration
;
32 import org
.apache
.hadoop
.fs
.FileStatus
;
33 import org
.apache
.hadoop
.fs
.FileSystem
;
34 import org
.apache
.hadoop
.fs
.Path
;
35 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
36 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
37 import org
.apache
.hadoop
.hbase
.HConstants
;
38 import org
.apache
.hadoop
.hbase
.KeyValue
;
39 import org
.apache
.hadoop
.hbase
.ServerName
;
40 import org
.apache
.hadoop
.hbase
.TableName
;
41 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
42 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
43 import org
.apache
.hadoop
.hbase
.client
.RegionInfoBuilder
;
44 import org
.apache
.hadoop
.hbase
.client
.TableDescriptor
;
45 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
46 import org
.apache
.hadoop
.hbase
.regionserver
.MultiVersionConcurrencyControl
;
47 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
48 import org
.apache
.hadoop
.hbase
.testclassification
.RegionServerTests
;
49 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
50 import org
.apache
.hadoop
.hbase
.util
.CommonFSUtils
;
51 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
52 import org
.junit
.AfterClass
;
53 import org
.junit
.Before
;
54 import org
.junit
.BeforeClass
;
55 import org
.junit
.ClassRule
;
56 import org
.junit
.Rule
;
57 import org
.junit
.Test
;
58 import org
.junit
.experimental
.categories
.Category
;
59 import org
.junit
.rules
.TestName
;
60 import org
.slf4j
.Logger
;
61 import org
.slf4j
.LoggerFactory
;
63 @Category({RegionServerTests
.class, MediumTests
.class})
64 public class TestFSHLogProvider
{
67 public static final HBaseClassTestRule CLASS_RULE
=
68 HBaseClassTestRule
.forClass(TestFSHLogProvider
.class);
70 private static final Logger LOG
= LoggerFactory
.getLogger(TestFSHLogProvider
.class);
72 private static Configuration conf
;
73 private static FileSystem fs
;
74 private final static HBaseTestingUtil TEST_UTIL
= new HBaseTestingUtil();
75 private MultiVersionConcurrencyControl mvcc
;
78 public final TestName currentTest
= new TestName();
81 public void setUp() throws Exception
{
82 mvcc
= new MultiVersionConcurrencyControl();
83 FileStatus
[] entries
= fs
.listStatus(new Path("/"));
84 for (FileStatus dir
: entries
) {
85 fs
.delete(dir
.getPath(), true);
90 public static void setUpBeforeClass() throws Exception
{
91 // Make block sizes small.
92 TEST_UTIL
.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
93 // quicker heartbeat interval for faster DN death notification
94 TEST_UTIL
.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
95 TEST_UTIL
.getConfiguration().setInt("dfs.heartbeat.interval", 1);
96 TEST_UTIL
.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
98 // faster failover with cluster.shutdown();fs.close() idiom
99 TEST_UTIL
.getConfiguration()
100 .setInt("hbase.ipc.client.connect.max.retries", 1);
101 TEST_UTIL
.getConfiguration().setInt(
102 "dfs.client.block.recovery.retries", 1);
103 TEST_UTIL
.getConfiguration().setInt(
104 "hbase.ipc.client.connection.maxidletime", 500);
105 TEST_UTIL
.startMiniDFSCluster(3);
107 // Set up a working space for our tests.
108 TEST_UTIL
.createRootDir();
109 conf
= TEST_UTIL
.getConfiguration();
110 fs
= TEST_UTIL
.getDFSCluster().getFileSystem();
114 public static void tearDownAfterClass() throws Exception
{
115 TEST_UTIL
.shutdownMiniCluster();
118 static String
getName() {
119 return "TestDefaultWALProvider";
123 public void testGetServerNameFromWALDirectoryName() throws IOException
{
124 ServerName sn
= ServerName
.valueOf("hn", 450, 1398);
125 String hl
= CommonFSUtils
.getRootDir(conf
) + "/" +
126 AbstractFSWALProvider
.getWALDirectoryName(sn
.toString());
128 // Must not throw exception
129 assertNull(AbstractFSWALProvider
.getServerNameFromWALDirectoryName(conf
, null));
130 assertNull(AbstractFSWALProvider
.getServerNameFromWALDirectoryName(conf
,
131 CommonFSUtils
.getRootDir(conf
).toUri().toString()));
132 assertNull(AbstractFSWALProvider
.getServerNameFromWALDirectoryName(conf
, ""));
133 assertNull(AbstractFSWALProvider
.getServerNameFromWALDirectoryName(conf
, " "));
134 assertNull(AbstractFSWALProvider
.getServerNameFromWALDirectoryName(conf
, hl
));
135 assertNull(AbstractFSWALProvider
.getServerNameFromWALDirectoryName(conf
, hl
+ "qdf"));
136 assertNull(AbstractFSWALProvider
.getServerNameFromWALDirectoryName(conf
, "sfqf" + hl
+ "qdf"));
138 final String wals
= "/WALs/";
139 ServerName parsed
= AbstractFSWALProvider
.getServerNameFromWALDirectoryName(conf
,
140 CommonFSUtils
.getRootDir(conf
).toUri().toString() + wals
+ sn
+
141 "/localhost%2C32984%2C1343316388997.1343316390417");
142 assertEquals("standard", sn
, parsed
);
144 parsed
= AbstractFSWALProvider
.getServerNameFromWALDirectoryName(conf
, hl
+ "/qdf");
145 assertEquals("subdir", sn
, parsed
);
147 parsed
= AbstractFSWALProvider
.getServerNameFromWALDirectoryName(conf
,
148 CommonFSUtils
.getRootDir(conf
).toUri().toString() + wals
+ sn
+
149 "-splitting/localhost%3A57020.1340474893931");
150 assertEquals("split", sn
, parsed
);
154 private void addEdits(WAL log
, RegionInfo hri
, TableDescriptor htd
, int times
,
155 NavigableMap
<byte[], Integer
> scopes
) throws IOException
{
156 final byte[] row
= Bytes
.toBytes("row");
157 for (int i
= 0; i
< times
; i
++) {
158 long timestamp
= EnvironmentEdgeManager
.currentTime();
159 WALEdit cols
= new WALEdit();
160 cols
.add(new KeyValue(row
, row
, row
, timestamp
, row
));
162 getWalKey(hri
.getEncodedNameAsBytes(), htd
.getTableName(), timestamp
, scopes
), cols
);
168 * used by TestDefaultWALProviderWithHLogKey
171 WALKeyImpl
getWalKey(final byte[] info
, final TableName tableName
, final long timestamp
,
172 NavigableMap
<byte[], Integer
> scopes
) {
173 return new WALKeyImpl(info
, tableName
, timestamp
, mvcc
, scopes
);
177 * helper method to simulate region flush for a WAL.
179 * @param regionEncodedName
181 protected void flushRegion(WAL wal
, byte[] regionEncodedName
, Set
<byte[]> flushedFamilyNames
) {
182 wal
.startCacheFlush(regionEncodedName
, flushedFamilyNames
);
183 wal
.completeCacheFlush(regionEncodedName
, HConstants
.NO_SEQNUM
);
187 public void testLogCleaning() throws Exception
{
188 LOG
.info(currentTest
.getMethodName());
189 TableDescriptor htd
=
190 TableDescriptorBuilder
.newBuilder(TableName
.valueOf(currentTest
.getMethodName()))
191 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of("row")).build();
192 TableDescriptor htd2
=
193 TableDescriptorBuilder
.newBuilder(TableName
.valueOf(currentTest
.getMethodName() + "2"))
194 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of("row")).build();
195 NavigableMap
<byte[], Integer
> scopes1
= new TreeMap
<>(Bytes
.BYTES_COMPARATOR
);
196 for (byte[] fam
: htd
.getColumnFamilyNames()) {
199 NavigableMap
<byte[], Integer
> scopes2
= new TreeMap
<>(Bytes
.BYTES_COMPARATOR
);
200 for (byte[] fam
: htd2
.getColumnFamilyNames()) {
203 Configuration localConf
= new Configuration(conf
);
204 localConf
.set(WALFactory
.WAL_PROVIDER
, FSHLogProvider
.class.getName());
205 WALFactory wals
= new WALFactory(localConf
, currentTest
.getMethodName());
207 RegionInfo hri
= RegionInfoBuilder
.newBuilder(htd
.getTableName()).build();
208 RegionInfo hri2
= RegionInfoBuilder
.newBuilder(htd2
.getTableName()).build();
209 // we want to mix edits from regions, so pick our own identifier.
210 WAL log
= wals
.getWAL(null);
212 // Add a single edit and make sure that rolling won't remove the file
213 // Before HBASE-3198 it used to delete it
214 addEdits(log
, hri
, htd
, 1, scopes1
);
216 assertEquals(1, AbstractFSWALProvider
.getNumRolledLogFiles(log
));
218 // See if there's anything wrong with more than 1 edit
219 addEdits(log
, hri
, htd
, 2, scopes1
);
221 assertEquals(2, FSHLogProvider
.getNumRolledLogFiles(log
));
223 // Now mix edits from 2 regions, still no flushing
224 addEdits(log
, hri
, htd
, 1, scopes1
);
225 addEdits(log
, hri2
, htd2
, 1, scopes2
);
226 addEdits(log
, hri
, htd
, 1, scopes1
);
227 addEdits(log
, hri2
, htd2
, 1, scopes2
);
229 assertEquals(3, AbstractFSWALProvider
.getNumRolledLogFiles(log
));
231 // Flush the first region, we expect to see the first two files getting
232 // archived. We need to append something or writer won't be rolled.
233 addEdits(log
, hri2
, htd2
, 1, scopes2
);
234 log
.startCacheFlush(hri
.getEncodedNameAsBytes(), htd
.getColumnFamilyNames());
235 log
.completeCacheFlush(hri
.getEncodedNameAsBytes(), HConstants
.NO_SEQNUM
);
237 int count
= AbstractFSWALProvider
.getNumRolledLogFiles(log
);
238 assertEquals(2, count
);
240 // Flush the second region, which removes all the remaining output files
241 // since the oldest was completely flushed and the two others only contain
243 addEdits(log
, hri2
, htd2
, 1, scopes2
);
244 log
.startCacheFlush(hri2
.getEncodedNameAsBytes(), htd2
.getColumnFamilyNames());
245 log
.completeCacheFlush(hri2
.getEncodedNameAsBytes(), HConstants
.NO_SEQNUM
);
247 assertEquals(0, AbstractFSWALProvider
.getNumRolledLogFiles(log
));
256 * Tests wal archiving by adding data, doing flushing/rolling and checking we archive old logs
257 * and also don't archive "live logs" (that is, a log with un-flushed entries).
259 * This is what it does:
260 * It creates two regions, and does a series of inserts along with log rolling.
261 * Whenever a WAL is rolled, HLogBase checks previous wals for archiving. A wal is eligible for
262 * archiving if for all the regions which have entries in that wal file, have flushed - past
263 * their maximum sequence id in that wal file.
265 * @throws IOException
268 public void testWALArchiving() throws IOException
{
269 LOG
.debug(currentTest
.getMethodName());
270 TableDescriptor table1
=
271 TableDescriptorBuilder
.newBuilder(TableName
.valueOf(currentTest
.getMethodName() + "1"))
272 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of("row")).build();
273 TableDescriptor table2
=
274 TableDescriptorBuilder
.newBuilder(TableName
.valueOf(currentTest
.getMethodName() + "2"))
275 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of("row")).build();
276 NavigableMap
<byte[], Integer
> scopes1
= new TreeMap
<>(Bytes
.BYTES_COMPARATOR
);
277 for (byte[] fam
: table1
.getColumnFamilyNames()) {
280 NavigableMap
<byte[], Integer
> scopes2
= new TreeMap
<>(Bytes
.BYTES_COMPARATOR
);
281 for (byte[] fam
: table2
.getColumnFamilyNames()) {
284 Configuration localConf
= new Configuration(conf
);
285 localConf
.set(WALFactory
.WAL_PROVIDER
, FSHLogProvider
.class.getName());
286 WALFactory wals
= new WALFactory(localConf
, currentTest
.getMethodName());
288 WAL wal
= wals
.getWAL(null);
289 assertEquals(0, AbstractFSWALProvider
.getNumRolledLogFiles(wal
));
290 RegionInfo hri1
= RegionInfoBuilder
.newBuilder(table1
.getTableName()).build();
291 RegionInfo hri2
= RegionInfoBuilder
.newBuilder(table2
.getTableName()).build();
292 // variables to mock region sequenceIds.
293 // start with the testing logic: insert a waledit, and roll writer
294 addEdits(wal
, hri1
, table1
, 1, scopes1
);
296 // assert that the wal is rolled
297 assertEquals(1, AbstractFSWALProvider
.getNumRolledLogFiles(wal
));
298 // add edits in the second wal file, and roll writer.
299 addEdits(wal
, hri1
, table1
, 1, scopes1
);
301 // assert that the wal is rolled
302 assertEquals(2, AbstractFSWALProvider
.getNumRolledLogFiles(wal
));
303 // add a waledit to table1, and flush the region.
304 addEdits(wal
, hri1
, table1
, 3, scopes1
);
305 flushRegion(wal
, hri1
.getEncodedNameAsBytes(), table1
.getColumnFamilyNames());
306 // roll log; all old logs should be archived.
308 assertEquals(0, AbstractFSWALProvider
.getNumRolledLogFiles(wal
));
309 // add an edit to table2, and roll writer
310 addEdits(wal
, hri2
, table2
, 1, scopes2
);
312 assertEquals(1, AbstractFSWALProvider
.getNumRolledLogFiles(wal
));
313 // add edits for table1, and roll writer
314 addEdits(wal
, hri1
, table1
, 2, scopes1
);
316 assertEquals(2, AbstractFSWALProvider
.getNumRolledLogFiles(wal
));
317 // add edits for table2, and flush hri1.
318 addEdits(wal
, hri2
, table2
, 2, scopes2
);
319 flushRegion(wal
, hri1
.getEncodedNameAsBytes(), table2
.getColumnFamilyNames());
320 // the log : region-sequenceId map is
321 // log1: region2 (unflushed)
322 // log2: region1 (flushed)
323 // log3: region2 (unflushed)
324 // roll the writer; log2 should be archived.
326 assertEquals(2, AbstractFSWALProvider
.getNumRolledLogFiles(wal
));
327 // flush region2, and all logs should be archived.
328 addEdits(wal
, hri2
, table2
, 2, scopes2
);
329 flushRegion(wal
, hri2
.getEncodedNameAsBytes(), table2
.getColumnFamilyNames());
331 assertEquals(0, AbstractFSWALProvider
.getNumRolledLogFiles(wal
));
340 * Write to a log file with three concurrent threads and verifying all data is written.
344 public void testConcurrentWrites() throws Exception
{
345 // Run the WPE tool with three threads writing 3000 edits each concurrently.
346 // When done, verify that all edits were written.
347 int errCode
= WALPerformanceEvaluation
.
348 innerMain(new Configuration(TEST_UTIL
.getConfiguration()),
349 new String
[] {"-threads", "3", "-verify", "-noclosefs", "-iterations", "3000"});
350 assertEquals(0, errCode
);
354 * Ensure that we can use Set.add to deduplicate WALs
357 public void setMembershipDedups() throws IOException
{
358 Configuration localConf
= new Configuration(conf
);
359 localConf
.set(WALFactory
.WAL_PROVIDER
, FSHLogProvider
.class.getName());
360 WALFactory wals
= new WALFactory(localConf
, currentTest
.getMethodName());
362 final Set
<WAL
> seen
= new HashSet
<>(1);
363 assertTrue("first attempt to add WAL from default provider should work.",
364 seen
.add(wals
.getWAL(null)));
365 for (int i
= 0; i
< 1000; i
++) {
367 "default wal provider is only supposed to return a single wal, which should " +
368 "compare as .equals itself.",
369 seen
.add(wals
.getWAL(RegionInfoBuilder
370 .newBuilder(TableName
.valueOf("Table-" + ThreadLocalRandom
.current().nextInt()))