/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.regionserver
;
20 import static org
.apache
.hadoop
.hbase
.regionserver
.TestRegionServerNoMaster
.closeRegion
;
21 import static org
.apache
.hadoop
.hbase
.regionserver
.TestRegionServerNoMaster
.openRegion
;
23 import java
.io
.IOException
;
24 import java
.util
.List
;
25 import java
.util
.Random
;
26 import java
.util
.concurrent
.ExecutorService
;
27 import java
.util
.concurrent
.Executors
;
28 import java
.util
.concurrent
.TimeUnit
;
29 import java
.util
.concurrent
.atomic
.AtomicBoolean
;
30 import java
.util
.concurrent
.atomic
.AtomicReference
;
31 import org
.apache
.hadoop
.hbase
.Cell
;
32 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
33 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
34 import org
.apache
.hadoop
.hbase
.HConstants
;
35 import org
.apache
.hadoop
.hbase
.TableName
;
36 import org
.apache
.hadoop
.hbase
.TestMetaTableAccessor
;
37 import org
.apache
.hadoop
.hbase
.client
.Consistency
;
38 import org
.apache
.hadoop
.hbase
.client
.Get
;
39 import org
.apache
.hadoop
.hbase
.client
.Put
;
40 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
41 import org
.apache
.hadoop
.hbase
.client
.RegionLocator
;
42 import org
.apache
.hadoop
.hbase
.client
.RegionReplicaUtil
;
43 import org
.apache
.hadoop
.hbase
.client
.Result
;
44 import org
.apache
.hadoop
.hbase
.client
.Table
;
45 import org
.apache
.hadoop
.hbase
.io
.hfile
.HFileScanner
;
46 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
47 import org
.apache
.hadoop
.hbase
.testclassification
.RegionServerTests
;
48 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
49 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
50 import org
.apache
.hadoop
.hbase
.util
.JVMClusterUtil
.RegionServerThread
;
51 import org
.apache
.hadoop
.hbase
.util
.Threads
;
52 import org
.apache
.hadoop
.hdfs
.DFSConfigKeys
;
53 import org
.apache
.hadoop
.util
.StringUtils
;
54 import org
.junit
.AfterClass
;
55 import org
.junit
.Assert
;
56 import org
.junit
.BeforeClass
;
57 import org
.junit
.ClassRule
;
58 import org
.junit
.Test
;
59 import org
.junit
.experimental
.categories
.Category
;
60 import org
.slf4j
.Logger
;
61 import org
.slf4j
.LoggerFactory
;
63 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.ServiceException
;
65 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ProtobufUtil
;
66 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.RequestConverter
;
67 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
;
/**
 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole
 * cluster. See {@link TestRegionServerNoMaster}.
 */
