/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Test for HBASE-17471.
 * <p>
 * MVCCPreAssign was added by HBASE-16698, but the pre-assigned mvcc is only used in the put/delete
 * path. Other write paths like increment/append still assign mvcc in the ringbuffer's consumer
 * thread. If puts and increments run in parallel, the seqids in the WAL may not increase
 * monotonically, and such disorder in the WAL will lead to data loss.
 * <p>
 * This test uses two threads to put and increment at the same time in a single region, then checks
 * the seqids in the WAL. If the seqids in the WAL are not monotonically increasing, the test fails.
 */
@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, SmallTests.class })
public class TestWALMonotonicallyIncreasingSeqId {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestWALMonotonicallyIncreasingSeqId.class);
  private final Logger LOG = LoggerFactory.getLogger(getClass());
  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static Path testDir = TEST_UTIL.getDataTestDir("TestWALMonotonicallyIncreasingSeqId");
  private WALFactory wals;
  private FileSystem fileSystem;
  private Configuration walConf;
  private HRegion region;

  @Parameter
  public String walProvider;

  @Rule
  public TestName name = new TestName();
  @Parameters(name = "{index}: wal={0}")
  public static List<Object[]> data() {
    return Arrays.asList(new Object[] { "asyncfs" }, new Object[] { "filesystem" });
  }
  private TableDescriptor getTableDesc(TableName tableName, byte[]... families) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    Arrays.stream(families)
      .map(f -> ColumnFamilyDescriptorBuilder.newBuilder(f).setMaxVersions(Integer.MAX_VALUE)
        .build())
      .forEachOrdered(builder::setColumnFamily);
    return builder.build();
  }
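
  // initHRegion below opens a standalone region for the test: it points the WAL root directory at
  // the table directory and creates a dedicated WALFactory, so no mini-cluster is needed. Note
  // that "hbase.hregion.mvcc.preassign" is switched off so that increments assign mvcc/seqid in
  // the ring buffer consumer thread, which is the code path HBASE-17471 fixes.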
  private HRegion initHRegion(TableDescriptor htd, byte[] startKey, byte[] stopKey, int replicaId)
    throws IOException {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.set("hbase.wal.provider", walProvider);
    conf.setBoolean("hbase.hregion.mvcc.preassign", false);
    Path tableDir = CommonFSUtils.getTableDir(testDir, htd.getTableName());

    RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKey)
      .setEndKey(stopKey).setReplicaId(replicaId).setRegionId(0).build();
    fileSystem = tableDir.getFileSystem(conf);
    final Configuration walConf = new Configuration(conf);
    CommonFSUtils.setRootDir(walConf, tableDir);
    this.walConf = walConf;
    wals = new WALFactory(walConf, "log_" + replicaId);
    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
    HRegion region = HRegion.createHRegion(info, TEST_UTIL.getDefaultRootDirPath(), conf, htd,
      wals.getWAL(info));
    return region;
  }
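
  // The latch coordinates the two writer threads: the put thread blocks until the increment
  // thread has applied its first increment, so that puts and increments are in flight at the
  // same time and the two write paths interleave inside the WAL ring buffer.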
  CountDownLatch latch = new CountDownLatch(1);
  public class PutThread extends Thread {
    HRegion region;

    public PutThread(HRegion region) {
      this.region = region;
    }

    @Override
    public void run() {
      try {
        for (int i = 0; i < 100; i++) {
          byte[] row = Bytes.toBytes("putRow" + i);
          Put put = new Put(row);
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(0), new byte[0]);
          // Wait until the increment thread has applied its first increment.
          latch.await();
          region.batchMutate(new Mutation[] { put });
          Thread.sleep(10);
        }
      } catch (Throwable t) {
        LOG.warn("Error happened when Put: ", t);
      }
    }
  }
  public class IncThread extends Thread {
    HRegion region;

    public IncThread(HRegion region) {
      this.region = region;
    }

    @Override
    public void run() {
      try {
        for (int i = 0; i < 100; i++) {
          byte[] row = Bytes.toBytes("incrementRow" + i);
          Increment inc = new Increment(row);
          inc.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(0), 1);
          // inc.setDurability(Durability.ASYNC_WAL);
          region.increment(inc);
          // Release the put thread once the first increment has gone through.
          latch.countDown();
          Thread.sleep(10);
        }
      } catch (Throwable t) {
        LOG.warn("Error happened when Increment: ", t);
      }
    }
  }
  @Before
  public void setUp() throws IOException {
    byte[][] families = new byte[][] { Bytes.toBytes("cf") };
    TableDescriptor htd = getTableDesc(
      TableName.valueOf(name.getMethodName().replaceAll("[^0-9A-Za-z_]", "_")), families);
    region = initHRegion(htd, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 0);
  }
  @After
  public void tearDown() throws IOException {
    if (region != null) {
      region.close();
    }
  }
  @AfterClass
  public static void tearDownAfterClass() throws IOException {
    TEST_UTIL.cleanupTestDir();
  }
  private WAL.Reader createReader(Path logPath, Path oldWalsDir) throws IOException {
    try {
      return wals.createReader(fileSystem, logPath);
    } catch (IOException e) {
      // The WAL may have been rolled and archived already; retry from the oldWALs directory.
      return wals.createReader(fileSystem, new Path(oldWalsDir, logPath.getName()));
    }
  }
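
  // The test below starts the writer threads, rolls the WAL so that the file under inspection is
  // complete, and then replays it entry by entry, failing if the sequence id of any data edit is
  // not larger than the maximum seen so far.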
  @Test
  public void testWALMonotonicallyIncreasingSeqId() throws Exception {
    List<Thread> putThreads = new ArrayList<>();
    for (int i = 0; i < 1; i++) {
      putThreads.add(new PutThread(region));
    }
    IncThread incThread = new IncThread(region);
    for (int i = 0; i < 1; i++) {
      putThreads.get(i).start();
    }
    incThread.start();
    incThread.join();

    Path logPath = ((AbstractFSWAL<?>) region.getWAL()).getCurrentFileName();
    region.getWAL().rollWriter();
    Thread.sleep(10);
    Path hbaseDir = new Path(walConf.get(HConstants.HBASE_DIR));
    Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
    try (WAL.Reader reader = createReader(logPath, oldWalsDir)) {
      long currentMaxSeqid = 0;
      for (WAL.Entry e; (e = reader.next()) != null;) {
        // Skip meta edits; only data edits must have monotonically increasing seqids.
        if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
          long currentSeqid = e.getKey().getSequenceId();
          if (currentSeqid > currentMaxSeqid) {
            currentMaxSeqid = currentSeqid;
          } else {
            fail("Current max Seqid is " + currentMaxSeqid
              + ", but the next seqid in wal is smaller: " + currentSeqid);
          }
        }
      }
    }
  }
}