/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.junit
.Assert
.assertArrayEquals
;
21 import static org
.junit
.Assert
.assertEquals
;
22 import static org
.junit
.Assert
.assertNull
;
24 import java
.io
.IOException
;
25 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
26 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
27 import org
.apache
.hadoop
.hbase
.TableName
;
28 import org
.apache
.hadoop
.hbase
.regionserver
.HRegionServer
;
29 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
30 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
31 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
32 import org
.junit
.AfterClass
;
33 import org
.junit
.Before
;
34 import org
.junit
.BeforeClass
;
35 import org
.junit
.ClassRule
;
36 import org
.junit
.Rule
;
37 import org
.junit
.Test
;
38 import org
.junit
.experimental
.categories
.Category
;
39 import org
.junit
.rules
.TestName
;
41 @Category({ LargeTests
.class, ClientTests
.class })
42 public class TestMvccConsistentScanner
{
45 public static final HBaseClassTestRule CLASS_RULE
=
46 HBaseClassTestRule
.forClass(TestMvccConsistentScanner
.class);
48 private static final HBaseTestingUtility UTIL
= new HBaseTestingUtility();
50 private static Connection CONN
;
52 private static final byte[] CF
= Bytes
.toBytes("cf");
54 private static final byte[] CQ1
= Bytes
.toBytes("cq1");
56 private static final byte[] CQ2
= Bytes
.toBytes("cq2");
58 private static final byte[] CQ3
= Bytes
.toBytes("cq3");
60 public TestName testName
= new TestName();
62 private TableName tableName
;
65 public static void setUpBeforeClass() throws Exception
{
66 UTIL
.startMiniCluster(2);
67 CONN
= ConnectionFactory
.createConnection(UTIL
.getConfiguration());
71 public static void tearDownAfterClass() throws Exception
{
73 UTIL
.shutdownMiniCluster();
77 public void setUp() throws IOException
, InterruptedException
{
78 tableName
= TableName
.valueOf(testName
.getMethodName().replaceAll("[^0-9a-zA-Z]", "_"));
79 UTIL
.createTable(tableName
, CF
);
80 UTIL
.waitTableAvailable(tableName
);
83 private void put(byte[] row
, byte[] cq
, byte[] value
) throws IOException
{
84 try (Table table
= CONN
.getTable(tableName
)) {
85 table
.put(new Put(row
).addColumn(CF
, cq
, value
));
89 private void move() throws IOException
, InterruptedException
{
91 UTIL
.getHBaseCluster().getRegions(tableName
).stream().findAny().get().getRegionInfo();
93 UTIL
.getHBaseCluster().getRegionServerThreads().stream().map(t
-> t
.getRegionServer())
94 .filter(r
-> !r
.getOnlineTables().contains(tableName
)).findAny().get();
95 UTIL
.getAdmin().move(region
.getEncodedNameAsBytes(),
96 Bytes
.toBytes(rs
.getServerName().getServerName()));
97 while (UTIL
.getRSForFirstRegionInTable(tableName
) != rs
) {
103 public void testRowAtomic() throws IOException
, InterruptedException
{
104 byte[] row
= Bytes
.toBytes("row");
105 put(row
, CQ1
, Bytes
.toBytes(1));
106 put(row
, CQ2
, Bytes
.toBytes(2));
107 try (Table table
= CONN
.getTable(tableName
);
108 ResultScanner scanner
= table
.getScanner(new Scan().setBatch(1).setCaching(1))) {
109 Result result
= scanner
.next();
110 assertEquals(1, result
.rawCells().length
);
111 assertEquals(1, Bytes
.toInt(result
.getValue(CF
, CQ1
)));
113 put(row
, CQ3
, Bytes
.toBytes(3));
114 result
= scanner
.next();
115 assertEquals(1, result
.rawCells().length
);
116 assertEquals(2, Bytes
.toInt(result
.getValue(CF
, CQ2
)));
117 assertNull(scanner
.next());
122 public void testCrossRowAtomicInRegion() throws IOException
, InterruptedException
{
123 put(Bytes
.toBytes("row1"), CQ1
, Bytes
.toBytes(1));
124 put(Bytes
.toBytes("row2"), CQ1
, Bytes
.toBytes(2));
125 try (Table table
= CONN
.getTable(tableName
);
126 ResultScanner scanner
= table
.getScanner(new Scan().setCaching(1))) {
127 Result result
= scanner
.next();
128 assertArrayEquals(Bytes
.toBytes("row1"), result
.getRow());
129 assertEquals(1, Bytes
.toInt(result
.getValue(CF
, CQ1
)));
131 put(Bytes
.toBytes("row3"), CQ1
, Bytes
.toBytes(3));
132 result
= scanner
.next();
133 assertArrayEquals(Bytes
.toBytes("row2"), result
.getRow());
134 assertEquals(2, Bytes
.toInt(result
.getValue(CF
, CQ1
)));
135 assertNull(scanner
.next());