73 @Category({RegionServerTests
.class, LargeTests
.class})
74 public class TestRegionReplicas
{
77 public static final HBaseClassTestRule CLASS_RULE
=
78 HBaseClassTestRule
.forClass(TestRegionReplicas
.class);
80 private static final Logger LOG
= LoggerFactory
.getLogger(TestRegionReplicas
.class);
82 private static final int NB_SERVERS
= 1;
83 private static Table table
;
84 private static final byte[] row
= Bytes
.toBytes("TestRegionReplicas");
86 private static RegionInfo hriPrimary
;
87 private static RegionInfo hriSecondary
;
89 private static final HBaseTestingUtil HTU
= new HBaseTestingUtil();
90 private static final byte[] f
= HConstants
.CATALOG_FAMILY
;
93 public static void before() throws Exception
{
94 // Reduce the hdfs block size and prefetch to trigger the file-link reopen
95 // when the file is moved to archive (e.g. compaction)
96 HTU
.getConfiguration().setInt(DFSConfigKeys
.DFS_BLOCK_SIZE_KEY
, 8192);
97 HTU
.getConfiguration().setInt(DFSConfigKeys
.DFS_CLIENT_READ_PREFETCH_SIZE_KEY
, 1);
98 HTU
.getConfiguration().setInt(HConstants
.HREGION_MEMSTORE_FLUSH_SIZE
, 128 * 1024 * 1024);
100 HTU
.startMiniCluster(NB_SERVERS
);
101 final TableName tableName
= TableName
.valueOf(TestRegionReplicas
.class.getSimpleName());
103 // Create table then get the single region for our new table.
104 table
= HTU
.createTable(tableName
, f
);
106 try (RegionLocator locator
= HTU
.getConnection().getRegionLocator(tableName
)) {
107 hriPrimary
= locator
.getRegionLocation(row
, false).getRegion();
110 // mock a secondary region info to open
111 hriSecondary
= RegionReplicaUtil
.getRegionInfoForReplica(hriPrimary
, 1);
114 TestRegionServerNoMaster
.stopMasterAndCacheMetaLocation(HTU
);
118 public static void afterClass() throws Exception
{
119 HRegionServer
.TEST_SKIP_REPORTING_TRANSITION
= false;
121 HTU
.shutdownMiniCluster();
124 private HRegionServer
getRS() {
125 return HTU
.getMiniHBaseCluster().getRegionServer(0);
129 public void testOpenRegionReplica() throws Exception
{
130 openRegion(HTU
, getRS(), hriSecondary
);
132 //load some data to primary
133 HTU
.loadNumericRows(table
, f
, 0, 1000);
135 // assert that we can read back from primary
136 Assert
.assertEquals(1000, HBaseTestingUtil
.countRows(table
));
138 HTU
.deleteNumericRows(table
, f
, 0, 1000);
139 closeRegion(HTU
, getRS(), hriSecondary
);
143 /** Tests that the meta location is saved for secondary regions */
145 public void testRegionReplicaUpdatesMetaLocation() throws Exception
{
146 openRegion(HTU
, getRS(), hriSecondary
);
149 meta
= HTU
.getConnection().getTable(TableName
.META_TABLE_NAME
);
150 TestMetaTableAccessor
.assertMetaLocation(meta
, hriPrimary
.getRegionName()
151 , getRS().getServerName(), -1, 1, false);
156 closeRegion(HTU
, getRS(), hriSecondary
);
161 public void testRegionReplicaGets() throws Exception
{
163 //load some data to primary
164 HTU
.loadNumericRows(table
, f
, 0, 1000);
165 // assert that we can read back from primary
166 Assert
.assertEquals(1000, HBaseTestingUtil
.countRows(table
));
167 // flush so that region replica can read
168 HRegion region
= getRS().getRegionByEncodedName(hriPrimary
.getEncodedName());
171 openRegion(HTU
, getRS(), hriSecondary
);
173 // first try directly against region
174 region
= getRS().getRegion(hriSecondary
.getEncodedName());
175 assertGet(region
, 42, true);
177 assertGetRpc(hriSecondary
, 42, true);
179 HTU
.deleteNumericRows(table
, HConstants
.CATALOG_FAMILY
, 0, 1000);
180 closeRegion(HTU
, getRS(), hriSecondary
);
185 public void testGetOnTargetRegionReplica() throws Exception
{
187 //load some data to primary
188 HTU
.loadNumericRows(table
, f
, 0, 1000);
189 // assert that we can read back from primary
190 Assert
.assertEquals(1000, HBaseTestingUtil
.countRows(table
));
191 // flush so that region replica can read
192 HRegion region
= getRS().getRegionByEncodedName(hriPrimary
.getEncodedName());
195 openRegion(HTU
, getRS(), hriSecondary
);
197 // try directly Get against region replica
198 byte[] row
= Bytes
.toBytes(String
.valueOf(42));
199 Get get
= new Get(row
);
200 get
.setConsistency(Consistency
.TIMELINE
);
202 Result result
= table
.get(get
);
203 Assert
.assertArrayEquals(row
, result
.getValue(f
, null));
205 HTU
.deleteNumericRows(table
, HConstants
.CATALOG_FAMILY
, 0, 1000);
206 closeRegion(HTU
, getRS(), hriSecondary
);
210 private void assertGet(Region region
, int value
, boolean expect
) throws IOException
{
211 byte[] row
= Bytes
.toBytes(String
.valueOf(value
));
212 Get get
= new Get(row
);
213 Result result
= region
.get(get
);
215 Assert
.assertArrayEquals(row
, result
.getValue(f
, null));
222 private void assertGetRpc(RegionInfo info
, int value
, boolean expect
)
223 throws IOException
, org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.ServiceException
{
224 byte[] row
= Bytes
.toBytes(String
.valueOf(value
));
225 Get get
= new Get(row
);
226 ClientProtos
.GetRequest getReq
= RequestConverter
.buildGetRequest(info
.getRegionName(), get
);
227 ClientProtos
.GetResponse getResp
= getRS().getRSRpcServices().get(null, getReq
);
228 Result result
= ProtobufUtil
.toResult(getResp
.getResult());
230 Assert
.assertArrayEquals(row
, result
.getValue(f
, null));
236 private void restartRegionServer() throws Exception
{
242 public void testRefresStoreFiles() throws Exception
{
243 // enable store file refreshing
244 final int refreshPeriod
= 2000; // 2 sec
245 HTU
.getConfiguration().setInt("hbase.hstore.compactionThreshold", 100);
246 HTU
.getConfiguration().setInt(StorefileRefresherChore
.REGIONSERVER_STOREFILE_REFRESH_PERIOD
,
248 // restart the region server so that it starts the refresher chore
249 restartRegionServer();
252 LOG
.info("Opening the secondary region " + hriSecondary
.getEncodedName());
253 openRegion(HTU
, getRS(), hriSecondary
);
255 //load some data to primary
256 LOG
.info("Loading data to primary region");
257 HTU
.loadNumericRows(table
, f
, 0, 1000);
258 // assert that we can read back from primary
259 Assert
.assertEquals(1000, HBaseTestingUtil
.countRows(table
));
260 // flush so that region replica can read
261 LOG
.info("Flushing primary region");
262 HRegion region
= getRS().getRegionByEncodedName(hriPrimary
.getEncodedName());
265 // ensure that chore is run
266 LOG
.info("Sleeping for " + (4 * refreshPeriod
));
267 Threads
.sleep(4 * refreshPeriod
);
269 LOG
.info("Checking results from secondary region replica");
270 Region secondaryRegion
= getRS().getRegion(hriSecondary
.getEncodedName());
271 Assert
.assertEquals(1, secondaryRegion
.getStore(f
).getStorefilesCount());
273 assertGet(secondaryRegion
, 42, true);
274 assertGetRpc(hriSecondary
, 42, true);
275 assertGetRpc(hriSecondary
, 1042, false);
277 //load some data to primary
278 HTU
.loadNumericRows(table
, f
, 1000, 1100);
279 region
= getRS().getRegionByEncodedName(hriPrimary
.getEncodedName());
282 HTU
.loadNumericRows(table
, f
, 2000, 2100);
283 region
= getRS().getRegionByEncodedName(hriPrimary
.getEncodedName());
286 // ensure that chore is run
287 Threads
.sleep(4 * refreshPeriod
);
289 assertGetRpc(hriSecondary
, 42, true);
290 assertGetRpc(hriSecondary
, 1042, true);
291 assertGetRpc(hriSecondary
, 2042, true);
293 // ensure that we see the 3 store files
294 Assert
.assertEquals(3, secondaryRegion
.getStore(f
).getStorefilesCount());
297 HTU
.compact(table
.getName(), true);
299 long wakeUpTime
= EnvironmentEdgeManager
.currentTime() + 4 * refreshPeriod
;
300 while (EnvironmentEdgeManager
.currentTime() < wakeUpTime
) {
301 assertGetRpc(hriSecondary
, 42, true);
302 assertGetRpc(hriSecondary
, 1042, true);
303 assertGetRpc(hriSecondary
, 2042, true);
307 // ensure that we see the compacted file only
308 // This will be 4 until the cleaner chore runs
309 Assert
.assertEquals(4, secondaryRegion
.getStore(f
).getStorefilesCount());
312 HTU
.deleteNumericRows(table
, HConstants
.CATALOG_FAMILY
, 0, 1000);
313 closeRegion(HTU
, getRS(), hriSecondary
);
318 public void testFlushAndCompactionsInPrimary() throws Exception
{
320 long runtime
= 30 * 1000;
321 // enable store file refreshing
322 final int refreshPeriod
= 100; // 100ms refresh is a lot
323 HTU
.getConfiguration().setInt("hbase.hstore.compactionThreshold", 3);
324 HTU
.getConfiguration().setInt(StorefileRefresherChore
.REGIONSERVER_STOREFILE_REFRESH_PERIOD
,
326 // restart the region server so that it starts the refresher chore
327 restartRegionServer();
328 final int startKey
= 0, endKey
= 1000;
331 openRegion(HTU
, getRS(), hriSecondary
);
333 //load some data to primary so that reader won't fail
334 HTU
.loadNumericRows(table
, f
, startKey
, endKey
);
335 TestRegionServerNoMaster
.flushRegion(HTU
, hriPrimary
);
336 // ensure that chore is run
337 Threads
.sleep(2 * refreshPeriod
);
339 final AtomicBoolean running
= new AtomicBoolean(true);
340 @SuppressWarnings("unchecked")
341 final AtomicReference
<Exception
>[] exceptions
= new AtomicReference
[3];
342 for (int i
=0; i
< exceptions
.length
; i
++) {
343 exceptions
[i
] = new AtomicReference
<>();
346 Runnable writer
= new Runnable() {
351 while (running
.get()) {
352 byte[] data
= Bytes
.toBytes(String
.valueOf(key
));
353 Put put
= new Put(data
);
354 put
.addColumn(f
, null, data
);
361 } catch (Exception ex
) {
362 LOG
.warn(ex
.toString(), ex
);
363 exceptions
[0].compareAndSet(null, ex
);
368 Runnable flusherCompactor
= new Runnable() {
369 Random random
= new Random();
373 while (running
.get()) {
375 if (random
.nextBoolean()) {
376 TestRegionServerNoMaster
.flushRegion(HTU
, hriPrimary
);
378 HTU
.compact(table
.getName(), random
.nextBoolean());
381 } catch (Exception ex
) {
382 LOG
.warn(ex
.toString(), ex
);
383 exceptions
[1].compareAndSet(null, ex
);
388 Runnable reader
= new Runnable() {
389 Random random
= new Random();
393 while (running
.get()) {
394 // whether to do a close and open
395 if (random
.nextInt(10) == 0) {
397 closeRegion(HTU
, getRS(), hriSecondary
);
398 } catch (Exception ex
) {
399 LOG
.warn("Failed closing the region " + hriSecondary
+ " " +
400 StringUtils
.stringifyException(ex
));
401 exceptions
[2].compareAndSet(null, ex
);
404 openRegion(HTU
, getRS(), hriSecondary
);
405 } catch (Exception ex
) {
406 LOG
.warn("Failed opening the region " + hriSecondary
+ " " +
407 StringUtils
.stringifyException(ex
));
408 exceptions
[2].compareAndSet(null, ex
);
412 int key
= random
.nextInt(endKey
- startKey
) + startKey
;
413 assertGetRpc(hriSecondary
, key
, true);
415 } catch (Exception ex
) {
416 LOG
.warn("Failed getting the value in the region " + hriSecondary
+ " " +
417 StringUtils
.stringifyException(ex
));
418 exceptions
[2].compareAndSet(null, ex
);
423 LOG
.info("Starting writer and reader, secondary={}", hriSecondary
.getEncodedName());
424 ExecutorService executor
= Executors
.newFixedThreadPool(3);
425 executor
.submit(writer
);
426 executor
.submit(flusherCompactor
);
427 executor
.submit(reader
);
430 Threads
.sleep(runtime
);
433 executor
.awaitTermination(30, TimeUnit
.SECONDS
);
435 for (AtomicReference
<Exception
> exRef
: exceptions
) {
436 Assert
.assertNull(exRef
.get());
439 HTU
.deleteNumericRows(table
, HConstants
.CATALOG_FAMILY
, startKey
, endKey
);
441 closeRegion(HTU
, getRS(), hriSecondary
);
442 } catch (ServiceException e
) {
443 LOG
.info("Closing wrong region {}", hriSecondary
, e
);
449 public void testVerifySecondaryAbilityToReadWithOnFiles() throws Exception
{
450 // disable the store file refresh chore (we do this by hand)
451 HTU
.getConfiguration().setInt(StorefileRefresherChore
.REGIONSERVER_STOREFILE_REFRESH_PERIOD
, 0);
452 restartRegionServer();
455 LOG
.info("Opening the secondary region " + hriSecondary
.getEncodedName());
456 openRegion(HTU
, getRS(), hriSecondary
);
458 // load some data to primary
459 LOG
.info("Loading data to primary region");
460 for (int i
= 0; i
< 3; ++i
) {
461 HTU
.loadNumericRows(table
, f
, i
* 1000, (i
+ 1) * 1000);
462 HRegion region
= getRS().getRegionByEncodedName(hriPrimary
.getEncodedName());
466 HRegion primaryRegion
= getRS().getRegion(hriPrimary
.getEncodedName());
467 Assert
.assertEquals(3, primaryRegion
.getStore(f
).getStorefilesCount());
469 // Refresh store files on the secondary
470 Region secondaryRegion
= getRS().getRegion(hriSecondary
.getEncodedName());
471 secondaryRegion
.getStore(f
).refreshStoreFiles();
472 Assert
.assertEquals(3, secondaryRegion
.getStore(f
).getStorefilesCount());
475 LOG
.info("Force Major compaction on primary region " + hriPrimary
);
476 primaryRegion
.compact(true);
477 Assert
.assertEquals(1, primaryRegion
.getStore(f
).getStorefilesCount());
478 List
<RegionServerThread
> regionServerThreads
= HTU
.getMiniHBaseCluster()
479 .getRegionServerThreads();
480 HRegionServer hrs
= null;
481 for (RegionServerThread rs
: regionServerThreads
) {
482 if (rs
.getRegionServer()
483 .getOnlineRegion(primaryRegion
.getRegionInfo().getRegionName()) != null) {
484 hrs
= rs
.getRegionServer();
488 CompactedHFilesDischarger cleaner
=
489 new CompactedHFilesDischarger(100, null, hrs
, false);
491 // scan all the hfiles on the secondary.
492 // since there are no read on the secondary when we ask locations to
493 // the NN a FileNotFound exception will be returned and the FileLink
494 // should be able to deal with it giving us all the result we expect.
497 for (HStoreFile sf
: ((HStore
) secondaryRegion
.getStore(f
)).getStorefiles()) {
498 // Our file does not exist anymore. was moved by the compaction above.
499 LOG
.debug(Boolean
.toString(getRS().getFileSystem().exists(sf
.getPath())));
500 Assert
.assertFalse(getRS().getFileSystem().exists(sf
.getPath()));
502 HFileScanner scanner
= sf
.getReader().getScanner(false, false);
507 Cell cell
= scanner
.getCell();
508 sum
+= Integer
.parseInt(Bytes
.toString(cell
.getRowArray(),
509 cell
.getRowOffset(), cell
.getRowLength()));
510 } while (scanner
.next());
512 Assert
.assertEquals(3000, keys
);
513 Assert
.assertEquals(4498500, sum
);
515 HTU
.deleteNumericRows(table
, HConstants
.CATALOG_FAMILY
, 0, 1000);
516 closeRegion(HTU
, getRS(), hriSecondary
);