/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ LargeTests.class, ClientTests.class })
public class TestReplicaWithCluster {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicaWithCluster.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicaWithCluster.class);

  private static final int NB_SERVERS = 3;
  private static final byte[] row = Bytes.toBytes(TestReplicaWithCluster.class.getName());
  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();

  // Second minicluster, used to test replication
  private static HBaseTestingUtil HTU2;
  private static final byte[] f = HConstants.CATALOG_FAMILY;

  private final static int REFRESH_PERIOD = 1000;
  private final static int META_SCAN_TIMEOUT_IN_MILLISEC = 200;
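  // Note: REFRESH_PERIOD is the store file refresh interval (in ms) handed to the
  // StorefileRefresherChore in beforeClass(); the "primary down" tests below sleep for
  // 2 * REFRESH_PERIOD so secondary replicas have picked up flushed data before a
  // TIMELINE read is attempted.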
  /**
   * This copro is used to synchronize the tests.
   */
  public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
    static final AtomicLong sleepTime = new AtomicLong(0);
    static final AtomicReference<CountDownLatch> cdl =
      new AtomicReference<>(new CountDownLatch(0));

    public SlowMeCopro() {
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
      final Get get, final List<Cell> results) throws IOException {
      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
        CountDownLatch latch = cdl.get();
        try {
          if (sleepTime.get() > 0) {
            LOG.info("Sleeping for " + sleepTime.get() + " ms");
            Thread.sleep(sleepTime.get());
          } else if (latch.getCount() > 0) {
            LOG.info("Waiting for the CountDownLatch");
            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
            if (latch.getCount() > 0) {
              throw new RuntimeException("Can't wait more");
            }
          }
        } catch (InterruptedException e1) {
          LOG.error(e1.toString(), e1);
        }
      } else {
        LOG.info("We're not the primary replica.");
      }
    }
  }
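  // How the tests below use SlowMeCopro: arm the latch with
  // SlowMeCopro.cdl.set(new CountDownLatch(1)), issue a Get with Consistency.TIMELINE
  // (the blocked primary makes the client fall back to a secondary, so Result.isStale()
  // returns true), then release the latch and reset sleepTime in a finally block so that
  // later tests are not affected.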
  /**
   * This copro is used to simulate a region server down exception for Get and Scan.
   */
  @CoreCoprocessor
  public static class RegionServerStoppedCopro implements RegionCoprocessor, RegionObserver {

    public RegionServerStoppedCopro() {
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
      final Get get, final List<Cell> results) throws IOException {
      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();

      // Fail for the primary replica and replica 1
      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) {
        LOG.info("Throw Region Server Stopped Exception for replica id " + replicaId);
        throw new RegionServerStoppedException(
          "Server " + e.getEnvironment().getServerName() + " not running");
      } else {
        LOG.info("We're replica region " + replicaId);
      }
    }

    @Override
    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
      final Scan scan) throws IOException {
      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
      // Fail for the primary replica and replica 1
      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() <= 1) {
        LOG.info("Throw Region Server Stopped Exception for replica id " + replicaId);
        throw new RegionServerStoppedException(
          "Server " + e.getEnvironment().getServerName() + " not running");
      } else {
        LOG.info("We're replica region " + replicaId);
      }
    }
  }
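  // With RegionServerStoppedCopro installed and a region replication of NB_SERVERS (3),
  // replicas 0 and 1 both reject reads, so a TIMELINE Get or Scan can only be answered by
  // replica 2 and must therefore come back stale.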
  /**
   * This copro is used to slow down the primary meta region scan a bit.
   */
  public static class RegionServerHostingPrimayMetaRegionSlowOrStopCopro
    implements RegionCoprocessor, RegionObserver {
    static boolean slowDownPrimaryMetaScan = false;
    static boolean throwException = false;

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
      final Get get, final List<Cell> results) throws IOException {
      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();

      // Fail for the primary replica, but not for meta
      if (throwException) {
        if (!e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
          LOG.info("Get, throw Region Server Stopped Exception for region "
            + e.getEnvironment().getRegion().getRegionInfo());
          throw new RegionServerStoppedException(
            "Server " + e.getEnvironment().getServerName() + " not running");
        }
      } else {
        LOG.info("Get, We're replica region " + replicaId);
      }
    }

    @Override
    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
      final Scan scan) throws IOException {
      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();

      // Slow down the primary meta region scan
      if (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
        if (slowDownPrimaryMetaScan) {
          LOG.info("Scan with primary meta region, slow down a bit");
          try {
            Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
          } catch (InterruptedException ie) {
            // Ignore
          }
        }

        // Fail for the primary replica
        if (throwException) {
          LOG.info("Scan, throw Region Server Stopped Exception for replica "
            + e.getEnvironment().getRegion().getRegionInfo());
          throw new RegionServerStoppedException(
            "Server " + e.getEnvironment().getServerName() + " not running");
        } else {
          LOG.info("Scan, We're replica region " + replicaId);
        }
      } else {
        LOG.info("Scan, We're replica region " + replicaId);
      }
    }
  }
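  // slowDownPrimaryMetaScan and throwException are static switches that default to false;
  // they are presumably toggled by meta-replica tests that are not part of this excerpt.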
  @BeforeClass
  public static void beforeClass() throws Exception {
    // enable store file refreshing
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
      REFRESH_PERIOD);

    HTU.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0001f);
    HTU.getConfiguration().setInt("replication.source.size.capacity", 10240);
    HTU.getConfiguration().setLong("replication.source.sleepforretries", 100);
    HTU.getConfiguration().setInt("hbase.regionserver.maxlogs", 2);
    HTU.getConfiguration().setLong("hbase.master.logcleaner.ttl", 10);
    HTU.getConfiguration().setInt("zookeeper.recovery.retry", 1);
    HTU.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10);

    // Wait longer on the primary call so we are sure to get the exception from the primary
    HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.get", 1000000);
    HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.scan", 1000000);

    // Make sure the master does not host system tables.
    HTU.getConfiguration().set("hbase.balancer.tablesOnMaster", "none");

    // Set a system coprocessor so it can be applied to meta regions
    HTU.getConfiguration().set("hbase.coprocessor.region.classes",
      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.class.getName());

    HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
      META_SCAN_TIMEOUT_IN_MILLISEC * 1000);

    HTU.startMiniCluster(NB_SERVERS);
    // Enable meta replica at server side
    HBaseTestingUtil.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, 2);

    HTU.getHBaseCluster().startMaster();
  }
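  // The very large primaryCallTimeout values above make the client wait on the primary long
  // enough that, in the tests where the primary throws, it is the primary's exception rather
  // than a timeout that triggers the fallback to a secondary replica.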
  @AfterClass
  public static void afterClass() throws Exception {
    if (HTU2 != null) {
      HTU2.shutdownMiniCluster();
    }
    HTU.shutdownMiniCluster();
  }
  @Test
  public void testCreateDeleteTable() throws IOException {
    // Create table then get the single region for our new table.
    TableDescriptorBuilder builder =
      HTU.createModifyableTableDescriptor(TableName.valueOf("testCreateDeleteTable"),
        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
    builder.setRegionReplication(NB_SERVERS);
    builder.setCoprocessor(SlowMeCopro.class.getName());
    TableDescriptor hdt = builder.build();
    Table table = HTU.createTable(hdt, new byte[][] { f }, null);

    Put p = new Put(row);
    p.addColumn(f, row, row);
    table.put(p);

    Get g = new Get(row);
    Result r = table.get(g);
    Assert.assertFalse(r.isStale());

    try {
      // But if we ask for stale we will get it
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      SlowMeCopro.cdl.get().countDown();
    } finally {
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

    HTU.getAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());
  }
  @Test
  public void testChangeTable() throws Exception {
    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("testChangeTable"))
      .setRegionReplication(NB_SERVERS)
      .setCoprocessor(SlowMeCopro.class.getName())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f))
      .build();
    HTU.getAdmin().createTable(td);
    Table table = HTU.getConnection().getTable(td.getTableName());
    // basic test: it should work.
    Put p = new Put(row);
    p.addColumn(f, row, row);
    table.put(p);

    Get g = new Get(row);
    Result r = table.get(g);
    Assert.assertFalse(r.isStale());

    // Add a CF, it should work.
    TableDescriptor bHdt = HTU.getAdmin().getDescriptor(td.getTableName());
    td = TableDescriptorBuilder.newBuilder(td)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(row))
      .build();
    HTU.getAdmin().disableTable(td.getTableName());
    HTU.getAdmin().modifyTable(td);
    HTU.getAdmin().enableTable(td.getTableName());
    TableDescriptor nHdt = HTU.getAdmin().getDescriptor(td.getTableName());
    Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
      bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount());

    p = new Put(row);
    p.addColumn(row, row, row);
    table.put(p);

    g = new Get(row);
    r = table.get(g);
    Assert.assertFalse(r.isStale());

    try {
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

    Admin admin = HTU.getAdmin();
    nHdt = admin.getDescriptor(td.getTableName());
    Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
      bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount());

    admin.disableTable(td.getTableName());
    admin.deleteTable(td.getTableName());
  }
  @SuppressWarnings("deprecation")
  @Test
  public void testReplicaAndReplication() throws Exception {
    TableDescriptorBuilder builder =
      HTU.createModifyableTableDescriptor("testReplicaAndReplication");
    builder.setRegionReplication(NB_SERVERS);
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(row)
      .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());

    builder.setCoprocessor(SlowMeCopro.class.getName());
    TableDescriptor tableDescriptor = builder.build();
    HTU.getAdmin().createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);

    Configuration conf2 = HBaseConfiguration.create(HTU.getConfiguration());
    conf2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    MiniZooKeeperCluster miniZK = HTU.getZkCluster();

    HTU2 = new HBaseTestingUtil(conf2);
    HTU2.setZkCluster(miniZK);
    HTU2.startMiniCluster(NB_SERVERS);
    LOG.info("Setup second Zk");
    HTU2.getAdmin().createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);

    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
      Admin admin = connection.getAdmin()) {
      ReplicationPeerConfig rpc =
        ReplicationPeerConfig.newBuilder().setClusterKey(HTU2.getClusterKey()).build();
      admin.addReplicationPeer("2", rpc);
    }

    Put p = new Put(row);
    p.addColumn(row, row, row);
    final Table table = HTU.getConnection().getTable(tableDescriptor.getTableName());
    table.put(p);

    HTU.getAdmin().flush(table.getName());
    LOG.info("Put & flush done on the first cluster. Now doing a get on the same cluster.");

    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        try {
          SlowMeCopro.cdl.set(new CountDownLatch(1));
          Get g = new Get(row);
          g.setConsistency(Consistency.TIMELINE);
          Result r = table.get(g);
          Assert.assertTrue(r.isStale());
          return !r.isEmpty();
        } finally {
          SlowMeCopro.cdl.get().countDown();
          SlowMeCopro.sleepTime.set(0);
        }
      }
    });
    LOG.info("stale get on the first cluster done. Now for the second.");

    // Now a stale get on the second cluster, once replication has copied the row over.
    final Table table2 = HTU2.getConnection().getTable(tableDescriptor.getTableName());
    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        try {
          SlowMeCopro.cdl.set(new CountDownLatch(1));
          Get g = new Get(row);
          g.setConsistency(Consistency.TIMELINE);
          Result r = table2.get(g);
          Assert.assertTrue(r.isStale());
          return !r.isEmpty();
        } finally {
          SlowMeCopro.cdl.get().countDown();
          SlowMeCopro.sleepTime.set(0);
        }
      }
    });

    HTU.getAdmin().disableTable(tableDescriptor.getTableName());
    HTU.deleteTable(tableDescriptor.getTableName());

    HTU2.getAdmin().disableTable(tableDescriptor.getTableName());
    HTU2.deleteTable(tableDescriptor.getTableName());

    // We shut down the HTU2 minicluster later, in afterClass(), as shutting down the
    // minicluster has the negative side effect of deleting all HConnections in the JVM.
  }
  @Test
  public void testBulkLoad() throws IOException {
    // Create table then get the single region for our new table.
    LOG.debug("Creating test table");
    TableDescriptorBuilder builder = HTU.createModifyableTableDescriptor(
      TableName.valueOf("testBulkLoad"), ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3,
      HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
    builder.setRegionReplication(NB_SERVERS);
    builder.setCoprocessor(SlowMeCopro.class.getName());
    TableDescriptor hdt = builder.build();
    Table table = HTU.createTable(hdt, new byte[][] { f }, null);

    // create hfiles to load.
    LOG.debug("Creating test data");
    Path dir = HTU.getDataTestDirOnTestFS("testBulkLoad");
    final int numRows = 10;
    final byte[] qual = Bytes.toBytes("qual");
    final byte[] val = Bytes.toBytes("val");
    Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (ColumnFamilyDescriptor col : hdt.getColumnFamilies()) {
      Path hfile = new Path(dir, col.getNameAsString());
      TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(), qual,
        val, numRows);
      family2Files.put(col.getName(), Collections.singletonList(hfile));
    }

    // bulk load the HFiles
    LOG.debug("Loading test data");
    BulkLoadHFiles.create(HTU.getConfiguration()).bulkLoad(hdt.getTableName(), family2Files);

    // verify we can read them from the primary
    LOG.debug("Verifying data load");
    for (int i = 0; i < numRows; i++) {
      byte[] row = TestHRegionServerBulkLoad.rowkey(i);
      Get g = new Get(row);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());
    }

    // verify we can read them from the replica
    LOG.debug("Verifying replica queries");
    try {
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      for (int i = 0; i < numRows; i++) {
        byte[] row = TestHRegionServerBulkLoad.rowkey(i);
        Get g = new Get(row);
        g.setConsistency(Consistency.TIMELINE);
        Result r = table.get(g);
        Assert.assertTrue(r.isStale());
      }
      SlowMeCopro.cdl.get().countDown();
    } finally {
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

    HTU.getAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());
  }
  @Test
  public void testReplicaGetWithPrimaryDown() throws IOException {
    // Create table then get the single region for our new table.
    TableDescriptorBuilder builder =
      HTU.createModifyableTableDescriptor(TableName.valueOf("testCreateDeleteTable"),
        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
    builder.setRegionReplication(NB_SERVERS);
    builder.setCoprocessor(RegionServerStoppedCopro.class.getName());
    TableDescriptor hdt = builder.build();
    try {
      Table table = HTU.createTable(hdt, new byte[][] { f }, null);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked up by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      // But if we ask for stale we will get it
      Get g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      Result r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }
  @Test
  public void testReplicaScanWithPrimaryDown() throws IOException {
    // Create table then get the single region for our new table.
    TableDescriptorBuilder builder =
      HTU.createModifyableTableDescriptor(TableName.valueOf("testCreateDeleteTable"),
        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
    builder.setRegionReplication(NB_SERVERS);
    builder.setCoprocessor(RegionServerStoppedCopro.class.getName());
    TableDescriptor hdt = builder.build();
    try {
      Table table = HTU.createTable(hdt, new byte[][] { f }, null);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked up by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      // But if we ask for stale we will get it
      // Instantiate the Scan
      Scan scan = new Scan();

      // Scan the required column family
      scan.addFamily(f);
      scan.setConsistency(Consistency.TIMELINE);

      // Get the scan result
      ResultScanner scanner = table.getScanner(scan);

      Result r = scanner.next();

      Assert.assertTrue(r.isStale());
    } finally {
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }
  @Test
  public void testReplicaGetWithAsyncRpcClientImpl() throws IOException {
    HTU.getConfiguration().setBoolean("hbase.ipc.client.specificThreadForWriting", true);
    HTU.getConfiguration().set("hbase.rpc.client.impl",
      "org.apache.hadoop.hbase.ipc.AsyncRpcClient");
    // Create table then get the single region for our new table.
    TableDescriptorBuilder builder =
      HTU.createModifyableTableDescriptor(TableName.valueOf("testReplicaGetWithAsyncRpcClientImpl"),
        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
    builder.setRegionReplication(NB_SERVERS);
    builder.setCoprocessor(SlowMeCopro.class.getName());
    TableDescriptor hdt = builder.build();
    try {
      Table table = HTU.createTable(hdt, new byte[][] { f }, null);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      // Flush so it can be picked up by the replica refresher thread
      HTU.flush(table.getName());

      // Sleep for some time until data is picked up by replicas
      try {
        Thread.sleep(2 * REFRESH_PERIOD);
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }

      try {
        // Create a new connection so the new config can kick in
        Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table t = connection.getTable(hdt.getTableName());

        // But if we ask for stale we will get it
        SlowMeCopro.cdl.set(new CountDownLatch(1));
        Get g = new Get(row);
        g.setConsistency(Consistency.TIMELINE);
        Result r = t.get(g);
        Assert.assertTrue(r.isStale());
        SlowMeCopro.cdl.get().countDown();
      } finally {
        SlowMeCopro.cdl.get().countDown();
        SlowMeCopro.sleepTime.set(0);
      }
    } finally {
      HTU.getConfiguration().unset("hbase.ipc.client.specificThreadForWriting");
      HTU.getConfiguration().unset("hbase.rpc.client.impl");
      HTU.getAdmin().disableTable(hdt.getTableName());
      HTU.deleteTable(hdt.getTableName());
    }
  }
}