HBASE-24033 Add ut for loading the corrupt recovered hfiles (#1322)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / client / TestMultiParallel.java
blob487a84916f6c91e37803586b952b99bdf0d379e0
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertTrue;
22 import static org.junit.Assert.fail;
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.Optional;
28 import java.util.concurrent.atomic.AtomicBoolean;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import org.apache.hadoop.hbase.Cell;
31 import org.apache.hadoop.hbase.CellUtil;
32 import org.apache.hadoop.hbase.CoprocessorEnvironment;
33 import org.apache.hadoop.hbase.HBaseClassTestRule;
34 import org.apache.hadoop.hbase.HBaseTestingUtility;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.TableName;
37 import org.apache.hadoop.hbase.Waiter;
38 import org.apache.hadoop.hbase.codec.KeyValueCodec;
39 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
40 import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
41 import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
42 import org.apache.hadoop.hbase.coprocessor.MasterObserver;
43 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
44 import org.apache.hadoop.hbase.master.RegionPlan;
45 import org.apache.hadoop.hbase.testclassification.FlakeyTests;
46 import org.apache.hadoop.hbase.testclassification.MediumTests;
47 import org.apache.hadoop.hbase.util.Bytes;
48 import org.apache.hadoop.hbase.util.JVMClusterUtil;
49 import org.junit.AfterClass;
50 import org.junit.Assert;
51 import org.junit.Before;
52 import org.junit.BeforeClass;
53 import org.junit.ClassRule;
54 import org.junit.Test;
55 import org.junit.experimental.categories.Category;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
/**
 * Exercises {@code Table} batch, put, get and delete calls against {@link #TEST_TABLE} on the
 * mini cluster that {@code beforeClass()} starts.
 */
59 @Category({MediumTests.class, FlakeyTests.class})
60 public class TestMultiParallel {
62 @ClassRule
63 public static final HBaseClassTestRule CLASS_RULE =
64 HBaseClassTestRule.forClass(TestMultiParallel.class);
66 private static final Logger LOG = LoggerFactory.getLogger(TestMultiParallel.class);
// Shared testing utility; owns the mini cluster started in beforeClass().
68 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
// Default cell value and qualifier written by constructPutRequests().
69 private static final byte[] VALUE = Bytes.toBytes("value");
70 private static final byte[] QUALIFIER = Bytes.toBytes("qual");
71 private static final String FAMILY = "family";
72 private static final TableName TEST_TABLE = TableName.valueOf("multi_test_table");
73 private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
// Single row used by the tests that write many columns to one row.
74 private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
// Row keys used by most tests; built once by makeKeys() during class initialization.
75 private static final byte [][] KEYS = makeKeys();
// Passed to UTIL.startMiniCluster(...) and UTIL.ensureSomeRegionServersAvailable(...).
77 private static final int slaves = 5; // also used for testing HTable pool size
// Assigned in beforeClass() and closed in afterClass().
78 private static Connection CONNECTION;
80 @BeforeClass
81 public static void beforeClass() throws Exception {
82 // Uncomment the following lines if more verbosity is needed for
83 // debugging (see HBASE-12285 for details).
84 //((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
85 //((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
86 //((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
87 UTIL.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
88 KeyValueCodec.class.getCanonicalName());
89 // Disable table on master for now as the feature is broken
90 //UTIL.getConfiguration().setBoolean(LoadBalancer.TABLES_ON_MASTER, true);
91 // We used to ask for system tables on Master exclusively but not needed by test and doesn't
92 // work anyways -- so commented out.
93 // UTIL.getConfiguration().setBoolean(LoadBalancer.SYSTEM_TABLES_ON_MASTER, true);
94 UTIL.getConfiguration()
95 .set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, MyMasterObserver.class.getName());
96 UTIL.startMiniCluster(slaves);
97 Table t = UTIL.createMultiRegionTable(TEST_TABLE, Bytes.toBytes(FAMILY));
98 UTIL.waitTableEnabled(TEST_TABLE);
99 t.close();
100 CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration());
101 assertTrue(MyMasterObserver.start.get());
104 @AfterClass
105 public static void afterClass() throws Exception {
106 CONNECTION.close();
107 UTIL.shutdownMiniCluster();
110 @Before
111 public void before() throws Exception {
112 final int balanceCount = MyMasterObserver.postBalanceCount.get();
113 LOG.info("before");
114 if (UTIL.ensureSomeRegionServersAvailable(slaves)) {
115 // Distribute regions
116 UTIL.getMiniHBaseCluster().getMaster().balance();
117 // Some plans are created.
118 if (MyMasterObserver.postBalanceCount.get() > balanceCount) {
119 // It is necessary to wait the move procedure to start.
120 // Otherwise, the next wait may pass immediately.
121 UTIL.waitFor(3 * 1000, 100, false, () ->
122 UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().hasRegionsInTransition()
126 // Wait until completing balance
127 UTIL.waitUntilAllRegionsAssigned(TEST_TABLE);
129 LOG.info("before done");
132 private static byte[][] makeKeys() {
133 byte [][] starterKeys = HBaseTestingUtility.KEYS;
134 // Create a "non-uniform" test set with the following characteristics:
135 // a) Unequal number of keys per region
137 // Don't use integer as a multiple, so that we have a number of keys that is
138 // not a multiple of the number of regions
139 int numKeys = (int) (starterKeys.length * 10.33F);
141 List<byte[]> keys = new ArrayList<>();
142 for (int i = 0; i < numKeys; i++) {
143 int kIdx = i % starterKeys.length;
144 byte[] k = starterKeys[kIdx];
145 byte[] cp = new byte[k.length + 1];
146 System.arraycopy(k, 0, cp, 0, k.length);
147 cp[k.length] = new Integer(i % 256).byteValue();
148 keys.add(cp);
151 // b) Same duplicate keys (showing multiple Gets/Puts to the same row, which
152 // should work)
153 // c) keys are not in sorted order (within a region), to ensure that the
154 // sorting code and index mapping doesn't break the functionality
155 for (int i = 0; i < 100; i++) {
156 int kIdx = i % starterKeys.length;
157 byte[] k = starterKeys[kIdx];
158 byte[] cp = new byte[k.length + 1];
159 System.arraycopy(k, 0, cp, 0, k.length);
160 cp[k.length] = new Integer(i % 256).byteValue();
161 keys.add(cp);
163 return keys.toArray(new byte [][] {new byte [] {}});
166 @Test
167 public void testBatchWithGet() throws Exception {
168 LOG.info("test=testBatchWithGet");
169 Table table = UTIL.getConnection().getTable(TEST_TABLE);
171 // load test data
172 List<Put> puts = constructPutRequests();
173 table.batch(puts, null);
175 // create a list of gets and run it
176 List<Row> gets = new ArrayList<>();
177 for (byte[] k : KEYS) {
178 Get get = new Get(k);
179 get.addColumn(BYTES_FAMILY, QUALIFIER);
180 gets.add(get);
182 Result[] multiRes = new Result[gets.size()];
183 table.batch(gets, multiRes);
185 // Same gets using individual call API
186 List<Result> singleRes = new ArrayList<>();
187 for (Row get : gets) {
188 singleRes.add(table.get((Get) get));
190 // Compare results
191 Assert.assertEquals(singleRes.size(), multiRes.length);
192 for (int i = 0; i < singleRes.size(); i++) {
193 Assert.assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER));
194 Cell[] singleKvs = singleRes.get(i).rawCells();
195 Cell[] multiKvs = multiRes[i].rawCells();
196 for (int j = 0; j < singleKvs.length; j++) {
197 Assert.assertEquals(singleKvs[j], multiKvs[j]);
198 Assert.assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(singleKvs[j]),
199 CellUtil.cloneValue(multiKvs[j])));
202 table.close();
205 @Test
206 public void testBadFam() throws Exception {
207 LOG.info("test=testBadFam");
208 Table table = UTIL.getConnection().getTable(TEST_TABLE);
210 List<Row> actions = new ArrayList<>();
211 Put p = new Put(Bytes.toBytes("row1"));
212 p.addColumn(Bytes.toBytes("bad_family"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
213 actions.add(p);
214 p = new Put(Bytes.toBytes("row2"));
215 p.addColumn(BYTES_FAMILY, Bytes.toBytes("qual"), Bytes.toBytes("value"));
216 actions.add(p);
218 // row1 and row2 should be in the same region.
220 Object[] r = new Object[actions.size()];
221 try {
222 table.batch(actions, r);
223 fail();
224 } catch (RetriesExhaustedException ex) {
225 // expected
227 assertEquals(2, r.length);
228 assertTrue(r[0] instanceof Throwable);
229 assertTrue(r[1] instanceof Result);
230 table.close();
233 @Test
234 public void testFlushCommitsNoAbort() throws Exception {
235 LOG.info("test=testFlushCommitsNoAbort");
236 doTestFlushCommits(false);
240 * Only run one Multi test with a forced RegionServer abort. Otherwise, the
241 * unit tests will take an unnecessarily long time to run.
243 @Test
244 public void testFlushCommitsWithAbort() throws Exception {
245 LOG.info("test=testFlushCommitsWithAbort");
246 doTestFlushCommits(true);
250 * Set table auto flush to false and test flushing commits
251 * @param doAbort true if abort one regionserver in the testing
253 private void doTestFlushCommits(boolean doAbort) throws Exception {
254 // Load the data
255 LOG.info("get new table");
256 Table table = UTIL.getConnection().getTable(TEST_TABLE);
258 LOG.info("constructPutRequests");
259 List<Put> puts = constructPutRequests();
260 table.put(puts);
261 LOG.info("puts");
262 final int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()
263 .size();
264 assert liveRScount > 0;
265 JVMClusterUtil.RegionServerThread liveRS = UTIL.getMiniHBaseCluster()
266 .getLiveRegionServerThreads().get(0);
267 if (doAbort) {
268 liveRS.getRegionServer().abort("Aborting for tests",
269 new Exception("doTestFlushCommits"));
270 // If we wait for no regions being online after we abort the server, we
271 // could ensure the master has re-assigned the regions on killed server
272 // after writing successfully. It means the server we aborted is dead
273 // and detected by matser
274 while (liveRS.getRegionServer().getNumberOfOnlineRegions() != 0) {
275 Thread.sleep(100);
277 // try putting more keys after the abort. same key/qual... just validating
278 // no exceptions thrown
279 puts = constructPutRequests();
280 table.put(puts);
283 LOG.info("validating loaded data");
284 validateLoadedData(table);
286 // Validate server and region count
287 List<JVMClusterUtil.RegionServerThread> liveRSs = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads();
288 int count = 0;
289 for (JVMClusterUtil.RegionServerThread t: liveRSs) {
290 count++;
291 LOG.info("Count=" + count + ", Alive=" + t.getRegionServer());
293 LOG.info("Count=" + count);
294 Assert.assertEquals("Server count=" + count + ", abort=" + doAbort,
295 (doAbort ? (liveRScount - 1) : liveRScount), count);
296 if (doAbort) {
297 UTIL.getMiniHBaseCluster().waitOnRegionServer(0);
298 UTIL.waitFor(15 * 1000, new Waiter.Predicate<Exception>() {
299 @Override
300 public boolean evaluate() throws Exception {
301 // We disable regions on master so the count should be liveRScount - 1
302 return UTIL.getMiniHBaseCluster().getMaster()
303 .getClusterMetrics().getLiveServerMetrics().size() == liveRScount - 1;
306 UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition());
309 table.close();
310 LOG.info("done");
313 @Test
314 public void testBatchWithPut() throws Exception {
315 LOG.info("test=testBatchWithPut");
316 Table table = CONNECTION.getTable(TEST_TABLE);
317 // put multiple rows using a batch
318 List<Put> puts = constructPutRequests();
320 Object[] results = new Object[puts.size()];
321 table.batch(puts, results);
322 validateSizeAndEmpty(results, KEYS.length);
324 if (true) {
325 int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size();
326 assert liveRScount > 0;
327 JVMClusterUtil.RegionServerThread liveRS =
328 UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().get(0);
329 liveRS.getRegionServer().abort("Aborting for tests", new Exception("testBatchWithPut"));
330 puts = constructPutRequests();
331 try {
332 results = new Object[puts.size()];
333 table.batch(puts, results);
334 } catch (RetriesExhaustedWithDetailsException ree) {
335 LOG.info(ree.getExhaustiveDescription());
336 table.close();
337 throw ree;
339 validateSizeAndEmpty(results, KEYS.length);
342 validateLoadedData(table);
343 table.close();
346 @Test
347 public void testBatchWithDelete() throws Exception {
348 LOG.info("test=testBatchWithDelete");
349 Table table = UTIL.getConnection().getTable(TEST_TABLE);
351 // Load some data
352 List<Put> puts = constructPutRequests();
353 Object[] results = new Object[puts.size()];
354 table.batch(puts, results);
355 validateSizeAndEmpty(results, KEYS.length);
357 // Deletes
358 List<Row> deletes = new ArrayList<>();
359 for (int i = 0; i < KEYS.length; i++) {
360 Delete delete = new Delete(KEYS[i]);
361 delete.addFamily(BYTES_FAMILY);
362 deletes.add(delete);
364 results= new Object[deletes.size()];
365 table.batch(deletes, results);
366 validateSizeAndEmpty(results, KEYS.length);
368 // Get to make sure ...
369 for (byte[] k : KEYS) {
370 Get get = new Get(k);
371 get.addColumn(BYTES_FAMILY, QUALIFIER);
372 Assert.assertFalse(table.exists(get));
374 table.close();
377 @Test
378 public void testHTableDeleteWithList() throws Exception {
379 LOG.info("test=testHTableDeleteWithList");
380 Table table = UTIL.getConnection().getTable(TEST_TABLE);
382 // Load some data
383 List<Put> puts = constructPutRequests();
384 Object[] results = new Object[puts.size()];
385 table.batch(puts, results);
386 validateSizeAndEmpty(results, KEYS.length);
388 // Deletes
389 ArrayList<Delete> deletes = new ArrayList<>();
390 for (int i = 0; i < KEYS.length; i++) {
391 Delete delete = new Delete(KEYS[i]);
392 delete.addFamily(BYTES_FAMILY);
393 deletes.add(delete);
395 table.delete(deletes);
397 // Get to make sure ...
398 for (byte[] k : KEYS) {
399 Get get = new Get(k);
400 get.addColumn(BYTES_FAMILY, QUALIFIER);
401 Assert.assertFalse(table.exists(get));
403 table.close();
406 @Test
407 public void testBatchWithManyColsInOneRowGetAndPut() throws Exception {
408 LOG.info("test=testBatchWithManyColsInOneRowGetAndPut");
409 Table table = UTIL.getConnection().getTable(TEST_TABLE);
411 List<Row> puts = new ArrayList<>();
412 for (int i = 0; i < 100; i++) {
413 Put put = new Put(ONE_ROW);
414 byte[] qual = Bytes.toBytes("column" + i);
415 put.addColumn(BYTES_FAMILY, qual, VALUE);
416 puts.add(put);
418 Object[] results = new Object[puts.size()];
419 table.batch(puts, results);
421 // validate
422 validateSizeAndEmpty(results, 100);
424 // get the data back and validate that it is correct
425 List<Row> gets = new ArrayList<>();
426 for (int i = 0; i < 100; i++) {
427 Get get = new Get(ONE_ROW);
428 byte[] qual = Bytes.toBytes("column" + i);
429 get.addColumn(BYTES_FAMILY, qual);
430 gets.add(get);
433 Object[] multiRes = new Object[gets.size()];
434 table.batch(gets, multiRes);
436 int idx = 0;
437 for (Object r : multiRes) {
438 byte[] qual = Bytes.toBytes("column" + idx);
439 validateResult(r, qual, VALUE);
440 idx++;
442 table.close();
445 @Test
446 public void testBatchWithIncrementAndAppend() throws Exception {
447 LOG.info("test=testBatchWithIncrementAndAppend");
448 final byte[] QUAL1 = Bytes.toBytes("qual1");
449 final byte[] QUAL2 = Bytes.toBytes("qual2");
450 final byte[] QUAL3 = Bytes.toBytes("qual3");
451 final byte[] QUAL4 = Bytes.toBytes("qual4");
452 Table table = UTIL.getConnection().getTable(TEST_TABLE);
453 Delete d = new Delete(ONE_ROW);
454 table.delete(d);
455 Put put = new Put(ONE_ROW);
456 put.addColumn(BYTES_FAMILY, QUAL1, Bytes.toBytes("abc"));
457 put.addColumn(BYTES_FAMILY, QUAL2, Bytes.toBytes(1L));
458 table.put(put);
460 Increment inc = new Increment(ONE_ROW);
461 inc.addColumn(BYTES_FAMILY, QUAL2, 1);
462 inc.addColumn(BYTES_FAMILY, QUAL3, 1);
464 Append a = new Append(ONE_ROW);
465 a.addColumn(BYTES_FAMILY, QUAL1, Bytes.toBytes("def"));
466 a.addColumn(BYTES_FAMILY, QUAL4, Bytes.toBytes("xyz"));
467 List<Row> actions = new ArrayList<>();
468 actions.add(inc);
469 actions.add(a);
471 Object[] multiRes = new Object[actions.size()];
472 table.batch(actions, multiRes);
473 validateResult(multiRes[1], QUAL1, Bytes.toBytes("abcdef"));
474 validateResult(multiRes[1], QUAL4, Bytes.toBytes("xyz"));
475 validateResult(multiRes[0], QUAL2, Bytes.toBytes(2L));
476 validateResult(multiRes[0], QUAL3, Bytes.toBytes(1L));
477 table.close();
480 @Test
481 public void testBatchWithMixedActions() throws Exception {
482 LOG.info("test=testBatchWithMixedActions");
483 Table table = UTIL.getConnection().getTable(TEST_TABLE);
485 // Load some data to start
486 List<Put> puts = constructPutRequests();
487 Object[] results = new Object[puts.size()];
488 table.batch(puts, results);
489 validateSizeAndEmpty(results, KEYS.length);
491 // Batch: get, get, put(new col), delete, get, get of put, get of deleted,
492 // put
493 List<Row> actions = new ArrayList<>();
495 byte[] qual2 = Bytes.toBytes("qual2");
496 byte[] val2 = Bytes.toBytes("putvalue2");
498 // 0 get
499 Get get = new Get(KEYS[10]);
500 get.addColumn(BYTES_FAMILY, QUALIFIER);
501 actions.add(get);
503 // 1 get
504 get = new Get(KEYS[11]);
505 get.addColumn(BYTES_FAMILY, QUALIFIER);
506 actions.add(get);
508 // 2 put of new column
509 Put put = new Put(KEYS[10]);
510 put.addColumn(BYTES_FAMILY, qual2, val2);
511 actions.add(put);
513 // 3 delete
514 Delete delete = new Delete(KEYS[20]);
515 delete.addFamily(BYTES_FAMILY);
516 actions.add(delete);
518 // 4 get
519 get = new Get(KEYS[30]);
520 get.addColumn(BYTES_FAMILY, QUALIFIER);
521 actions.add(get);
523 // There used to be a 'get' of a previous put here, but removed
524 // since this API really cannot guarantee order in terms of mixed
525 // get/puts.
527 // 5 put of new column
528 put = new Put(KEYS[40]);
529 put.addColumn(BYTES_FAMILY, qual2, val2);
530 actions.add(put);
532 // 6 RowMutations
533 RowMutations rm = new RowMutations(KEYS[50]);
534 put = new Put(KEYS[50]);
535 put.addColumn(BYTES_FAMILY, qual2, val2);
536 rm.add((Mutation) put);
537 byte[] qual3 = Bytes.toBytes("qual3");
538 byte[] val3 = Bytes.toBytes("putvalue3");
539 put = new Put(KEYS[50]);
540 put.addColumn(BYTES_FAMILY, qual3, val3);
541 rm.add((Mutation) put);
542 actions.add(rm);
544 // 7 Add another Get to the mixed sequence after RowMutations
545 get = new Get(KEYS[10]);
546 get.addColumn(BYTES_FAMILY, QUALIFIER);
547 actions.add(get);
549 results = new Object[actions.size()];
550 table.batch(actions, results);
552 // Validation
554 validateResult(results[0]);
555 validateResult(results[1]);
556 validateEmpty(results[3]);
557 validateResult(results[4]);
558 validateEmpty(results[5]);
559 validateEmpty(results[6]);
560 validateResult(results[7]);
562 // validate last put, externally from the batch
563 get = new Get(KEYS[40]);
564 get.addColumn(BYTES_FAMILY, qual2);
565 Result r = table.get(get);
566 validateResult(r, qual2, val2);
568 // validate last RowMutations, externally from the batch
569 get = new Get(KEYS[50]);
570 get.addColumn(BYTES_FAMILY, qual2);
571 r = table.get(get);
572 validateResult(r, qual2, val2);
574 get = new Get(KEYS[50]);
575 get.addColumn(BYTES_FAMILY, qual3);
576 r = table.get(get);
577 validateResult(r, qual3, val3);
579 table.close();
582 // // Helper methods ////
584 private void validateResult(Object r) {
585 validateResult(r, QUALIFIER, VALUE);
588 private void validateResult(Object r1, byte[] qual, byte[] val) {
589 Result r = (Result)r1;
590 Assert.assertTrue(r.containsColumn(BYTES_FAMILY, qual));
591 byte[] value = r.getValue(BYTES_FAMILY, qual);
592 if (0 != Bytes.compareTo(val, value)) {
593 fail("Expected [" + Bytes.toStringBinary(val)
594 + "] but got [" + Bytes.toStringBinary(value) + "]");
598 private List<Put> constructPutRequests() {
599 List<Put> puts = new ArrayList<>();
600 for (byte[] k : KEYS) {
601 Put put = new Put(k);
602 put.addColumn(BYTES_FAMILY, QUALIFIER, VALUE);
603 puts.add(put);
605 return puts;
608 private void validateLoadedData(Table table) throws IOException {
609 // get the data back and validate that it is correct
610 LOG.info("Validating data on " + table);
611 List<Get> gets = new ArrayList<>();
612 for (byte[] k : KEYS) {
613 Get get = new Get(k);
614 get.addColumn(BYTES_FAMILY, QUALIFIER);
615 gets.add(get);
617 int retryNum = 10;
618 Result[] results = null;
619 do {
620 results = table.get(gets);
621 boolean finished = true;
622 for (Result result : results) {
623 if (result.isEmpty()) {
624 finished = false;
625 break;
628 if (finished) {
629 break;
631 try {
632 Thread.sleep(10);
633 } catch (InterruptedException e) {
635 retryNum--;
636 } while (retryNum > 0);
638 if (retryNum == 0) {
639 fail("Timeout for validate data");
640 } else {
641 if (results != null) {
642 for (Result r : results) {
643 Assert.assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER));
644 Assert.assertEquals(0, Bytes.compareTo(VALUE, r
645 .getValue(BYTES_FAMILY, QUALIFIER)));
647 LOG.info("Validating data on " + table + " successfully!");
652 private void validateEmpty(Object r1) {
653 Result result = (Result)r1;
654 Assert.assertTrue(result != null);
655 Assert.assertTrue(result.isEmpty());
658 private void validateSizeAndEmpty(Object[] results, int expectedSize) {
659 // Validate got back the same number of Result objects, all empty
660 Assert.assertEquals(expectedSize, results.length);
661 for (Object result : results) {
662 validateEmpty(result);
666 public static class MyMasterObserver implements MasterObserver, MasterCoprocessor {
667 private static final AtomicInteger postBalanceCount = new AtomicInteger(0);
668 private static final AtomicBoolean start = new AtomicBoolean(false);
670 @Override
671 public void start(CoprocessorEnvironment env) throws IOException {
672 start.set(true);
675 @Override
676 public Optional<MasterObserver> getMasterObserver() {
677 return Optional.of(this);
680 @Override
681 public void postBalance(final ObserverContext<MasterCoprocessorEnvironment> ctx,
682 List<RegionPlan> plans) throws IOException {
683 if (!plans.isEmpty()) {
684 postBalanceCount.incrementAndGet();