2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.junit
.Assert
.assertArrayEquals
;
21 import static org
.junit
.Assert
.assertEquals
;
22 import static org
.junit
.Assert
.assertNull
;
23 import java
.io
.IOException
;
24 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
25 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
26 import org
.apache
.hadoop
.hbase
.TableName
;
27 import org
.apache
.hadoop
.hbase
.regionserver
.HRegionServer
;
28 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
29 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
30 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
31 import org
.junit
.AfterClass
;
32 import org
.junit
.Before
;
33 import org
.junit
.BeforeClass
;
34 import org
.junit
.ClassRule
;
35 import org
.junit
.Rule
;
36 import org
.junit
.Test
;
37 import org
.junit
.experimental
.categories
.Category
;
38 import org
.junit
.rules
.TestName
;
40 @Category({ MediumTests
.class, ClientTests
.class })
41 public class TestMvccConsistentScanner
{
44 public static final HBaseClassTestRule CLASS_RULE
=
45 HBaseClassTestRule
.forClass(TestMvccConsistentScanner
.class);
47 private static final HBaseTestingUtil UTIL
= new HBaseTestingUtil();
49 private static Connection CONN
;
51 private static final byte[] CF
= Bytes
.toBytes("cf");
53 private static final byte[] CQ1
= Bytes
.toBytes("cq1");
55 private static final byte[] CQ2
= Bytes
.toBytes("cq2");
57 private static final byte[] CQ3
= Bytes
.toBytes("cq3");
59 public TestName testName
= new TestName();
61 private TableName tableName
;
64 public static void setUpBeforeClass() throws Exception
{
65 UTIL
.startMiniCluster(2);
66 CONN
= ConnectionFactory
.createConnection(UTIL
.getConfiguration());
70 public static void tearDownAfterClass() throws Exception
{
72 UTIL
.shutdownMiniCluster();
76 public void setUp() throws IOException
, InterruptedException
{
77 tableName
= TableName
.valueOf(testName
.getMethodName().replaceAll("[^0-9a-zA-Z]", "_"));
78 UTIL
.createTable(tableName
, CF
);
79 UTIL
.waitTableAvailable(tableName
);
82 private void put(byte[] row
, byte[] cq
, byte[] value
) throws IOException
{
83 try (Table table
= CONN
.getTable(tableName
)) {
84 table
.put(new Put(row
).addColumn(CF
, cq
, value
));
88 private void move() throws IOException
, InterruptedException
{
90 UTIL
.getHBaseCluster().getRegions(tableName
).stream().findAny().get().getRegionInfo();
92 UTIL
.getHBaseCluster().getRegionServerThreads().stream().map(t
-> t
.getRegionServer())
93 .filter(r
-> !r
.getOnlineTables().contains(tableName
)).findAny().get();
94 UTIL
.getAdmin().move(region
.getEncodedNameAsBytes(), rs
.getServerName());
95 while (UTIL
.getRSForFirstRegionInTable(tableName
) != rs
) {
101 public void testRowAtomic() throws IOException
, InterruptedException
{
102 byte[] row
= Bytes
.toBytes("row");
103 put(row
, CQ1
, Bytes
.toBytes(1));
104 put(row
, CQ2
, Bytes
.toBytes(2));
105 try (Table table
= CONN
.getTable(tableName
);
106 ResultScanner scanner
= table
.getScanner(new Scan().setBatch(1).setCaching(1))) {
107 Result result
= scanner
.next();
108 assertEquals(1, result
.rawCells().length
);
109 assertEquals(1, Bytes
.toInt(result
.getValue(CF
, CQ1
)));
111 put(row
, CQ3
, Bytes
.toBytes(3));
112 result
= scanner
.next();
113 assertEquals(1, result
.rawCells().length
);
114 assertEquals(2, Bytes
.toInt(result
.getValue(CF
, CQ2
)));
115 assertNull(scanner
.next());
120 public void testCrossRowAtomicInRegion() throws IOException
, InterruptedException
{
121 put(Bytes
.toBytes("row1"), CQ1
, Bytes
.toBytes(1));
122 put(Bytes
.toBytes("row2"), CQ1
, Bytes
.toBytes(2));
123 try (Table table
= CONN
.getTable(tableName
);
124 ResultScanner scanner
= table
.getScanner(new Scan().setCaching(1))) {
125 Result result
= scanner
.next();
126 assertArrayEquals(Bytes
.toBytes("row1"), result
.getRow());
127 assertEquals(1, Bytes
.toInt(result
.getValue(CF
, CQ1
)));
129 put(Bytes
.toBytes("row3"), CQ1
, Bytes
.toBytes(3));
130 result
= scanner
.next();
131 assertArrayEquals(Bytes
.toBytes("row2"), result
.getRow());
132 assertEquals(2, Bytes
.toInt(result
.getValue(CF
, CQ1
)));
133 assertNull(scanner
.next());