2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.wal
;
20 import static org
.junit
.Assert
.assertEquals
;
21 import static org
.junit
.Assert
.assertFalse
;
22 import static org
.junit
.Assert
.assertNull
;
23 import static org
.junit
.Assert
.assertTrue
;
25 import java
.io
.IOException
;
26 import java
.util
.HashSet
;
27 import java
.util
.NavigableMap
;
29 import java
.util
.TreeMap
;
30 import java
.util
.concurrent
.ThreadLocalRandom
;
31 import org
.apache
.hadoop
.conf
.Configuration
;
32 import org
.apache
.hadoop
.fs
.FileStatus
;
33 import org
.apache
.hadoop
.fs
.FileSystem
;
34 import org
.apache
.hadoop
.fs
.Path
;
35 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
36 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
37 import org
.apache
.hadoop
.hbase
.KeyValue
;
38 import org
.apache
.hadoop
.hbase
.ServerName
;
39 import org
.apache
.hadoop
.hbase
.TableName
;
40 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
41 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
42 import org
.apache
.hadoop
.hbase
.client
.RegionInfoBuilder
;
43 import org
.apache
.hadoop
.hbase
.client
.TableDescriptor
;
44 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
45 import org
.apache
.hadoop
.hbase
.regionserver
.MultiVersionConcurrencyControl
;
46 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
47 import org
.apache
.hadoop
.hbase
.testclassification
.RegionServerTests
;
48 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
49 import org
.apache
.hadoop
.hbase
.util
.FSUtils
;
50 import org
.junit
.AfterClass
;
51 import org
.junit
.Before
;
52 import org
.junit
.BeforeClass
;
53 import org
.junit
.ClassRule
;
54 import org
.junit
.Rule
;
55 import org
.junit
.Test
;
56 import org
.junit
.experimental
.categories
.Category
;
57 import org
.junit
.rules
.TestName
;
58 import org
.slf4j
.Logger
;
59 import org
.slf4j
.LoggerFactory
;
61 @Category({RegionServerTests
.class, MediumTests
.class})
62 public class TestFSHLogProvider
{
65 public static final HBaseClassTestRule CLASS_RULE
=
66 HBaseClassTestRule
.forClass(TestFSHLogProvider
.class);
68 private static final Logger LOG
= LoggerFactory
.getLogger(TestFSHLogProvider
.class);
70 private static Configuration conf
;
71 private static FileSystem fs
;
72 private final static HBaseTestingUtility TEST_UTIL
= new HBaseTestingUtility();
73 private MultiVersionConcurrencyControl mvcc
;
76 public final TestName currentTest
= new TestName();
79 public void setUp() throws Exception
{
80 mvcc
= new MultiVersionConcurrencyControl();
81 FileStatus
[] entries
= fs
.listStatus(new Path("/"));
82 for (FileStatus dir
: entries
) {
83 fs
.delete(dir
.getPath(), true);
88 public static void setUpBeforeClass() throws Exception
{
89 // Make block sizes small.
90 TEST_UTIL
.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
91 // quicker heartbeat interval for faster DN death notification
92 TEST_UTIL
.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
93 TEST_UTIL
.getConfiguration().setInt("dfs.heartbeat.interval", 1);
94 TEST_UTIL
.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
96 // faster failover with cluster.shutdown();fs.close() idiom
97 TEST_UTIL
.getConfiguration()
98 .setInt("hbase.ipc.client.connect.max.retries", 1);
99 TEST_UTIL
.getConfiguration().setInt(
100 "dfs.client.block.recovery.retries", 1);
101 TEST_UTIL
.getConfiguration().setInt(
102 "hbase.ipc.client.connection.maxidletime", 500);
103 TEST_UTIL
.startMiniDFSCluster(3);
105 // Set up a working space for our tests.
106 TEST_UTIL
.createRootDir();
107 conf
= TEST_UTIL
.getConfiguration();
108 fs
= TEST_UTIL
.getDFSCluster().getFileSystem();
112 public static void tearDownAfterClass() throws Exception
{
113 TEST_UTIL
.shutdownMiniCluster();
116 static String
getName() {
117 return "TestDefaultWALProvider";
121 public void testGetServerNameFromWALDirectoryName() throws IOException
{
122 ServerName sn
= ServerName
.valueOf("hn", 450, 1398);
123 String hl
= FSUtils
.getRootDir(conf
) + "/" +
124 AbstractFSWALProvider
.getWALDirectoryName(sn
.toString());
126 // Must not throw exception
127 assertNull(AbstractFSWALProvider
.getServerNameFromWALDirectoryName(conf
, null));
128 assertNull(AbstractFSWALProvider
.getServerNameFromWALDirectoryName(conf
,
129 FSUtils
.getRootDir(conf
).toUri().toString()));
130 assertNull(AbstractFSWALProvider
.getServerNameFromWALDirectoryName(conf
, ""));
131 assertNull(AbstractFSWALProvider
.getServerNameFromWALDirectoryName(conf
, " "));
132 assertNull(AbstractFSWALProvider
.getServerNameFromWALDirectoryName(conf
, hl
));
133 assertNull(AbstractFSWALProvider
.getServerNameFromWALDirectoryName(conf
, hl
+ "qdf"));
134 assertNull(AbstractFSWALProvider
.getServerNameFromWALDirectoryName(conf
, "sfqf" + hl
+ "qdf"));
136 final String wals
= "/WALs/";
137 ServerName parsed
= AbstractFSWALProvider
.getServerNameFromWALDirectoryName(conf
,
138 FSUtils
.getRootDir(conf
).toUri().toString() + wals
+ sn
+
139 "/localhost%2C32984%2C1343316388997.1343316390417");
140 assertEquals("standard", sn
, parsed
);
142 parsed
= AbstractFSWALProvider
.getServerNameFromWALDirectoryName(conf
, hl
+ "/qdf");
143 assertEquals("subdir", sn
, parsed
);
145 parsed
= AbstractFSWALProvider
.getServerNameFromWALDirectoryName(conf
,
146 FSUtils
.getRootDir(conf
).toUri().toString() + wals
+ sn
+
147 "-splitting/localhost%3A57020.1340474893931");
148 assertEquals("split", sn
, parsed
);
152 private void addEdits(WAL log
, RegionInfo hri
, TableDescriptor htd
, int times
,
153 NavigableMap
<byte[], Integer
> scopes
) throws IOException
{
154 final byte[] row
= Bytes
.toBytes("row");
155 for (int i
= 0; i
< times
; i
++) {
156 long timestamp
= System
.currentTimeMillis();
157 WALEdit cols
= new WALEdit();
158 cols
.add(new KeyValue(row
, row
, row
, timestamp
, row
));
159 log
.append(hri
, getWalKey(hri
.getEncodedNameAsBytes(), htd
.getTableName(), timestamp
, scopes
),
166 * used by TestDefaultWALProviderWithHLogKey
169 WALKeyImpl
getWalKey(final byte[] info
, final TableName tableName
, final long timestamp
,
170 NavigableMap
<byte[], Integer
> scopes
) {
171 return new WALKeyImpl(info
, tableName
, timestamp
, mvcc
, scopes
);
175 * helper method to simulate region flush for a WAL.
177 * @param regionEncodedName
179 protected void flushRegion(WAL wal
, byte[] regionEncodedName
, Set
<byte[]> flushedFamilyNames
) {
180 wal
.startCacheFlush(regionEncodedName
, flushedFamilyNames
);
181 wal
.completeCacheFlush(regionEncodedName
);
185 public void testLogCleaning() throws Exception
{
186 LOG
.info(currentTest
.getMethodName());
187 TableDescriptor htd
=
188 TableDescriptorBuilder
.newBuilder(TableName
.valueOf(currentTest
.getMethodName()))
189 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of("row")).build();
190 TableDescriptor htd2
=
191 TableDescriptorBuilder
.newBuilder(TableName
.valueOf(currentTest
.getMethodName() + "2"))
192 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of("row")).build();
193 NavigableMap
<byte[], Integer
> scopes1
= new TreeMap
<>(Bytes
.BYTES_COMPARATOR
);
194 for (byte[] fam
: htd
.getColumnFamilyNames()) {
197 NavigableMap
<byte[], Integer
> scopes2
= new TreeMap
<>(Bytes
.BYTES_COMPARATOR
);
198 for (byte[] fam
: htd2
.getColumnFamilyNames()) {
201 Configuration localConf
= new Configuration(conf
);
202 localConf
.set(WALFactory
.WAL_PROVIDER
, FSHLogProvider
.class.getName());
203 WALFactory wals
= new WALFactory(localConf
, currentTest
.getMethodName());
205 RegionInfo hri
= RegionInfoBuilder
.newBuilder(htd
.getTableName()).build();
206 RegionInfo hri2
= RegionInfoBuilder
.newBuilder(htd2
.getTableName()).build();
207 // we want to mix edits from regions, so pick our own identifier.
208 WAL log
= wals
.getWAL(null);
210 // Add a single edit and make sure that rolling won't remove the file
211 // Before HBASE-3198 it used to delete it
212 addEdits(log
, hri
, htd
, 1, scopes1
);
214 assertEquals(1, AbstractFSWALProvider
.getNumRolledLogFiles(log
));
216 // See if there's anything wrong with more than 1 edit
217 addEdits(log
, hri
, htd
, 2, scopes1
);
219 assertEquals(2, FSHLogProvider
.getNumRolledLogFiles(log
));
221 // Now mix edits from 2 regions, still no flushing
222 addEdits(log
, hri
, htd
, 1, scopes1
);
223 addEdits(log
, hri2
, htd2
, 1, scopes2
);
224 addEdits(log
, hri
, htd
, 1, scopes1
);
225 addEdits(log
, hri2
, htd2
, 1, scopes2
);
227 assertEquals(3, AbstractFSWALProvider
.getNumRolledLogFiles(log
));
229 // Flush the first region, we expect to see the first two files getting
230 // archived. We need to append something or writer won't be rolled.
231 addEdits(log
, hri2
, htd2
, 1, scopes2
);
232 log
.startCacheFlush(hri
.getEncodedNameAsBytes(), htd
.getColumnFamilyNames());
233 log
.completeCacheFlush(hri
.getEncodedNameAsBytes());
235 int count
= AbstractFSWALProvider
.getNumRolledLogFiles(log
);
236 assertEquals(2, count
);
238 // Flush the second region, which removes all the remaining output files
239 // since the oldest was completely flushed and the two others only contain
241 addEdits(log
, hri2
, htd2
, 1, scopes2
);
242 log
.startCacheFlush(hri2
.getEncodedNameAsBytes(), htd2
.getColumnFamilyNames());
243 log
.completeCacheFlush(hri2
.getEncodedNameAsBytes());
245 assertEquals(0, AbstractFSWALProvider
.getNumRolledLogFiles(log
));
254 * Tests wal archiving by adding data, doing flushing/rolling and checking we archive old logs
255 * and also don't archive "live logs" (that is, a log with un-flushed entries).
257 * This is what it does:
258 * It creates two regions, and does a series of inserts along with log rolling.
259 * Whenever a WAL is rolled, HLogBase checks previous wals for archiving. A wal is eligible for
260 * archiving if for all the regions which have entries in that wal file, have flushed - past
261 * their maximum sequence id in that wal file.
263 * @throws IOException
266 public void testWALArchiving() throws IOException
{
267 LOG
.debug(currentTest
.getMethodName());
268 TableDescriptor table1
=
269 TableDescriptorBuilder
.newBuilder(TableName
.valueOf(currentTest
.getMethodName() + "1"))
270 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of("row")).build();
271 TableDescriptor table2
=
272 TableDescriptorBuilder
.newBuilder(TableName
.valueOf(currentTest
.getMethodName() + "2"))
273 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of("row")).build();
274 NavigableMap
<byte[], Integer
> scopes1
= new TreeMap
<>(Bytes
.BYTES_COMPARATOR
);
275 for (byte[] fam
: table1
.getColumnFamilyNames()) {
278 NavigableMap
<byte[], Integer
> scopes2
= new TreeMap
<>(Bytes
.BYTES_COMPARATOR
);
279 for (byte[] fam
: table2
.getColumnFamilyNames()) {
282 Configuration localConf
= new Configuration(conf
);
283 localConf
.set(WALFactory
.WAL_PROVIDER
, FSHLogProvider
.class.getName());
284 WALFactory wals
= new WALFactory(localConf
, currentTest
.getMethodName());
286 WAL wal
= wals
.getWAL(null);
287 assertEquals(0, AbstractFSWALProvider
.getNumRolledLogFiles(wal
));
288 RegionInfo hri1
= RegionInfoBuilder
.newBuilder(table1
.getTableName()).build();
289 RegionInfo hri2
= RegionInfoBuilder
.newBuilder(table2
.getTableName()).build();
290 // variables to mock region sequenceIds.
291 // start with the testing logic: insert a waledit, and roll writer
292 addEdits(wal
, hri1
, table1
, 1, scopes1
);
294 // assert that the wal is rolled
295 assertEquals(1, AbstractFSWALProvider
.getNumRolledLogFiles(wal
));
296 // add edits in the second wal file, and roll writer.
297 addEdits(wal
, hri1
, table1
, 1, scopes1
);
299 // assert that the wal is rolled
300 assertEquals(2, AbstractFSWALProvider
.getNumRolledLogFiles(wal
));
301 // add a waledit to table1, and flush the region.
302 addEdits(wal
, hri1
, table1
, 3, scopes1
);
303 flushRegion(wal
, hri1
.getEncodedNameAsBytes(), table1
.getColumnFamilyNames());
304 // roll log; all old logs should be archived.
306 assertEquals(0, AbstractFSWALProvider
.getNumRolledLogFiles(wal
));
307 // add an edit to table2, and roll writer
308 addEdits(wal
, hri2
, table2
, 1, scopes2
);
310 assertEquals(1, AbstractFSWALProvider
.getNumRolledLogFiles(wal
));
311 // add edits for table1, and roll writer
312 addEdits(wal
, hri1
, table1
, 2, scopes1
);
314 assertEquals(2, AbstractFSWALProvider
.getNumRolledLogFiles(wal
));
315 // add edits for table2, and flush hri1.
316 addEdits(wal
, hri2
, table2
, 2, scopes2
);
317 flushRegion(wal
, hri1
.getEncodedNameAsBytes(), table2
.getColumnFamilyNames());
318 // the log : region-sequenceId map is
319 // log1: region2 (unflushed)
320 // log2: region1 (flushed)
321 // log3: region2 (unflushed)
322 // roll the writer; log2 should be archived.
324 assertEquals(2, AbstractFSWALProvider
.getNumRolledLogFiles(wal
));
325 // flush region2, and all logs should be archived.
326 addEdits(wal
, hri2
, table2
, 2, scopes2
);
327 flushRegion(wal
, hri2
.getEncodedNameAsBytes(), table2
.getColumnFamilyNames());
329 assertEquals(0, AbstractFSWALProvider
.getNumRolledLogFiles(wal
));
338 * Write to a log file with three concurrent threads and verifying all data is written.
342 public void testConcurrentWrites() throws Exception
{
343 // Run the WPE tool with three threads writing 3000 edits each concurrently.
344 // When done, verify that all edits were written.
345 int errCode
= WALPerformanceEvaluation
.
346 innerMain(new Configuration(TEST_UTIL
.getConfiguration()),
347 new String
[] {"-threads", "3", "-verify", "-noclosefs", "-iterations", "3000"});
348 assertEquals(0, errCode
);
352 * Ensure that we can use Set.add to deduplicate WALs
355 public void setMembershipDedups() throws IOException
{
356 Configuration localConf
= new Configuration(conf
);
357 localConf
.set(WALFactory
.WAL_PROVIDER
, FSHLogProvider
.class.getName());
358 WALFactory wals
= new WALFactory(localConf
, currentTest
.getMethodName());
360 final Set
<WAL
> seen
= new HashSet
<>(1);
361 assertTrue("first attempt to add WAL from default provider should work.",
362 seen
.add(wals
.getWAL(null)));
363 for (int i
= 0; i
< 1000; i
++) {
365 "default wal provider is only supposed to return a single wal, which should " +
366 "compare as .equals itself.",
367 seen
.add(wals
.getWAL(RegionInfoBuilder
368 .newBuilder(TableName
.valueOf("Table-" + ThreadLocalRandom
.current().nextInt()))