HBASE-26567 Remove IndexType from ChunkCreator (#3947)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / client / TestReplicaWithCluster.java
blob391eed35b0659537c6d08e102cdc38e56c8fbd87
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import java.io.IOException;
21 import java.util.Arrays;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Optional;
26 import java.util.TreeMap;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicLong;
30 import java.util.concurrent.atomic.AtomicReference;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.hbase.Cell;
34 import org.apache.hadoop.hbase.HBaseClassTestRule;
35 import org.apache.hadoop.hbase.HBaseConfiguration;
36 import org.apache.hadoop.hbase.HBaseTestingUtil;
37 import org.apache.hadoop.hbase.HConstants;
38 import org.apache.hadoop.hbase.TableName;
39 import org.apache.hadoop.hbase.Waiter;
40 import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
41 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
42 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
43 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
44 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
45 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
46 import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
47 import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
48 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
49 import org.apache.hadoop.hbase.testclassification.ClientTests;
50 import org.apache.hadoop.hbase.testclassification.LargeTests;
51 import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
52 import org.apache.hadoop.hbase.util.Bytes;
53 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
54 import org.junit.AfterClass;
55 import org.junit.Assert;
56 import org.junit.BeforeClass;
57 import org.junit.ClassRule;
58 import org.junit.Test;
59 import org.junit.experimental.categories.Category;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
63 @Category({LargeTests.class, ClientTests.class})
64 public class TestReplicaWithCluster {
66 @ClassRule
67 public static final HBaseClassTestRule CLASS_RULE =
68 HBaseClassTestRule.forClass(TestReplicaWithCluster.class);
70 private static final Logger LOG = LoggerFactory.getLogger(TestReplicaWithCluster.class);
72 private static final int NB_SERVERS = 3;
73 private static final byte[] row = Bytes.toBytes(TestReplicaWithCluster.class.getName());
74 private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
76 // second minicluster used in testing of replication
77 private static HBaseTestingUtil HTU2;
78 private static final byte[] f = HConstants.CATALOG_FAMILY;
80 private final static int REFRESH_PERIOD = 1000;
81 private final static int META_SCAN_TIMEOUT_IN_MILLISEC = 200;
83 /**
84 * This copro is used to synchronize the tests.
86 public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
87 static final AtomicLong sleepTime = new AtomicLong(0);
88 static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(new CountDownLatch(0));
90 public SlowMeCopro() {
93 @Override
94 public Optional<RegionObserver> getRegionObserver() {
95 return Optional.of(this);
98 @Override
99 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
100 final Get get, final List<Cell> results) throws IOException {
102 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
103 CountDownLatch latch = cdl.get();
104 try {
105 if (sleepTime.get() > 0) {
106 LOG.info("Sleeping for " + sleepTime.get() + " ms");
107 Thread.sleep(sleepTime.get());
108 } else if (latch.getCount() > 0) {
109 LOG.info("Waiting for the counterCountDownLatch");
110 latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
111 if (latch.getCount() > 0) {
112 throw new RuntimeException("Can't wait more");
115 } catch (InterruptedException e1) {
116 LOG.error(e1.toString(), e1);
118 } else {
119 LOG.info("We're not the primary replicas.");
125 * This copro is used to simulate region server down exception for Get and Scan
127 @CoreCoprocessor
128 public static class RegionServerStoppedCopro implements RegionCoprocessor, RegionObserver {
130 public RegionServerStoppedCopro() {
133 @Override
134 public Optional<RegionObserver> getRegionObserver() {
135 return Optional.of(this);
138 @Override
139 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
140 final Get get, final List<Cell> results) throws IOException {
142 int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
144 // Fail for the primary replica and replica 1
145 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) {
146 LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId);
147 throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
148 + " not running");
149 } else {
150 LOG.info("We're replica region " + replicaId);
154 @Override
155 public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
156 final Scan scan) throws IOException {
157 int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
158 // Fail for the primary replica and replica 1
159 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) {
160 LOG.info("Throw Region Server Stopped Exceptoin for replica id " + replicaId);
161 throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
162 + " not running");
163 } else {
164 LOG.info("We're replica region " + replicaId);
170 * This copro is used to slow down the primary meta region scan a bit
172 public static class RegionServerHostingPrimayMetaRegionSlowOrStopCopro
173 implements RegionCoprocessor, RegionObserver {
174 static boolean slowDownPrimaryMetaScan = false;
175 static boolean throwException = false;
177 @Override
178 public Optional<RegionObserver> getRegionObserver() {
179 return Optional.of(this);
182 @Override
183 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
184 final Get get, final List<Cell> results) throws IOException {
186 int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
188 // Fail for the primary replica, but not for meta
189 if (throwException) {
190 if (!e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
191 LOG.info("Get, throw Region Server Stopped Exceptoin for region " + e.getEnvironment()
192 .getRegion().getRegionInfo());
193 throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
194 + " not running");
196 } else {
197 LOG.info("Get, We're replica region " + replicaId);
201 @Override
202 public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
203 final Scan scan) throws IOException {
205 int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
207 // Slow down with the primary meta region scan
208 if (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
209 if (slowDownPrimaryMetaScan) {
210 LOG.info("Scan with primary meta region, slow down a bit");
211 try {
212 Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
213 } catch (InterruptedException ie) {
214 // Ingore
218 // Fail for the primary replica
219 if (throwException) {
220 LOG.info("Scan, throw Region Server Stopped Exceptoin for replica " + e.getEnvironment()
221 .getRegion().getRegionInfo());
223 throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
224 + " not running");
225 } else {
226 LOG.info("Scan, We're replica region " + replicaId);
228 } else {
229 LOG.info("Scan, We're replica region " + replicaId);
234 @BeforeClass
235 public static void beforeClass() throws Exception {
236 // enable store file refreshing
237 HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
238 REFRESH_PERIOD);
240 HTU.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0001f);
241 HTU.getConfiguration().setInt("replication.source.size.capacity", 10240);
242 HTU.getConfiguration().setLong("replication.source.sleepforretries", 100);
243 HTU.getConfiguration().setInt("hbase.regionserver.maxlogs", 2);
244 HTU.getConfiguration().setLong("hbase.master.logcleaner.ttl", 10);
245 HTU.getConfiguration().setInt("zookeeper.recovery.retry", 1);
246 HTU.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10);
248 // Wait for primary call longer so make sure that it will get exception from the primary call
249 HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.get", 1000000);
250 HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.scan", 1000000);
252 // Make sure master does not host system tables.
253 HTU.getConfiguration().set("hbase.balancer.tablesOnMaster", "none");
255 // Set system coprocessor so it can be applied to meta regions
256 HTU.getConfiguration().set("hbase.coprocessor.region.classes",
257 RegionServerHostingPrimayMetaRegionSlowOrStopCopro.class.getName());
259 HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
260 META_SCAN_TIMEOUT_IN_MILLISEC * 1000);
262 HTU.startMiniCluster(NB_SERVERS);
263 // Enable meta replica at server side
264 HBaseTestingUtil.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, 2);
266 HTU.getHBaseCluster().startMaster();
269 @AfterClass
270 public static void afterClass() throws Exception {
271 if (HTU2 != null)
272 HTU2.shutdownMiniCluster();
273 HTU.shutdownMiniCluster();
276 @Test
277 public void testCreateDeleteTable() throws IOException {
278 // Create table then get the single region for our new table.
279 TableDescriptorBuilder builder =
280 HTU.createModifyableTableDescriptor(TableName.valueOf("testCreateDeleteTable"),
281 ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
282 ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
283 builder.setRegionReplication(NB_SERVERS);
284 builder.setCoprocessor(SlowMeCopro.class.getName());
285 TableDescriptor hdt = builder.build();
286 Table table = HTU.createTable(hdt, new byte[][] { f }, null);
288 Put p = new Put(row);
289 p.addColumn(f, row, row);
290 table.put(p);
292 Get g = new Get(row);
293 Result r = table.get(g);
294 Assert.assertFalse(r.isStale());
296 try {
297 // But if we ask for stale we will get it
298 SlowMeCopro.cdl.set(new CountDownLatch(1));
299 g = new Get(row);
300 g.setConsistency(Consistency.TIMELINE);
301 r = table.get(g);
302 Assert.assertTrue(r.isStale());
303 SlowMeCopro.cdl.get().countDown();
304 } finally {
305 SlowMeCopro.cdl.get().countDown();
306 SlowMeCopro.sleepTime.set(0);
309 HTU.getAdmin().disableTable(hdt.getTableName());
310 HTU.deleteTable(hdt.getTableName());
313 @Test
314 public void testChangeTable() throws Exception {
315 TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("testChangeTable"))
316 .setRegionReplication(NB_SERVERS)
317 .setCoprocessor(SlowMeCopro.class.getName())
318 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f))
319 .build();
320 HTU.getAdmin().createTable(td);
321 Table table = HTU.getConnection().getTable(td.getTableName());
322 // basic test: it should work.
323 Put p = new Put(row);
324 p.addColumn(f, row, row);
325 table.put(p);
327 Get g = new Get(row);
328 Result r = table.get(g);
329 Assert.assertFalse(r.isStale());
331 // Add a CF, it should work.
332 TableDescriptor bHdt = HTU.getAdmin().getDescriptor(td.getTableName());
333 td = TableDescriptorBuilder.newBuilder(td)
334 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(row))
335 .build();
336 HTU.getAdmin().disableTable(td.getTableName());
337 HTU.getAdmin().modifyTable(td);
338 HTU.getAdmin().enableTable(td.getTableName());
339 TableDescriptor nHdt = HTU.getAdmin().getDescriptor(td.getTableName());
340 Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
341 bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount());
343 p = new Put(row);
344 p.addColumn(row, row, row);
345 table.put(p);
347 g = new Get(row);
348 r = table.get(g);
349 Assert.assertFalse(r.isStale());
351 try {
352 SlowMeCopro.cdl.set(new CountDownLatch(1));
353 g = new Get(row);
354 g.setConsistency(Consistency.TIMELINE);
355 r = table.get(g);
356 Assert.assertTrue(r.isStale());
357 } finally {
358 SlowMeCopro.cdl.get().countDown();
359 SlowMeCopro.sleepTime.set(0);
362 Admin admin = HTU.getAdmin();
363 nHdt =admin.getDescriptor(td.getTableName());
364 Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
365 bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount());
367 admin.disableTable(td.getTableName());
368 admin.deleteTable(td.getTableName());
369 admin.close();
372 @SuppressWarnings("deprecation")
373 @Test
374 public void testReplicaAndReplication() throws Exception {
375 TableDescriptorBuilder builder =
376 HTU.createModifyableTableDescriptor("testReplicaAndReplication");
377 builder.setRegionReplication(NB_SERVERS);
378 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(row)
379 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());
381 builder.setCoprocessor(SlowMeCopro.class.getName());
382 TableDescriptor tableDescriptor = builder.build();
383 HTU.getAdmin().createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
385 Configuration conf2 = HBaseConfiguration.create(HTU.getConfiguration());
386 conf2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
387 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
388 MiniZooKeeperCluster miniZK = HTU.getZkCluster();
390 HTU2 = new HBaseTestingUtil(conf2);
391 HTU2.setZkCluster(miniZK);
392 HTU2.startMiniCluster(NB_SERVERS);
393 LOG.info("Setup second Zk");
394 HTU2.getAdmin().createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
396 try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
397 Admin admin = connection.getAdmin()) {
398 ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
399 .setClusterKey(HTU2.getClusterKey()).build();
400 admin.addReplicationPeer("2", rpc);
403 Put p = new Put(row);
404 p.addColumn(row, row, row);
405 final Table table = HTU.getConnection().getTable(tableDescriptor.getTableName());
406 table.put(p);
408 HTU.getAdmin().flush(table.getName());
409 LOG.info("Put & flush done on the first cluster. Now doing a get on the same cluster.");
411 Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
412 @Override public boolean evaluate() throws Exception {
413 try {
414 SlowMeCopro.cdl.set(new CountDownLatch(1));
415 Get g = new Get(row);
416 g.setConsistency(Consistency.TIMELINE);
417 Result r = table.get(g);
418 Assert.assertTrue(r.isStale());
419 return !r.isEmpty();
420 } finally {
421 SlowMeCopro.cdl.get().countDown();
422 SlowMeCopro.sleepTime.set(0);
426 table.close();
427 LOG.info("stale get on the first cluster done. Now for the second.");
429 final Table table2 = HTU.getConnection().getTable(tableDescriptor.getTableName());
430 Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
431 @Override public boolean evaluate() throws Exception {
432 try {
433 SlowMeCopro.cdl.set(new CountDownLatch(1));
434 Get g = new Get(row);
435 g.setConsistency(Consistency.TIMELINE);
436 Result r = table2.get(g);
437 Assert.assertTrue(r.isStale());
438 return !r.isEmpty();
439 } finally {
440 SlowMeCopro.cdl.get().countDown();
441 SlowMeCopro.sleepTime.set(0);
445 table2.close();
447 HTU.getAdmin().disableTable(tableDescriptor.getTableName());
448 HTU.deleteTable(tableDescriptor.getTableName());
450 HTU2.getAdmin().disableTable(tableDescriptor.getTableName());
451 HTU2.deleteTable(tableDescriptor.getTableName());
453 // We shutdown HTU2 minicluster later, in afterClass(), as shutting down
454 // the minicluster has negative impact of deleting all HConnections in JVM.
457 @Test
458 public void testBulkLoad() throws IOException {
459 // Create table then get the single region for our new table.
460 LOG.debug("Creating test table");
461 TableDescriptorBuilder builder = HTU.createModifyableTableDescriptor(
462 TableName.valueOf("testBulkLoad"), ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3,
463 HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
464 builder.setRegionReplication(NB_SERVERS);
465 builder.setCoprocessor(SlowMeCopro.class.getName());
466 TableDescriptor hdt = builder.build();
467 Table table = HTU.createTable(hdt, new byte[][] { f }, null);
469 // create hfiles to load.
470 LOG.debug("Creating test data");
471 Path dir = HTU.getDataTestDirOnTestFS("testBulkLoad");
472 final int numRows = 10;
473 final byte[] qual = Bytes.toBytes("qual");
474 final byte[] val = Bytes.toBytes("val");
475 Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
476 for (ColumnFamilyDescriptor col : hdt.getColumnFamilies()) {
477 Path hfile = new Path(dir, col.getNameAsString());
478 TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(), qual,
479 val, numRows);
480 family2Files.put(col.getName(), Collections.singletonList(hfile));
483 // bulk load HFiles
484 LOG.debug("Loading test data");
485 BulkLoadHFiles.create(HTU.getConfiguration()).bulkLoad(hdt.getTableName(), family2Files);
487 // verify we can read them from the primary
488 LOG.debug("Verifying data load");
489 for (int i = 0; i < numRows; i++) {
490 byte[] row = TestHRegionServerBulkLoad.rowkey(i);
491 Get g = new Get(row);
492 Result r = table.get(g);
493 Assert.assertFalse(r.isStale());
496 // verify we can read them from the replica
497 LOG.debug("Verifying replica queries");
498 try {
499 SlowMeCopro.cdl.set(new CountDownLatch(1));
500 for (int i = 0; i < numRows; i++) {
501 byte[] row = TestHRegionServerBulkLoad.rowkey(i);
502 Get g = new Get(row);
503 g.setConsistency(Consistency.TIMELINE);
504 Result r = table.get(g);
505 Assert.assertTrue(r.isStale());
507 SlowMeCopro.cdl.get().countDown();
508 } finally {
509 SlowMeCopro.cdl.get().countDown();
510 SlowMeCopro.sleepTime.set(0);
513 HTU.getAdmin().disableTable(hdt.getTableName());
514 HTU.deleteTable(hdt.getTableName());
517 @Test
518 public void testReplicaGetWithPrimaryDown() throws IOException {
519 // Create table then get the single region for our new table.
520 TableDescriptorBuilder builder =
521 HTU.createModifyableTableDescriptor(TableName.valueOf("testCreateDeleteTable"),
522 ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
523 ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
524 builder.setRegionReplication(NB_SERVERS);
525 builder.setCoprocessor(RegionServerStoppedCopro.class.getName());
526 TableDescriptor hdt = builder.build();
527 try {
528 Table table = HTU.createTable(hdt, new byte[][] { f }, null);
530 Put p = new Put(row);
531 p.addColumn(f, row, row);
532 table.put(p);
534 // Flush so it can be picked by the replica refresher thread
535 HTU.flush(table.getName());
537 // Sleep for some time until data is picked up by replicas
538 try {
539 Thread.sleep(2 * REFRESH_PERIOD);
540 } catch (InterruptedException e1) {
541 LOG.error(e1.toString(), e1);
544 // But if we ask for stale we will get it
545 Get g = new Get(row);
546 g.setConsistency(Consistency.TIMELINE);
547 Result r = table.get(g);
548 Assert.assertTrue(r.isStale());
549 } finally {
550 HTU.getAdmin().disableTable(hdt.getTableName());
551 HTU.deleteTable(hdt.getTableName());
555 @Test
556 public void testReplicaScanWithPrimaryDown() throws IOException {
557 // Create table then get the single region for our new table.
558 TableDescriptorBuilder builder =
559 HTU.createModifyableTableDescriptor(TableName.valueOf("testCreateDeleteTable"),
560 ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
561 ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
562 builder.setRegionReplication(NB_SERVERS);
563 builder.setCoprocessor(RegionServerStoppedCopro.class.getName());
564 TableDescriptor hdt = builder.build();
565 try {
566 Table table = HTU.createTable(hdt, new byte[][] { f }, null);
568 Put p = new Put(row);
569 p.addColumn(f, row, row);
570 table.put(p);
572 // Flush so it can be picked by the replica refresher thread
573 HTU.flush(table.getName());
575 // Sleep for some time until data is picked up by replicas
576 try {
577 Thread.sleep(2 * REFRESH_PERIOD);
578 } catch (InterruptedException e1) {
579 LOG.error(e1.toString(), e1);
582 // But if we ask for stale we will get it
583 // Instantiating the Scan class
584 Scan scan = new Scan();
586 // Scanning the required columns
587 scan.addFamily(f);
588 scan.setConsistency(Consistency.TIMELINE);
590 // Getting the scan result
591 ResultScanner scanner = table.getScanner(scan);
593 Result r = scanner.next();
595 Assert.assertTrue(r.isStale());
596 } finally {
597 HTU.getAdmin().disableTable(hdt.getTableName());
598 HTU.deleteTable(hdt.getTableName());
602 @Test
603 public void testReplicaGetWithAsyncRpcClientImpl() throws IOException {
604 HTU.getConfiguration().setBoolean("hbase.ipc.client.specificThreadForWriting", true);
605 HTU.getConfiguration().set("hbase.rpc.client.impl",
606 "org.apache.hadoop.hbase.ipc.AsyncRpcClient");
607 // Create table then get the single region for our new table.
608 TableDescriptorBuilder builder =
609 HTU.createModifyableTableDescriptor(TableName.valueOf("testReplicaGetWithAsyncRpcClientImpl"),
610 ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
611 ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
612 builder.setRegionReplication(NB_SERVERS);
613 builder.setCoprocessor(SlowMeCopro.class.getName());
614 TableDescriptor hdt = builder.build();
615 try {
616 Table table = HTU.createTable(hdt, new byte[][] { f }, null);
618 Put p = new Put(row);
619 p.addColumn(f, row, row);
620 table.put(p);
622 // Flush so it can be picked by the replica refresher thread
623 HTU.flush(table.getName());
625 // Sleep for some time until data is picked up by replicas
626 try {
627 Thread.sleep(2 * REFRESH_PERIOD);
628 } catch (InterruptedException e1) {
629 LOG.error(e1.toString(), e1);
632 try {
633 // Create the new connection so new config can kick in
634 Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
635 Table t = connection.getTable(hdt.getTableName());
637 // But if we ask for stale we will get it
638 SlowMeCopro.cdl.set(new CountDownLatch(1));
639 Get g = new Get(row);
640 g.setConsistency(Consistency.TIMELINE);
641 Result r = t.get(g);
642 Assert.assertTrue(r.isStale());
643 SlowMeCopro.cdl.get().countDown();
644 } finally {
645 SlowMeCopro.cdl.get().countDown();
646 SlowMeCopro.sleepTime.set(0);
648 } finally {
649 HTU.getConfiguration().unset("hbase.ipc.client.specificThreadForWriting");
650 HTU.getConfiguration().unset("hbase.rpc.client.impl");
651 HTU.getAdmin().disableTable(hdt.getTableName());
652 HTU.deleteTable(hdt.getTableName());