HBASE-26700 The way we bypass broken track file is not enough in StoreFileListFile...
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / regionserver / TestRegionReplicas.java
blob f7af83bcabbb9931c0c15b4d1d99ed8214630459

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TestMetaTableAccessor;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.util.StringUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole
 * cluster. See {@link TestRegionServerNoMaster}.
 */
@Category({RegionServerTests.class, LargeTests.class})
public class TestRegionReplicas {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegionReplicas.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRegionReplicas.class);

  private static final int NB_SERVERS = 1;
  private static Table table;
  private static final byte[] row = Bytes.toBytes("TestRegionReplicas");

  private static RegionInfo hriPrimary;
  private static RegionInfo hriSecondary;

  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
  private static final byte[] f = HConstants.CATALOG_FAMILY;

  @BeforeClass
  public static void before() throws Exception {
    // Reduce the hdfs block size and prefetch to trigger the file-link reopen
    // when the file is moved to archive (e.g. compaction)
    HTU.getConfiguration().setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 8192);
    HTU.getConfiguration().setInt(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 1);
    HTU.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);

    HTU.startMiniCluster(NB_SERVERS);
    final TableName tableName = TableName.valueOf(TestRegionReplicas.class.getSimpleName());

    // Create table then get the single region for our new table.
    table = HTU.createTable(tableName, f);

    try (RegionLocator locator = HTU.getConnection().getRegionLocator(tableName)) {
      hriPrimary = locator.getRegionLocation(row, false).getRegion();
    }

    // mock a secondary region info to open
    hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);

    // No master
    TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
  }

  @AfterClass
  public static void afterClass() throws Exception {
    HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
    table.close();
    HTU.shutdownMiniCluster();
  }

  private HRegionServer getRS() {
    return HTU.getMiniHBaseCluster().getRegionServer(0);
  }

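  /**
   * Opens the secondary replica, loads rows into the primary, and verifies that the rows can be
   * read back through the primary while the secondary is online.
   */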
  @Test
  public void testOpenRegionReplica() throws Exception {
    openRegion(HTU, getRS(), hriSecondary);
    try {
      // load some data to primary
      HTU.loadNumericRows(table, f, 0, 1000);

      // assert that we can read back from primary
      Assert.assertEquals(1000, HBaseTestingUtil.countRows(table));
    } finally {
      HTU.deleteNumericRows(table, f, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

  /** Tests that the meta location is saved for secondary regions */
  @Test
  public void testRegionReplicaUpdatesMetaLocation() throws Exception {
    openRegion(HTU, getRS(), hriSecondary);
    Table meta = null;
    try {
      meta = HTU.getConnection().getTable(TableName.META_TABLE_NAME);
      TestMetaTableAccessor.assertMetaLocation(meta, hriPrimary.getRegionName(),
        getRS().getServerName(), -1, 1, false);
    } finally {
      if (meta != null) {
        meta.close();
      }
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

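  /**
   * Loads and flushes data on the primary, then verifies the same rows are readable from the
   * secondary replica, both directly against the region and through the RPC layer.
   */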
  @Test
  public void testRegionReplicaGets() throws Exception {
    try {
      // load some data to primary
      HTU.loadNumericRows(table, f, 0, 1000);
      // assert that we can read back from primary
      Assert.assertEquals(1000, HBaseTestingUtil.countRows(table));
      // flush so that region replica can read
      HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      openRegion(HTU, getRS(), hriSecondary);

      // first try directly against region
      region = getRS().getRegion(hriSecondary.getEncodedName());
      assertGet(region, 42, true);

      assertGetRpc(hriSecondary, 42, true);
    } finally {
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

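  /**
   * Issues a client-level Get with Consistency.TIMELINE pinned to replica id 1 and verifies that
   * the value comes back from the secondary replica.
   */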
  @Test
  public void testGetOnTargetRegionReplica() throws Exception {
    try {
      // load some data to primary
      HTU.loadNumericRows(table, f, 0, 1000);
      // assert that we can read back from primary
      Assert.assertEquals(1000, HBaseTestingUtil.countRows(table));
      // flush so that region replica can read
      HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      openRegion(HTU, getRS(), hriSecondary);

      // try a Get directly against the region replica
      byte[] row = Bytes.toBytes(String.valueOf(42));
      Get get = new Get(row);
      get.setConsistency(Consistency.TIMELINE);
      get.setReplicaId(1);
      Result result = table.get(get);
      Assert.assertArrayEquals(row, result.getValue(f, null));
    } finally {
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

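  /** Reads the row for {@code value} directly from the region and asserts its presence or absence. */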
  private void assertGet(Region region, int value, boolean expect) throws IOException {
    byte[] row = Bytes.toBytes(String.valueOf(value));
    Get get = new Get(row);
    Result result = region.get(get);
    if (expect) {
      Assert.assertArrayEquals(row, result.getValue(f, null));
    } else {
      Assert.assertTrue(result.isEmpty());
    }
  }

  // build a mock rpc
  private void assertGetRpc(RegionInfo info, int value, boolean expect)
    throws IOException, ServiceException {
    byte[] row = Bytes.toBytes(String.valueOf(value));
    Get get = new Get(row);
    ClientProtos.GetRequest getReq = RequestConverter.buildGetRequest(info.getRegionName(), get);
    ClientProtos.GetResponse getResp = getRS().getRSRpcServices().get(null, getReq);
    Result result = ProtobufUtil.toResult(getResp.getResult());
    if (expect) {
      Assert.assertArrayEquals(row, result.getValue(f, null));
    } else {
      Assert.assertTrue(result.isEmpty());
    }
  }

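  /** Restarts the mini cluster so that configuration changes made by a test take effect. */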
  private void restartRegionServer() throws Exception {
    afterClass();
    before();
  }

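  /**
   * Verifies that the StorefileRefresherChore picks up flushes and compactions on the primary and
   * makes the resulting store files visible to the secondary replica.
   */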
  @Test
  public void testRefresStoreFiles() throws Exception {
    // enable store file refreshing
    final int refreshPeriod = 2000; // 2 sec
    HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 100);
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
      refreshPeriod);
    // restart the region server so that it starts the refresher chore
    restartRegionServer();

    try {
      LOG.info("Opening the secondary region " + hriSecondary.getEncodedName());
      openRegion(HTU, getRS(), hriSecondary);

      // load some data to primary
      LOG.info("Loading data to primary region");
      HTU.loadNumericRows(table, f, 0, 1000);
      // assert that we can read back from primary
      Assert.assertEquals(1000, HBaseTestingUtil.countRows(table));
      // flush so that region replica can read
      LOG.info("Flushing primary region");
      HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      // ensure that chore is run
      LOG.info("Sleeping for " + (4 * refreshPeriod));
      Threads.sleep(4 * refreshPeriod);

      LOG.info("Checking results from secondary region replica");
      Region secondaryRegion = getRS().getRegion(hriSecondary.getEncodedName());
      Assert.assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount());

      assertGet(secondaryRegion, 42, true);
      assertGetRpc(hriSecondary, 42, true);
      assertGetRpc(hriSecondary, 1042, false);

      // load some data to primary
      HTU.loadNumericRows(table, f, 1000, 1100);
      region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      HTU.loadNumericRows(table, f, 2000, 2100);
      region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      // ensure that chore is run
      Threads.sleep(4 * refreshPeriod);

      assertGetRpc(hriSecondary, 42, true);
      assertGetRpc(hriSecondary, 1042, true);
      assertGetRpc(hriSecondary, 2042, true);

      // ensure that we see the 3 store files
      Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());

      // force compaction
      HTU.compact(table.getName(), true);

      long wakeUpTime = EnvironmentEdgeManager.currentTime() + 4 * refreshPeriod;
      while (EnvironmentEdgeManager.currentTime() < wakeUpTime) {
        assertGetRpc(hriSecondary, 42, true);
        assertGetRpc(hriSecondary, 1042, true);
        assertGetRpc(hriSecondary, 2042, true);
        Threads.sleep(10);
      }

      // ensure that we see the compacted file only
      // This will be 4 until the cleaner chore runs
      Assert.assertEquals(4, secondaryRegion.getStore(f).getStorefilesCount());

    } finally {
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

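  /**
   * Runs writer, flusher/compactor, and reader threads concurrently for 30 seconds and asserts
   * that reads against the secondary replica never fail while the primary flushes and compacts.
   */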
  @Test
  public void testFlushAndCompactionsInPrimary() throws Exception {

    long runtime = 30 * 1000;
    // enable store file refreshing
    final int refreshPeriod = 100; // 100ms refresh is a lot
    HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 3);
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
      refreshPeriod);
    // restart the region server so that it starts the refresher chore
    restartRegionServer();
    final int startKey = 0, endKey = 1000;

    try {
      openRegion(HTU, getRS(), hriSecondary);

      // load some data to primary so that reader won't fail
      HTU.loadNumericRows(table, f, startKey, endKey);
      TestRegionServerNoMaster.flushRegion(HTU, hriPrimary);
      // ensure that chore is run
      Threads.sleep(2 * refreshPeriod);

      final AtomicBoolean running = new AtomicBoolean(true);
      @SuppressWarnings("unchecked")
      final AtomicReference<Exception>[] exceptions = new AtomicReference[3];
      for (int i = 0; i < exceptions.length; i++) {
        exceptions[i] = new AtomicReference<>();
      }

      Runnable writer = new Runnable() {
        int key = startKey;

        @Override
        public void run() {
          try {
            while (running.get()) {
              byte[] data = Bytes.toBytes(String.valueOf(key));
              Put put = new Put(data);
              put.addColumn(f, null, data);
              table.put(put);
              key++;
              if (key == endKey) {
                key = startKey;
              }
            }
          } catch (Exception ex) {
            LOG.warn(ex.toString(), ex);
            exceptions[0].compareAndSet(null, ex);
          }
        }
      };

      Runnable flusherCompactor = new Runnable() {
        Random random = new Random();

        @Override
        public void run() {
          try {
            while (running.get()) {
              // flush or compact
              if (random.nextBoolean()) {
                TestRegionServerNoMaster.flushRegion(HTU, hriPrimary);
              } else {
                HTU.compact(table.getName(), random.nextBoolean());
              }
            }
          } catch (Exception ex) {
            LOG.warn(ex.toString(), ex);
            exceptions[1].compareAndSet(null, ex);
          }
        }
      };

      Runnable reader = new Runnable() {
        Random random = new Random();

        @Override
        public void run() {
          try {
            while (running.get()) {
              // whether to do a close and open
              if (random.nextInt(10) == 0) {
                try {
                  closeRegion(HTU, getRS(), hriSecondary);
                } catch (Exception ex) {
                  LOG.warn("Failed closing the region " + hriSecondary + " " +
                    StringUtils.stringifyException(ex));
                  exceptions[2].compareAndSet(null, ex);
                }
                try {
                  openRegion(HTU, getRS(), hriSecondary);
                } catch (Exception ex) {
                  LOG.warn("Failed opening the region " + hriSecondary + " " +
                    StringUtils.stringifyException(ex));
                  exceptions[2].compareAndSet(null, ex);
                }
              }

              int key = random.nextInt(endKey - startKey) + startKey;
              assertGetRpc(hriSecondary, key, true);
            }
          } catch (Exception ex) {
            LOG.warn("Failed getting the value in the region " + hriSecondary + " " +
              StringUtils.stringifyException(ex));
            exceptions[2].compareAndSet(null, ex);
          }
        }
      };

      LOG.info("Starting writer and reader, secondary={}", hriSecondary.getEncodedName());
      ExecutorService executor = Executors.newFixedThreadPool(3);
      executor.submit(writer);
      executor.submit(flusherCompactor);
      executor.submit(reader);

      // wait for threads
      Threads.sleep(runtime);
      running.set(false);
      executor.shutdown();
      executor.awaitTermination(30, TimeUnit.SECONDS);

      for (AtomicReference<Exception> exRef : exceptions) {
        Assert.assertNull(exRef.get());
      }
    } finally {
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, startKey, endKey);
      try {
        closeRegion(HTU, getRS(), hriSecondary);
      } catch (ServiceException e) {
        LOG.info("Closing wrong region {}", hriSecondary, e);
      }
    }
  }

  @Test
  public void testVerifySecondaryAbilityToReadWithOnFiles() throws Exception {
    // disable the store file refresh chore (we do this by hand)
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0);
    restartRegionServer();

    try {
      LOG.info("Opening the secondary region " + hriSecondary.getEncodedName());
      openRegion(HTU, getRS(), hriSecondary);

      // load some data to primary
      LOG.info("Loading data to primary region");
      for (int i = 0; i < 3; ++i) {
        HTU.loadNumericRows(table, f, i * 1000, (i + 1) * 1000);
        HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
        region.flush(true);
      }

      HRegion primaryRegion = getRS().getRegion(hriPrimary.getEncodedName());
      Assert.assertEquals(3, primaryRegion.getStore(f).getStorefilesCount());

      // Refresh store files on the secondary
      Region secondaryRegion = getRS().getRegion(hriSecondary.getEncodedName());
      secondaryRegion.getStore(f).refreshStoreFiles();
      Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());

      // force compaction
      LOG.info("Force Major compaction on primary region " + hriPrimary);
      primaryRegion.compact(true);
      Assert.assertEquals(1, primaryRegion.getStore(f).getStorefilesCount());
      List<RegionServerThread> regionServerThreads = HTU.getMiniHBaseCluster()
        .getRegionServerThreads();
      HRegionServer hrs = null;
      for (RegionServerThread rs : regionServerThreads) {
        if (rs.getRegionServer()
          .getOnlineRegion(primaryRegion.getRegionInfo().getRegionName()) != null) {
          hrs = rs.getRegionServer();
          break;
        }
      }
      CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, hrs, false);
      cleaner.chore();
      // Scan all the hfiles on the secondary. Since there are no reads on the secondary, asking
      // the NN for block locations returns a FileNotFoundException; the FileLink should be able
      // to deal with it and still give us all the results we expect.
      int keys = 0;
      int sum = 0;
      for (HStoreFile sf : ((HStore) secondaryRegion.getStore(f)).getStorefiles()) {
        // Our file does not exist anymore; it was moved by the compaction above.
        LOG.debug(Boolean.toString(getRS().getFileSystem().exists(sf.getPath())));
        Assert.assertFalse(getRS().getFileSystem().exists(sf.getPath()));

        HFileScanner scanner = sf.getReader().getScanner(false, false);
        scanner.seekTo();
        do {
          keys++;

          Cell cell = scanner.getCell();
          sum += Integer.parseInt(Bytes.toString(cell.getRowArray(),
            cell.getRowOffset(), cell.getRowLength()));
        } while (scanner.next());
      }
      // 3000 rows with keys 0..2999; their sum is 2999 * 3000 / 2 = 4498500
      Assert.assertEquals(3000, keys);
      Assert.assertEquals(4498500, sum);
    } finally {
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }
}