2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.junit
.Assert
.assertEquals
;
21 import static org
.junit
.Assert
.assertFalse
;
22 import static org
.junit
.Assert
.assertTrue
;
24 import java
.util
.Arrays
;
25 import java
.util
.List
;
26 import java
.util
.concurrent
.TimeUnit
;
27 import java
.util
.stream
.Collectors
;
28 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
29 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
30 import org
.apache
.hadoop
.hbase
.TableName
;
31 import org
.apache
.hadoop
.hbase
.regionserver
.HRegion
;
32 import org
.apache
.hadoop
.hbase
.regionserver
.HRegionServer
;
33 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
34 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
35 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
36 import org
.apache
.hadoop
.hbase
.util
.JVMClusterUtil
;
37 import org
.apache
.hadoop
.io
.IOUtils
;
38 import org
.junit
.After
;
39 import org
.junit
.AfterClass
;
40 import org
.junit
.Before
;
41 import org
.junit
.BeforeClass
;
42 import org
.junit
.ClassRule
;
43 import org
.junit
.Rule
;
44 import org
.junit
.Test
;
45 import org
.junit
.experimental
.categories
.Category
;
46 import org
.junit
.rules
.TestName
;
48 import org
.slf4j
.Logger
;
49 import org
.slf4j
.LoggerFactory
;
51 @Category({MediumTests
.class, ClientTests
.class})
52 public class TestFlushFromClient
{
55 public static final HBaseClassTestRule CLASS_RULE
=
56 HBaseClassTestRule
.forClass(TestFlushFromClient
.class);
58 private static final Logger LOG
= LoggerFactory
.getLogger(TestFlushFromClient
.class);
59 private final static HBaseTestingUtil TEST_UTIL
= new HBaseTestingUtil();
60 private static AsyncConnection asyncConn
;
61 private static final byte[][] SPLITS
= new byte[][]{Bytes
.toBytes("3"), Bytes
.toBytes("7")};
62 private static final List
<byte[]> ROWS
= Arrays
.asList(
66 private static final byte[] FAMILY_1
= Bytes
.toBytes("f1");
67 private static final byte[] FAMILY_2
= Bytes
.toBytes("f2");
68 public static final byte[][] FAMILIES
= {FAMILY_1
, FAMILY_2
};
70 public TestName name
= new TestName();
72 public TableName tableName
;
75 public static void setUpBeforeClass() throws Exception
{
76 TEST_UTIL
.startMiniCluster(ROWS
.size());
77 asyncConn
= ConnectionFactory
.createAsyncConnection(TEST_UTIL
.getConfiguration()).get();
81 public static void tearDownAfterClass() throws Exception
{
82 IOUtils
.cleanup(null, asyncConn
);
83 TEST_UTIL
.shutdownMiniCluster();
/**
 * Per-test fixture. Derives {@code tableName} from the current test method's name, creates that
 * table with {@code FAMILIES} and {@code SPLITS}, builds one {@code Put} per entry of
 * {@code ROWS}, and finally asserts that the table has regions and that every one of them
 * reports a non-zero memstore data size.
 *
 * @throws Exception if the table cannot be created
 */
87 public void setUp() throws Exception
{
88 tableName
= TableName
.valueOf(name
.getMethodName());
89 try (Table t
= TEST_UTIL
.createTable(tableName
, FAMILIES
, SPLITS
)) {
// One Put per row key in ROWS.
90 List
<Put
> puts
= ROWS
.stream().map(r
-> new Put(r
)).collect(Collectors
.toList());
// 20 iterations; the byte encoding of i is passed as both the second and third argument
// (qualifier and value) of addColumn for each of the two families.
91 for (int i
= 0; i
!= 20; ++i
) {
92 byte[] value
= Bytes
.toBytes(i
);
// NOTE(review): 'p' is not declared in the lines visible here - presumably each Put in
// 'puts'; confirm against the full file.
94 p
.addColumn(FAMILY_1
, value
, value
);
95 p
.addColumn(FAMILY_2
, value
, value
);
// NOTE(review): no write of 'puts' to table 't' is visible here; the checks below presume
// the rows were written - confirm against the full file.
// Sanity checks: the table must have regions, and each region must hold unflushed data.
100 assertFalse(getRegionInfo().isEmpty());
101 assertTrue(getRegionInfo().stream().allMatch(r
-> r
.getMemStoreDataSize() != 0));
105 public void tearDown() throws Exception
{
106 for (TableDescriptor htd
: TEST_UTIL
.getAdmin().listTableDescriptors()) {
107 LOG
.info("Tear down, remove table=" + htd
.getTableName());
108 TEST_UTIL
.deleteTable(htd
.getTableName());
113 public void testFlushTable() throws Exception
{
114 try (Admin admin
= TEST_UTIL
.getAdmin()) {
115 admin
.flush(tableName
);
116 assertFalse(getRegionInfo().stream().anyMatch(r
-> r
.getMemStoreDataSize() != 0));
121 public void testFlushTableFamily() throws Exception
{
122 try (Admin admin
= TEST_UTIL
.getAdmin()) {
123 long sizeBeforeFlush
= getRegionInfo().get(0).getMemStoreDataSize();
124 admin
.flush(tableName
, FAMILY_1
);
125 assertFalse(getRegionInfo().stream().
126 anyMatch(r
-> r
.getMemStoreDataSize() != sizeBeforeFlush
/ 2));
131 public void testAsyncFlushTable() throws Exception
{
132 AsyncAdmin admin
= asyncConn
.getAdmin();
133 admin
.flush(tableName
).get();
134 assertFalse(getRegionInfo().stream().anyMatch(r
-> r
.getMemStoreDataSize() != 0));
138 public void testAsyncFlushTableFamily() throws Exception
{
139 AsyncAdmin admin
= asyncConn
.getAdmin();
140 long sizeBeforeFlush
= getRegionInfo().get(0).getMemStoreDataSize();
141 admin
.flush(tableName
, FAMILY_1
).get();
142 assertFalse(getRegionInfo().stream().
143 anyMatch(r
-> r
.getMemStoreDataSize() != sizeBeforeFlush
/ 2));
147 public void testFlushRegion() throws Exception
{
148 try (Admin admin
= TEST_UTIL
.getAdmin()) {
149 for (HRegion r
: getRegionInfo()) {
150 admin
.flushRegion(r
.getRegionInfo().getRegionName());
151 TimeUnit
.SECONDS
.sleep(1);
152 assertEquals(0, r
.getMemStoreDataSize());
158 public void testFlushRegionFamily() throws Exception
{
159 try (Admin admin
= TEST_UTIL
.getAdmin()) {
160 for (HRegion r
: getRegionInfo()) {
161 long sizeBeforeFlush
= r
.getMemStoreDataSize();
162 admin
.flushRegion(r
.getRegionInfo().getRegionName(), FAMILY_1
);
163 TimeUnit
.SECONDS
.sleep(1);
164 assertEquals(sizeBeforeFlush
/ 2, r
.getMemStoreDataSize());
170 public void testAsyncFlushRegion() throws Exception
{
171 AsyncAdmin admin
= asyncConn
.getAdmin();
172 for (HRegion r
: getRegionInfo()) {
173 admin
.flushRegion(r
.getRegionInfo().getRegionName()).get();
174 TimeUnit
.SECONDS
.sleep(1);
175 assertEquals(0, r
.getMemStoreDataSize());
180 public void testAsyncFlushRegionFamily() throws Exception
{
181 AsyncAdmin admin
= asyncConn
.getAdmin();
182 for (HRegion r
: getRegionInfo()) {
183 long sizeBeforeFlush
= r
.getMemStoreDataSize();
184 admin
.flushRegion(r
.getRegionInfo().getRegionName(), FAMILY_1
).get();
185 TimeUnit
.SECONDS
.sleep(1);
186 assertEquals(sizeBeforeFlush
/ 2, r
.getMemStoreDataSize());
191 public void testFlushRegionServer() throws Exception
{
192 try (Admin admin
= TEST_UTIL
.getAdmin()) {
193 for (HRegionServer rs
: TEST_UTIL
.getHBaseCluster()
194 .getLiveRegionServerThreads()
195 .stream().map(JVMClusterUtil
.RegionServerThread
::getRegionServer
)
196 .collect(Collectors
.toList())) {
197 admin
.flushRegionServer(rs
.getServerName());
198 assertFalse(getRegionInfo(rs
).stream().anyMatch(r
-> r
.getMemStoreDataSize() != 0));
204 public void testAsyncFlushRegionServer() throws Exception
{
205 AsyncAdmin admin
= asyncConn
.getAdmin();
206 for (HRegionServer rs
: TEST_UTIL
.getHBaseCluster()
207 .getLiveRegionServerThreads()
208 .stream().map(JVMClusterUtil
.RegionServerThread
::getRegionServer
)
209 .collect(Collectors
.toList())) {
210 admin
.flushRegionServer(rs
.getServerName()).get();
211 assertFalse(getRegionInfo(rs
).stream().anyMatch(r
-> r
.getMemStoreDataSize() != 0));
215 private List
<HRegion
> getRegionInfo() {
216 return TEST_UTIL
.getHBaseCluster().getRegions(tableName
);
219 private List
<HRegion
> getRegionInfo(HRegionServer rs
) {
220 return rs
.getRegions().stream()
221 .filter(v
-> v
.getTableDescriptor().getTableName().equals(tableName
))
222 .collect(Collectors
.toList());