2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.junit
.Assert
.assertEquals
;
21 import static org
.junit
.Assert
.assertTrue
;
22 import static org
.junit
.Assert
.fail
;
24 import java
.io
.IOException
;
25 import java
.util
.ArrayList
;
26 import java
.util
.List
;
27 import java
.util
.Optional
;
28 import java
.util
.concurrent
.atomic
.AtomicBoolean
;
29 import java
.util
.concurrent
.atomic
.AtomicInteger
;
30 import org
.apache
.hadoop
.hbase
.Cell
;
31 import org
.apache
.hadoop
.hbase
.CellUtil
;
32 import org
.apache
.hadoop
.hbase
.CoprocessorEnvironment
;
33 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
34 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
35 import org
.apache
.hadoop
.hbase
.HConstants
;
36 import org
.apache
.hadoop
.hbase
.TableName
;
37 import org
.apache
.hadoop
.hbase
.Waiter
;
38 import org
.apache
.hadoop
.hbase
.codec
.KeyValueCodec
;
39 import org
.apache
.hadoop
.hbase
.coprocessor
.CoprocessorHost
;
40 import org
.apache
.hadoop
.hbase
.coprocessor
.MasterCoprocessor
;
41 import org
.apache
.hadoop
.hbase
.coprocessor
.MasterCoprocessorEnvironment
;
42 import org
.apache
.hadoop
.hbase
.coprocessor
.MasterObserver
;
43 import org
.apache
.hadoop
.hbase
.coprocessor
.ObserverContext
;
44 import org
.apache
.hadoop
.hbase
.master
.RegionPlan
;
45 import org
.apache
.hadoop
.hbase
.testclassification
.FlakeyTests
;
46 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
47 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
48 import org
.apache
.hadoop
.hbase
.util
.JVMClusterUtil
;
49 import org
.junit
.AfterClass
;
50 import org
.junit
.Assert
;
51 import org
.junit
.Before
;
52 import org
.junit
.BeforeClass
;
53 import org
.junit
.ClassRule
;
54 import org
.junit
.Test
;
55 import org
.junit
.experimental
.categories
.Category
;
56 import org
.slf4j
.Logger
;
57 import org
.slf4j
.LoggerFactory
;
59 @Category({MediumTests
.class, FlakeyTests
.class})
60 public class TestMultiParallel
{
63 public static final HBaseClassTestRule CLASS_RULE
=
64 HBaseClassTestRule
.forClass(TestMultiParallel
.class);
66 private static final Logger LOG
= LoggerFactory
.getLogger(TestMultiParallel
.class);
68 private static final HBaseTestingUtil UTIL
= new HBaseTestingUtil();
69 private static final byte[] VALUE
= Bytes
.toBytes("value");
70 private static final byte[] QUALIFIER
= Bytes
.toBytes("qual");
71 private static final String FAMILY
= "family";
72 private static final TableName TEST_TABLE
= TableName
.valueOf("multi_test_table");
73 private static final byte[] BYTES_FAMILY
= Bytes
.toBytes(FAMILY
);
74 private static final byte[] ONE_ROW
= Bytes
.toBytes("xxx");
75 private static final byte [][] KEYS
= makeKeys();
77 private static final int slaves
= 5; // also used for testing HTable pool size
78 private static Connection CONNECTION
;
81 public static void beforeClass() throws Exception
{
82 // Uncomment the following lines if more verbosity is needed for
83 // debugging (see HBASE-12285 for details).
84 //((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
85 //((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
86 //((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
87 UTIL
.getConfiguration().set(HConstants
.RPC_CODEC_CONF_KEY
,
88 KeyValueCodec
.class.getCanonicalName());
89 // Disable table on master for now as the feature is broken
90 //UTIL.getConfiguration().setBoolean(LoadBalancer.TABLES_ON_MASTER, true);
91 // We used to ask for system tables on Master exclusively but not needed by test and doesn't
92 // work anyways -- so commented out.
93 // UTIL.getConfiguration().setBoolean(LoadBalancer.SYSTEM_TABLES_ON_MASTER, true);
94 UTIL
.getConfiguration()
95 .set(CoprocessorHost
.MASTER_COPROCESSOR_CONF_KEY
, MyMasterObserver
.class.getName());
96 UTIL
.startMiniCluster(slaves
);
97 Table t
= UTIL
.createMultiRegionTable(TEST_TABLE
, Bytes
.toBytes(FAMILY
));
98 UTIL
.waitTableEnabled(TEST_TABLE
);
100 CONNECTION
= ConnectionFactory
.createConnection(UTIL
.getConfiguration());
101 assertTrue(MyMasterObserver
.start
.get());
105 public static void afterClass() throws Exception
{
107 UTIL
.shutdownMiniCluster();
111 public void before() throws Exception
{
112 final int balanceCount
= MyMasterObserver
.postBalanceCount
.get();
114 if (UTIL
.ensureSomeRegionServersAvailable(slaves
)) {
115 // Distribute regions
116 UTIL
.getMiniHBaseCluster().getMaster().balance();
117 // Some plans are created.
118 if (MyMasterObserver
.postBalanceCount
.get() > balanceCount
) {
119 // It is necessary to wait the move procedure to start.
120 // Otherwise, the next wait may pass immediately.
121 UTIL
.waitFor(3 * 1000, 100, false, () ->
122 UTIL
.getMiniHBaseCluster().getMaster().getAssignmentManager().hasRegionsInTransition()
126 // Wait until completing balance
127 UTIL
.waitUntilAllRegionsAssigned(TEST_TABLE
);
129 LOG
.info("before done");
132 private static byte[][] makeKeys() {
133 byte [][] starterKeys
= HBaseTestingUtil
.KEYS
;
134 // Create a "non-uniform" test set with the following characteristics:
135 // a) Unequal number of keys per region
137 // Don't use integer as a multiple, so that we have a number of keys that is
138 // not a multiple of the number of regions
139 int numKeys
= (int) (starterKeys
.length
* 10.33F
);
141 List
<byte[]> keys
= new ArrayList
<>();
142 for (int i
= 0; i
< numKeys
; i
++) {
143 int kIdx
= i
% starterKeys
.length
;
144 byte[] k
= starterKeys
[kIdx
];
145 byte[] cp
= new byte[k
.length
+ 1];
146 System
.arraycopy(k
, 0, cp
, 0, k
.length
);
147 cp
[k
.length
] = new Integer(i
% 256).byteValue();
151 // b) Same duplicate keys (showing multiple Gets/Puts to the same row, which
153 // c) keys are not in sorted order (within a region), to ensure that the
154 // sorting code and index mapping doesn't break the functionality
155 for (int i
= 0; i
< 100; i
++) {
156 int kIdx
= i
% starterKeys
.length
;
157 byte[] k
= starterKeys
[kIdx
];
158 byte[] cp
= new byte[k
.length
+ 1];
159 System
.arraycopy(k
, 0, cp
, 0, k
.length
);
160 cp
[k
.length
] = new Integer(i
% 256).byteValue();
163 return keys
.toArray(new byte [][] {new byte [] {}});
167 public void testBatchWithGet() throws Exception
{
168 LOG
.info("test=testBatchWithGet");
169 Table table
= UTIL
.getConnection().getTable(TEST_TABLE
);
172 List
<Put
> puts
= constructPutRequests();
173 table
.batch(puts
, null);
175 // create a list of gets and run it
176 List
<Row
> gets
= new ArrayList
<>();
177 for (byte[] k
: KEYS
) {
178 Get get
= new Get(k
);
179 get
.addColumn(BYTES_FAMILY
, QUALIFIER
);
182 Result
[] multiRes
= new Result
[gets
.size()];
183 table
.batch(gets
, multiRes
);
185 // Same gets using individual call API
186 List
<Result
> singleRes
= new ArrayList
<>();
187 for (Row get
: gets
) {
188 singleRes
.add(table
.get((Get
) get
));
191 Assert
.assertEquals(singleRes
.size(), multiRes
.length
);
192 for (int i
= 0; i
< singleRes
.size(); i
++) {
193 Assert
.assertTrue(singleRes
.get(i
).containsColumn(BYTES_FAMILY
, QUALIFIER
));
194 Cell
[] singleKvs
= singleRes
.get(i
).rawCells();
195 Cell
[] multiKvs
= multiRes
[i
].rawCells();
196 for (int j
= 0; j
< singleKvs
.length
; j
++) {
197 Assert
.assertEquals(singleKvs
[j
], multiKvs
[j
]);
198 Assert
.assertEquals(0, Bytes
.compareTo(CellUtil
.cloneValue(singleKvs
[j
]),
199 CellUtil
.cloneValue(multiKvs
[j
])));
206 public void testBadFam() throws Exception
{
207 LOG
.info("test=testBadFam");
208 Table table
= UTIL
.getConnection().getTable(TEST_TABLE
);
210 List
<Row
> actions
= new ArrayList
<>();
211 Put p
= new Put(Bytes
.toBytes("row1"));
212 p
.addColumn(Bytes
.toBytes("bad_family"), Bytes
.toBytes("qual"), Bytes
.toBytes("value"));
214 p
= new Put(Bytes
.toBytes("row2"));
215 p
.addColumn(BYTES_FAMILY
, Bytes
.toBytes("qual"), Bytes
.toBytes("value"));
218 // row1 and row2 should be in the same region.
220 Object
[] r
= new Object
[actions
.size()];
222 table
.batch(actions
, r
);
224 } catch (RetriesExhaustedException ex
) {
227 assertEquals(2, r
.length
);
228 assertTrue(r
[0] instanceof Throwable
);
229 assertTrue(r
[1] instanceof Result
);
234 public void testFlushCommitsNoAbort() throws Exception
{
235 LOG
.info("test=testFlushCommitsNoAbort");
236 doTestFlushCommits(false);
240 * Only run one Multi test with a forced RegionServer abort. Otherwise, the
241 * unit tests will take an unnecessarily long time to run.
244 public void testFlushCommitsWithAbort() throws Exception
{
245 LOG
.info("test=testFlushCommitsWithAbort");
246 doTestFlushCommits(true);
250 * Set table auto flush to false and test flushing commits
251 * @param doAbort true if abort one regionserver in the testing
253 private void doTestFlushCommits(boolean doAbort
) throws Exception
{
255 LOG
.info("get new table");
256 Table table
= UTIL
.getConnection().getTable(TEST_TABLE
);
258 LOG
.info("constructPutRequests");
259 List
<Put
> puts
= constructPutRequests();
262 final int liveRScount
= UTIL
.getMiniHBaseCluster().getLiveRegionServerThreads()
264 assert liveRScount
> 0;
265 JVMClusterUtil
.RegionServerThread liveRS
= UTIL
.getMiniHBaseCluster()
266 .getLiveRegionServerThreads().get(0);
268 liveRS
.getRegionServer().abort("Aborting for tests",
269 new Exception("doTestFlushCommits"));
270 // If we wait for no regions being online after we abort the server, we
271 // could ensure the master has re-assigned the regions on killed server
272 // after writing successfully. It means the server we aborted is dead
273 // and detected by matser
274 while (liveRS
.getRegionServer().getNumberOfOnlineRegions() != 0) {
277 // try putting more keys after the abort. same key/qual... just validating
278 // no exceptions thrown
279 puts
= constructPutRequests();
283 LOG
.info("validating loaded data");
284 validateLoadedData(table
);
286 // Validate server and region count
287 List
<JVMClusterUtil
.RegionServerThread
> liveRSs
= UTIL
.getMiniHBaseCluster().getLiveRegionServerThreads();
289 for (JVMClusterUtil
.RegionServerThread t
: liveRSs
) {
291 LOG
.info("Count=" + count
+ ", Alive=" + t
.getRegionServer());
293 LOG
.info("Count=" + count
);
294 Assert
.assertEquals("Server count=" + count
+ ", abort=" + doAbort
,
295 (doAbort ?
(liveRScount
- 1) : liveRScount
), count
);
297 UTIL
.getMiniHBaseCluster().waitOnRegionServer(0);
298 UTIL
.waitFor(15 * 1000, new Waiter
.Predicate
<Exception
>() {
300 public boolean evaluate() throws Exception
{
301 // We disable regions on master so the count should be liveRScount - 1
302 return UTIL
.getMiniHBaseCluster().getMaster()
303 .getClusterMetrics().getLiveServerMetrics().size() == liveRScount
- 1;
306 UTIL
.waitFor(15 * 1000, UTIL
.predicateNoRegionsInTransition());
314 public void testBatchWithPut() throws Exception
{
315 LOG
.info("test=testBatchWithPut");
316 Table table
= CONNECTION
.getTable(TEST_TABLE
);
317 // put multiple rows using a batch
318 List
<Put
> puts
= constructPutRequests();
320 Object
[] results
= new Object
[puts
.size()];
321 table
.batch(puts
, results
);
322 validateSizeAndEmpty(results
, KEYS
.length
);
325 int liveRScount
= UTIL
.getMiniHBaseCluster().getLiveRegionServerThreads().size();
326 assert liveRScount
> 0;
327 JVMClusterUtil
.RegionServerThread liveRS
=
328 UTIL
.getMiniHBaseCluster().getLiveRegionServerThreads().get(0);
329 liveRS
.getRegionServer().abort("Aborting for tests", new Exception("testBatchWithPut"));
330 puts
= constructPutRequests();
332 results
= new Object
[puts
.size()];
333 table
.batch(puts
, results
);
334 } catch (RetriesExhaustedWithDetailsException ree
) {
335 LOG
.info(ree
.getExhaustiveDescription());
339 validateSizeAndEmpty(results
, KEYS
.length
);
342 validateLoadedData(table
);
347 public void testBatchWithDelete() throws Exception
{
348 LOG
.info("test=testBatchWithDelete");
349 Table table
= UTIL
.getConnection().getTable(TEST_TABLE
);
352 List
<Put
> puts
= constructPutRequests();
353 Object
[] results
= new Object
[puts
.size()];
354 table
.batch(puts
, results
);
355 validateSizeAndEmpty(results
, KEYS
.length
);
358 List
<Row
> deletes
= new ArrayList
<>();
359 for (int i
= 0; i
< KEYS
.length
; i
++) {
360 Delete delete
= new Delete(KEYS
[i
]);
361 delete
.addFamily(BYTES_FAMILY
);
364 results
= new Object
[deletes
.size()];
365 table
.batch(deletes
, results
);
366 validateSizeAndEmpty(results
, KEYS
.length
);
368 // Get to make sure ...
369 for (byte[] k
: KEYS
) {
370 Get get
= new Get(k
);
371 get
.addColumn(BYTES_FAMILY
, QUALIFIER
);
372 Assert
.assertFalse(table
.exists(get
));
378 public void testHTableDeleteWithList() throws Exception
{
379 LOG
.info("test=testHTableDeleteWithList");
380 Table table
= UTIL
.getConnection().getTable(TEST_TABLE
);
383 List
<Put
> puts
= constructPutRequests();
384 Object
[] results
= new Object
[puts
.size()];
385 table
.batch(puts
, results
);
386 validateSizeAndEmpty(results
, KEYS
.length
);
389 ArrayList
<Delete
> deletes
= new ArrayList
<>();
390 for (int i
= 0; i
< KEYS
.length
; i
++) {
391 Delete delete
= new Delete(KEYS
[i
]);
392 delete
.addFamily(BYTES_FAMILY
);
395 table
.delete(deletes
);
397 // Get to make sure ...
398 for (byte[] k
: KEYS
) {
399 Get get
= new Get(k
);
400 get
.addColumn(BYTES_FAMILY
, QUALIFIER
);
401 Assert
.assertFalse(table
.exists(get
));
407 public void testBatchWithManyColsInOneRowGetAndPut() throws Exception
{
408 LOG
.info("test=testBatchWithManyColsInOneRowGetAndPut");
409 Table table
= UTIL
.getConnection().getTable(TEST_TABLE
);
411 List
<Row
> puts
= new ArrayList
<>();
412 for (int i
= 0; i
< 100; i
++) {
413 Put put
= new Put(ONE_ROW
);
414 byte[] qual
= Bytes
.toBytes("column" + i
);
415 put
.addColumn(BYTES_FAMILY
, qual
, VALUE
);
418 Object
[] results
= new Object
[puts
.size()];
419 table
.batch(puts
, results
);
422 validateSizeAndEmpty(results
, 100);
424 // get the data back and validate that it is correct
425 List
<Row
> gets
= new ArrayList
<>();
426 for (int i
= 0; i
< 100; i
++) {
427 Get get
= new Get(ONE_ROW
);
428 byte[] qual
= Bytes
.toBytes("column" + i
);
429 get
.addColumn(BYTES_FAMILY
, qual
);
433 Object
[] multiRes
= new Object
[gets
.size()];
434 table
.batch(gets
, multiRes
);
437 for (Object r
: multiRes
) {
438 byte[] qual
= Bytes
.toBytes("column" + idx
);
439 validateResult(r
, qual
, VALUE
);
446 public void testBatchWithIncrementAndAppend() throws Exception
{
447 LOG
.info("test=testBatchWithIncrementAndAppend");
448 final byte[] QUAL1
= Bytes
.toBytes("qual1");
449 final byte[] QUAL2
= Bytes
.toBytes("qual2");
450 final byte[] QUAL3
= Bytes
.toBytes("qual3");
451 final byte[] QUAL4
= Bytes
.toBytes("qual4");
452 Table table
= UTIL
.getConnection().getTable(TEST_TABLE
);
453 Delete d
= new Delete(ONE_ROW
);
455 Put put
= new Put(ONE_ROW
);
456 put
.addColumn(BYTES_FAMILY
, QUAL1
, Bytes
.toBytes("abc"));
457 put
.addColumn(BYTES_FAMILY
, QUAL2
, Bytes
.toBytes(1L));
460 Increment inc
= new Increment(ONE_ROW
);
461 inc
.addColumn(BYTES_FAMILY
, QUAL2
, 1);
462 inc
.addColumn(BYTES_FAMILY
, QUAL3
, 1);
464 Append a
= new Append(ONE_ROW
);
465 a
.addColumn(BYTES_FAMILY
, QUAL1
, Bytes
.toBytes("def"));
466 a
.addColumn(BYTES_FAMILY
, QUAL4
, Bytes
.toBytes("xyz"));
467 List
<Row
> actions
= new ArrayList
<>();
471 Object
[] multiRes
= new Object
[actions
.size()];
472 table
.batch(actions
, multiRes
);
473 validateResult(multiRes
[1], QUAL1
, Bytes
.toBytes("abcdef"));
474 validateResult(multiRes
[1], QUAL4
, Bytes
.toBytes("xyz"));
475 validateResult(multiRes
[0], QUAL2
, Bytes
.toBytes(2L));
476 validateResult(multiRes
[0], QUAL3
, Bytes
.toBytes(1L));
481 public void testBatchWithMixedActions() throws Exception
{
482 LOG
.info("test=testBatchWithMixedActions");
483 Table table
= UTIL
.getConnection().getTable(TEST_TABLE
);
485 // Load some data to start
486 List
<Put
> puts
= constructPutRequests();
487 Object
[] results
= new Object
[puts
.size()];
488 table
.batch(puts
, results
);
489 validateSizeAndEmpty(results
, KEYS
.length
);
491 // Batch: get, get, put(new col), delete, get, get of put, get of deleted,
493 List
<Row
> actions
= new ArrayList
<>();
495 byte[] qual2
= Bytes
.toBytes("qual2");
496 byte[] val2
= Bytes
.toBytes("putvalue2");
499 Get get
= new Get(KEYS
[10]);
500 get
.addColumn(BYTES_FAMILY
, QUALIFIER
);
504 get
= new Get(KEYS
[11]);
505 get
.addColumn(BYTES_FAMILY
, QUALIFIER
);
508 // 2 put of new column
509 Put put
= new Put(KEYS
[10]);
510 put
.addColumn(BYTES_FAMILY
, qual2
, val2
);
514 Delete delete
= new Delete(KEYS
[20]);
515 delete
.addFamily(BYTES_FAMILY
);
519 get
= new Get(KEYS
[30]);
520 get
.addColumn(BYTES_FAMILY
, QUALIFIER
);
523 // There used to be a 'get' of a previous put here, but removed
524 // since this API really cannot guarantee order in terms of mixed
527 // 5 put of new column
528 put
= new Put(KEYS
[40]);
529 put
.addColumn(BYTES_FAMILY
, qual2
, val2
);
533 RowMutations rm
= new RowMutations(KEYS
[50]);
534 put
= new Put(KEYS
[50]);
535 put
.addColumn(BYTES_FAMILY
, qual2
, val2
);
536 rm
.add((Mutation
) put
);
537 byte[] qual3
= Bytes
.toBytes("qual3");
538 byte[] val3
= Bytes
.toBytes("putvalue3");
539 put
= new Put(KEYS
[50]);
540 put
.addColumn(BYTES_FAMILY
, qual3
, val3
);
541 rm
.add((Mutation
) put
);
544 // 7 Add another Get to the mixed sequence after RowMutations
545 get
= new Get(KEYS
[10]);
546 get
.addColumn(BYTES_FAMILY
, QUALIFIER
);
549 results
= new Object
[actions
.size()];
550 table
.batch(actions
, results
);
554 validateResult(results
[0]);
555 validateResult(results
[1]);
556 validateEmpty(results
[3]);
557 validateResult(results
[4]);
558 validateEmpty(results
[5]);
559 validateEmpty(results
[6]);
560 validateResult(results
[7]);
562 // validate last put, externally from the batch
563 get
= new Get(KEYS
[40]);
564 get
.addColumn(BYTES_FAMILY
, qual2
);
565 Result r
= table
.get(get
);
566 validateResult(r
, qual2
, val2
);
568 // validate last RowMutations, externally from the batch
569 get
= new Get(KEYS
[50]);
570 get
.addColumn(BYTES_FAMILY
, qual2
);
572 validateResult(r
, qual2
, val2
);
574 get
= new Get(KEYS
[50]);
575 get
.addColumn(BYTES_FAMILY
, qual3
);
577 validateResult(r
, qual3
, val3
);
582 // // Helper methods ////
584 private void validateResult(Object r
) {
585 validateResult(r
, QUALIFIER
, VALUE
);
588 private void validateResult(Object r1
, byte[] qual
, byte[] val
) {
589 Result r
= (Result
)r1
;
590 Assert
.assertTrue(r
.containsColumn(BYTES_FAMILY
, qual
));
591 byte[] value
= r
.getValue(BYTES_FAMILY
, qual
);
592 if (0 != Bytes
.compareTo(val
, value
)) {
593 fail("Expected [" + Bytes
.toStringBinary(val
)
594 + "] but got [" + Bytes
.toStringBinary(value
) + "]");
598 private List
<Put
> constructPutRequests() {
599 List
<Put
> puts
= new ArrayList
<>();
600 for (byte[] k
: KEYS
) {
601 Put put
= new Put(k
);
602 put
.addColumn(BYTES_FAMILY
, QUALIFIER
, VALUE
);
608 private void validateLoadedData(Table table
) throws IOException
{
609 // get the data back and validate that it is correct
610 LOG
.info("Validating data on " + table
);
611 List
<Get
> gets
= new ArrayList
<>();
612 for (byte[] k
: KEYS
) {
613 Get get
= new Get(k
);
614 get
.addColumn(BYTES_FAMILY
, QUALIFIER
);
618 Result
[] results
= null;
620 results
= table
.get(gets
);
621 boolean finished
= true;
622 for (Result result
: results
) {
623 if (result
.isEmpty()) {
633 } catch (InterruptedException e
) {
636 } while (retryNum
> 0);
639 fail("Timeout for validate data");
641 if (results
!= null) {
642 for (Result r
: results
) {
643 Assert
.assertTrue(r
.containsColumn(BYTES_FAMILY
, QUALIFIER
));
644 Assert
.assertEquals(0, Bytes
.compareTo(VALUE
, r
645 .getValue(BYTES_FAMILY
, QUALIFIER
)));
647 LOG
.info("Validating data on " + table
+ " successfully!");
652 private void validateEmpty(Object r1
) {
653 Result result
= (Result
)r1
;
654 Assert
.assertTrue(result
!= null);
655 Assert
.assertTrue(result
.isEmpty());
658 private void validateSizeAndEmpty(Object
[] results
, int expectedSize
) {
659 // Validate got back the same number of Result objects, all empty
660 Assert
.assertEquals(expectedSize
, results
.length
);
661 for (Object result
: results
) {
662 validateEmpty(result
);
666 public static class MyMasterObserver
implements MasterObserver
, MasterCoprocessor
{
667 private static final AtomicInteger postBalanceCount
= new AtomicInteger(0);
668 private static final AtomicBoolean start
= new AtomicBoolean(false);
671 public void start(CoprocessorEnvironment env
) throws IOException
{
676 public Optional
<MasterObserver
> getMasterObserver() {
677 return Optional
.of(this);
681 public void postBalance(final ObserverContext
<MasterCoprocessorEnvironment
> ctx
,
682 BalanceRequest request
, List
<RegionPlan
> plans
) throws IOException
{
683 if (!plans
.isEmpty()) {
684 postBalanceCount
.incrementAndGet();