HBASE-17532 Replaced explicit type with diamond operator
hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;

import com.google.common.base.Joiner;
import com.google.common.collect.Sets;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Test Bulk Load and MR on a distributed cluster.
 * It starts an MR job that creates linked chains.
 *
 * The format of rows is like this:
 * Row Key -> Long
 *
 * L:<< Chain Id >> -> Row Key of the next link in the chain
 * S:<< Chain Id >> -> The step in the chain that this link is.
 * D:<< Chain Id >> -> Random Data.
 *
 * All chains start on row 0.
 * All rk's are > 0.
 *
 * After creating the linked lists they are walked over using a TableMapper based MapReduce job.
 *
 * There are a few options exposed:
 *
 * hbase.IntegrationTestBulkLoad.chainLength
 * The number of rows that will be part of each and every chain.
 *
 * hbase.IntegrationTestBulkLoad.numMaps
 * The number of mappers that will be run. Each mapper creates one linked-list chain.
 *
 * hbase.IntegrationTestBulkLoad.numImportRounds
 * How many jobs will be run to create linked lists.
 *
 * hbase.IntegrationTestBulkLoad.tableName
 * The name of the table.
 *
 * hbase.IntegrationTestBulkLoad.replicaCount
 * How many region replicas to configure for the table under test.
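 *
 * A minimal invocation sketch (the jar path and the option values below are hypothetical, not
 * taken from this file): main() hands this tool to ToolRunner, so the properties above can be
 * passed as generic -D options, e.g.
 *
 *   hadoop jar hbase-it-tests.jar org.apache.hadoop.hbase.mapreduce.IntegrationTestBulkLoad \
 *       -Dhbase.IntegrationTestBulkLoad.chainLength=100000 \
 *       -Dhbase.IntegrationTestBulkLoad.numMaps=10 \
 *       -Dhbase.IntegrationTestBulkLoad.numImportRounds=2 \
 *       -Dhbase.IntegrationTestBulkLoad.replicaCount=3
 *
 * The "load" and "check" options registered in addOptions() restrict a run to just the load
 * phase or just the check phase; with neither, the full load-then-check cycle is executed.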
 */
@Category(IntegrationTests.class)
public class IntegrationTestBulkLoad extends IntegrationTestBase {

  private static final Log LOG = LogFactory.getLog(IntegrationTestBulkLoad.class);

  private static final byte[] CHAIN_FAM = Bytes.toBytes("L");
  private static final byte[] SORT_FAM = Bytes.toBytes("S");
  private static final byte[] DATA_FAM = Bytes.toBytes("D");

  private static String CHAIN_LENGTH_KEY = "hbase.IntegrationTestBulkLoad.chainLength";
  private static int CHAIN_LENGTH = 500000;

  private static String NUM_MAPS_KEY = "hbase.IntegrationTestBulkLoad.numMaps";
  private static int NUM_MAPS = 1;

  private static String NUM_IMPORT_ROUNDS_KEY = "hbase.IntegrationTestBulkLoad.numImportRounds";
  private static int NUM_IMPORT_ROUNDS = 1;

  private static String ROUND_NUM_KEY = "hbase.IntegrationTestBulkLoad.roundNum";

  private static String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName";
  private static String TABLE_NAME = "IntegrationTestBulkLoad";

  private static String NUM_REPLICA_COUNT_KEY = "hbase.IntegrationTestBulkLoad.replicaCount";
  private static int NUM_REPLICA_COUNT_DEFAULT = 1;

  private static final String OPT_LOAD = "load";
  private static final String OPT_CHECK = "check";

  private boolean load = false;
  private boolean check = false;

  public static class SlowMeCoproScanOperations implements RegionObserver {
    static final AtomicLong sleepTime = new AtomicLong(2000);
    Random r = new Random();
    AtomicLong countOfNext = new AtomicLong(0);
    AtomicLong countOfOpen = new AtomicLong(0);
    public SlowMeCoproScanOperations() {}

    @Override
    public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Scan scan, final RegionScanner s) throws IOException {
      if (countOfOpen.incrementAndGet() == 2) { // slow down the second scanner open
        slowdownCode(e);
      }
      return s;
    }

    @Override
    public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
        final InternalScanner s, final List<Result> results,
        final int limit, final boolean hasMore) throws IOException {
      // this will slow down a certain next operation if the conditions are met. The slowness
      // will allow the call to go to a replica
      countOfNext.incrementAndGet();
      if (countOfNext.get() == 0 || countOfNext.get() == 4) {
        slowdownCode(e);
      }
      return true;
    }

    protected void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
        try {
          if (sleepTime.get() > 0) {
            LOG.info("Sleeping for " + sleepTime.get() + " ms");
            Thread.sleep(sleepTime.get());
          }
        } catch (InterruptedException e1) {
          LOG.error(e1);
        }
      }
    }
  }

  /**
   * Modify table {@code getTableName()} to carry {@link SlowMeCoproScanOperations}.
   */
  private void installSlowingCoproc() throws IOException, InterruptedException {
    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;

    TableName t = getTablename();
    Admin admin = util.getAdmin();
    HTableDescriptor desc = admin.getTableDescriptor(t);
    desc.addCoprocessor(SlowMeCoproScanOperations.class.getName());
    HBaseTestingUtility.modifyTableSync(admin, desc);
  }

  @Test
  public void testBulkLoad() throws Exception {
    runLoad();
    installSlowingCoproc();
    runCheckWithRetry();
  }

  public void runLoad() throws Exception {
    setupTable();
    int numImportRounds = getConf().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
    LOG.info("Running load with numIterations:" + numImportRounds);
    for (int i = 0; i < numImportRounds; i++) {
      runLinkedListMRJob(i);
    }
  }

  private byte[][] getSplits(int numRegions) {
    RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit();
    split.setFirstRow(Bytes.toBytes(0L));
    split.setLastRow(Bytes.toBytes(Long.MAX_VALUE));
    return split.split(numRegions);
  }

  private void setupTable() throws IOException, InterruptedException {
    if (util.getAdmin().tableExists(getTablename())) {
      util.deleteTable(getTablename());
    }

    util.createTable(
        getTablename(),
        new byte[][]{CHAIN_FAM, SORT_FAM, DATA_FAM},
        getSplits(16)
    );

    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;

    TableName t = getTablename();
    HBaseTestingUtility.setReplicas(util.getAdmin(), t, replicaCount);
  }

  private void runLinkedListMRJob(int iteration) throws Exception {
    String jobName = IntegrationTestBulkLoad.class.getSimpleName() + " - " +
        EnvironmentEdgeManager.currentTime();
    Configuration conf = new Configuration(util.getConfiguration());
    Path p = null;
    if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
      p = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration);
    } else {
      p = new Path(conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY));
    }

    conf.setBoolean("mapreduce.map.speculative", false);
    conf.setBoolean("mapreduce.reduce.speculative", false);
    conf.setInt(ROUND_NUM_KEY, iteration);

    Job job = new Job(conf);

    job.setJobName(jobName);

    // set the input format so that we can create map tasks with no data input.
    job.setInputFormatClass(ITBulkLoadInputFormat.class);

    // Set the mapper classes.
    job.setMapperClass(LinkedListCreationMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);

    // Use the identity reducer
    // So nothing to do here.

    // Set this jar.
    job.setJarByClass(getClass());

    // Set where to place the hfiles.
    FileOutputFormat.setOutputPath(job, p);

    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        Table table = conn.getTable(getTablename());
        RegionLocator regionLocator = conn.getRegionLocator(getTablename())) {

      // Configure the partitioner and other things needed for HFileOutputFormat.
      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);

      // Run the job making sure it works.
      assertEquals(true, job.waitForCompletion(true));

      // Create a new loader.
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);

      // Load the HFiles in.
      loader.doBulkLoad(p, admin, table, regionLocator);
    }

    // Delete the files.
    util.getTestFileSystem().delete(p, true);
  }

  public static class EmptySplit extends InputSplit implements Writable {
    @Override
    public void write(DataOutput out) throws IOException { }
    @Override
    public void readFields(DataInput in) throws IOException { }
    @Override
    public long getLength() { return 0L; }
    @Override
    public String[] getLocations() { return new String[0]; }
  }

  public static class FixedRecordReader<K, V> extends RecordReader<K, V> {
    private int index = -1;
    private K[] keys;
    private V[] values;

    public FixedRecordReader(K[] keys, V[] values) {
      this.keys = keys;
      this.values = values;
    }
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
        InterruptedException { }
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      return ++index < keys.length;
    }
    @Override
    public K getCurrentKey() throws IOException, InterruptedException {
      return keys[index];
    }
    @Override
    public V getCurrentValue() throws IOException, InterruptedException {
      return values[index];
    }
    @Override
    public float getProgress() throws IOException, InterruptedException {
      return (float)index / keys.length;
    }
    @Override
    public void close() throws IOException {
    }
  }

  public static class ITBulkLoadInputFormat extends InputFormat<LongWritable, LongWritable> {
    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
      int numSplits = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
      ArrayList<InputSplit> ret = new ArrayList<>(numSplits);
      for (int i = 0; i < numSplits; ++i) {
        ret.add(new EmptySplit());
      }
      return ret;
    }

    @Override
    public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit split,
        TaskAttemptContext context)
            throws IOException, InterruptedException {
      int taskId = context.getTaskAttemptID().getTaskID().getId();
      int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
      int numIterations = context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
      int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0);

      taskId = taskId + iteration * numMapTasks;
      numMapTasks = numMapTasks * numIterations;

      long chainId = Math.abs(new Random().nextLong());
      // ensure that chainId is unique per task and across iterations
      chainId = chainId - (chainId % numMapTasks) + taskId;
      LongWritable[] keys = new LongWritable[] {new LongWritable(chainId)};

      return new FixedRecordReader<>(keys, keys);
    }
  }

  /**
   * Mapper that creates a linked list of KeyValues.
   *
   * Each map task generates one linked list.
   * All lists start on row key 0L.
   * All lists should be CHAIN_LENGTH long.
   */
  public static class LinkedListCreationMapper
      extends Mapper<LongWritable, LongWritable, ImmutableBytesWritable, KeyValue> {

    private Random rand = new Random();

    @Override
    protected void map(LongWritable key, LongWritable value, Context context)
        throws IOException, InterruptedException {
      long chainId = value.get();
      LOG.info("Starting mapper with chainId:" + chainId);

      byte[] chainIdArray = Bytes.toBytes(chainId);
      long currentRow = 0;

      long chainLength = context.getConfiguration().getLong(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
      long nextRow = getNextRow(0, chainLength);

      for (long i = 0; i < chainLength; i++) {
        byte[] rk = Bytes.toBytes(currentRow);

        // Next link in the chain.
        KeyValue linkKv = new KeyValue(rk, CHAIN_FAM, chainIdArray, Bytes.toBytes(nextRow));
        // What link in the chain this is.
        KeyValue sortKv = new KeyValue(rk, SORT_FAM, chainIdArray, Bytes.toBytes(i));
        // Added data so that large stores are created.
        KeyValue dataKv = new KeyValue(rk, DATA_FAM, chainIdArray,
            Bytes.toBytes(RandomStringUtils.randomAlphabetic(50))
        );

        // Emit the key values.
        context.write(new ImmutableBytesWritable(rk), linkKv);
        context.write(new ImmutableBytesWritable(rk), sortKv);
        context.write(new ImmutableBytesWritable(rk), dataKv);
        // Move to the next row.
        currentRow = nextRow;
        nextRow = getNextRow(i + 1, chainLength);
      }
    }

    /** Returns a unique row id within this chain for this index */
    private long getNextRow(long index, long chainLength) {
      long nextRow = Math.abs(rand.nextLong());
      // use significant bits from the random number, but pad with index to ensure it is unique
      // this also ensures that we do not reuse row = 0
      // row collisions from multiple mappers are fine, since we guarantee unique chainIds
      nextRow = nextRow - (nextRow % chainLength) + index;
      return nextRow;
    }
  }
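
  /*
   * Worked example of the row-id scheme in getNextRow() (numbers are illustrative only): with
   * chainLength = 500000 and a random draw of 1234567890123, the remainder
   * 1234567890123 % 500000 = 390123 is subtracted, so getNextRow(7, 500000) would return
   * 1234567500000 + 7.  Because every returned row is congruent to its index modulo
   * chainLength, two different links of the same chain can never map to the same row, and
   * collisions between chains are harmless since each link also carries a unique chainId.
   */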

  /**
   * Writable class used as the key to group links in the linked list.
   *
   * Used as the key emitted from a pass over the table.
   */
  public static class LinkKey implements WritableComparable<LinkKey> {

    private Long chainId;

    public Long getOrder() {
      return order;
    }

    public Long getChainId() {
      return chainId;
    }

    private Long order;

    public LinkKey() {}

    public LinkKey(long chainId, long order) {
      this.chainId = chainId;
      this.order = order;
    }

    @Override
    public int compareTo(LinkKey linkKey) {
      int res = getChainId().compareTo(linkKey.getChainId());
      if (res == 0) {
        res = getOrder().compareTo(linkKey.getOrder());
      }
      return res;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
      WritableUtils.writeVLong(dataOutput, chainId);
      WritableUtils.writeVLong(dataOutput, order);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
      chainId = WritableUtils.readVLong(dataInput);
      order = WritableUtils.readVLong(dataInput);
    }
  }

  /**
   * Writable used as the value emitted from a pass over the hbase table.
   */
  public static class LinkChain implements WritableComparable<LinkChain> {

    public Long getNext() {
      return next;
    }

    public Long getRk() {
      return rk;
    }

    public LinkChain() {}

    public LinkChain(Long rk, Long next) {
      this.rk = rk;
      this.next = next;
    }

    private Long rk;
    private Long next;

    @Override
    public int compareTo(LinkChain linkChain) {
      int res = getRk().compareTo(linkChain.getRk());
      if (res == 0) {
        res = getNext().compareTo(linkChain.getNext());
      }
      return res;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
      WritableUtils.writeVLong(dataOutput, rk);
      WritableUtils.writeVLong(dataOutput, next);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
      rk = WritableUtils.readVLong(dataInput);
      next = WritableUtils.readVLong(dataInput);
    }
  }

  /**
   * Class to figure out what partition to send a link in the chain to.  This is based upon
   * the linkKey's ChainId.
   */
  public static class NaturalKeyPartitioner extends Partitioner<LinkKey, LinkChain> {
    @Override
    public int getPartition(LinkKey linkKey,
                            LinkChain linkChain,
                            int numPartitions) {
      int hash = linkKey.getChainId().hashCode();
      return Math.abs(hash % numPartitions);
    }
  }

  /**
   * Comparator used to figure out whether two linkKeys should be grouped together.  This is based
   * upon the linkKey's ChainId.
   */
  public static class NaturalKeyGroupingComparator extends WritableComparator {

    protected NaturalKeyGroupingComparator() {
      super(LinkKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      LinkKey k1 = (LinkKey) w1;
      LinkKey k2 = (LinkKey) w2;

      return k1.getChainId().compareTo(k2.getChainId());
    }
  }

  /**
   * Comparator used to order linkKeys so that they are passed to a reducer in order.  This is based
   * upon linkKey ChainId and Order.
   */
  public static class CompositeKeyComparator extends WritableComparator {

    protected CompositeKeyComparator() {
      super(LinkKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      LinkKey k1 = (LinkKey) w1;
      LinkKey k2 = (LinkKey) w2;

      return k1.compareTo(k2);
    }
  }
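
  /*
   * Note on the three classes above (a sketch of the intent, as wired together in runCheck()
   * below): NaturalKeyPartitioner, NaturalKeyGroupingComparator and CompositeKeyComparator
   * implement the standard MapReduce secondary-sort pattern.  Records are partitioned and
   * grouped only by chainId but sorted by (chainId, order), so each reduce() call sees exactly
   * one chain with its links already in order, which is what LinkedListCheckingReducer relies on.
   */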

  /**
   * Mapper to pass over the table.
   *
   * For every row there could be multiple chains that landed on this row. So emit a linkKey
   * and value for each.
   */
  public static class LinkedListCheckingMapper extends TableMapper<LinkKey, LinkChain> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      long longRk = Bytes.toLong(value.getRow());

      for (Map.Entry<byte[], byte[]> entry : value.getFamilyMap(CHAIN_FAM).entrySet()) {
        long chainId = Bytes.toLong(entry.getKey());
        long next = Bytes.toLong(entry.getValue());
        Cell c = value.getColumnCells(SORT_FAM, entry.getKey()).get(0);
        long order = Bytes.toLong(CellUtil.cloneValue(c));
        context.write(new LinkKey(chainId, order), new LinkChain(longRk, next));
      }
    }
  }

  /**
   * Class that does the actual checking of the links.
   *
   * All links in the chain should be grouped and sorted when sent to this class.  Then the chain
   * will be traversed making sure that no link is missing and that the chain is the correct length.
   *
   * This will throw an exception if anything is not correct.  That causes the job to fail if any
   * data is corrupt.
   */
  public static class LinkedListCheckingReducer
      extends Reducer<LinkKey, LinkChain, NullWritable, NullWritable> {
    @Override
    protected void reduce(LinkKey key, Iterable<LinkChain> values, Context context)
        throws java.io.IOException, java.lang.InterruptedException {
      long next = -1L;
      long prev = -1L;
      long count = 0L;

      for (LinkChain lc : values) {

        if (next == -1) {
          if (lc.getRk() != 0L) {
            String msg = "Chains should all start at rk 0, but read rk " + lc.getRk()
                + ". Chain:" + key.chainId + ", order:" + key.order;
            logError(msg, context);
            throw new RuntimeException(msg);
          }
          next = lc.getNext();
        } else {
          if (next != lc.getRk()) {
            String msg = "Missing a link in the chain. Previous rk was " + prev + ", expecting "
                + next + " but got " + lc.getRk() + ". Chain:" + key.chainId
                + ", order:" + key.order;
            logError(msg, context);
            throw new RuntimeException(msg);
          }
          prev = lc.getRk();
          next = lc.getNext();
        }
        count++;
      }

      int expectedChainLen = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
      if (count != expectedChainLen) {
        String msg = "Chain wasn't the correct length. Expected " + expectedChainLen + " but got "
            + count + ". Chain:" + key.chainId + ", order:" + key.order;
        logError(msg, context);
        throw new RuntimeException(msg);
      }
    }

    private static void logError(String msg, Context context) throws IOException {
      TableName table = getTableName(context.getConfiguration());

      LOG.error("Failure in chain verification: " + msg);
      try (Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
          Admin admin = connection.getAdmin()) {
        LOG.error("cluster status:\n" + admin.getClusterStatus());
        LOG.error("table regions:\n"
            + Joiner.on("\n").join(admin.getTableRegions(table)));
      }
    }
  }
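
  /*
   * Illustrative reduce() input for the checker above (values are made up): for one chainId
   * with chainLength = 3, the links arrive already sorted by order, e.g.
   *   order=0 -> (rk=0,  next=7)
   *   order=1 -> (rk=7,  next=11)
   *   order=2 -> (rk=11, next=9)
   * Each link's rk must equal the previous link's next, the first rk must be 0, and the number
   * of links must match hbase.IntegrationTestBulkLoad.chainLength; otherwise the reducer logs
   * the cluster state and fails the job.
   */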

  private void runCheckWithRetry() throws IOException, ClassNotFoundException, InterruptedException {
    try {
      runCheck();
    } catch (Throwable t) {
      LOG.warn("Received " + StringUtils.stringifyException(t));
      LOG.warn("Running the check MR job again to see whether it was an ephemeral problem or not");
      runCheck();
      throw t; // we should still fail the test even if the second run succeeds
    }
    // everything green
  }

  /**
   * After adding data to the table, start an MR job that walks the chains and verifies that
   * every link is present and every chain has the expected length.
   * @throws IOException
   * @throws ClassNotFoundException
   * @throws InterruptedException
   */
  private void runCheck() throws IOException, ClassNotFoundException, InterruptedException {
    LOG.info("Running check");
    Configuration conf = getConf();
    String jobName = getTablename() + "_check" + EnvironmentEdgeManager.currentTime();
    Path p = util.getDataTestDirOnTestFS(jobName);

    Job job = new Job(conf);
    job.setJarByClass(getClass());
    job.setJobName(jobName);

    job.setPartitionerClass(NaturalKeyPartitioner.class);
    job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
    job.setSortComparatorClass(CompositeKeyComparator.class);

    Scan scan = new Scan();
    scan.addFamily(CHAIN_FAM);
    scan.addFamily(SORT_FAM);
    scan.setMaxVersions(1);
    scan.setCacheBlocks(false);
    scan.setBatch(1000);

    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
      scan.setConsistency(Consistency.TIMELINE);
    }

    TableMapReduceUtil.initTableMapperJob(
        getTablename().getName(),
        scan,
        LinkedListCheckingMapper.class,
        LinkKey.class,
        LinkChain.class,
        job
    );

    job.setReducerClass(LinkedListCheckingReducer.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);

    FileOutputFormat.setOutputPath(job, p);

    assertEquals(true, job.waitForCompletion(true));

    // Delete the files.
    util.getTestFileSystem().delete(p, true);
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    util.initializeCluster(1);
    int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (LOG.isDebugEnabled() && replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
      LOG.debug("Region Replicas enabled: " + replicaCount);
    }

    // Scale this up on a real cluster
    if (util.isDistributedCluster()) {
      util.getConfiguration().setIfUnset(NUM_MAPS_KEY,
          Integer.toString(util.getAdmin().getClusterStatus().getServersSize() * 10)
      );
      util.getConfiguration().setIfUnset(NUM_IMPORT_ROUNDS_KEY, "5");
    } else {
      util.startMiniMapReduceCluster();
    }
  }

  @Override
  protected void addOptions() {
    super.addOptions();
    super.addOptNoArg(OPT_CHECK, "Run check only");
    super.addOptNoArg(OPT_LOAD, "Run load only");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);
    check = cmd.hasOption(OPT_CHECK);
    load = cmd.hasOption(OPT_LOAD);
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    if (load) {
      runLoad();
    } else if (check) {
      installSlowingCoproc();
      runCheckWithRetry();
    } else {
      testBulkLoad();
    }
    return 0;
  }

  @Override
  public TableName getTablename() {
    return getTableName(getConf());
  }

  public static TableName getTableName(Configuration conf) {
    return TableName.valueOf(conf.get(TABLE_NAME_KEY, TABLE_NAME));
  }

  @Override
  protected Set<String> getColumnFamilies() {
    return Sets.newHashSet(Bytes.toString(CHAIN_FAM), Bytes.toString(DATA_FAM),
        Bytes.toString(SORT_FAM));
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int status = ToolRunner.run(conf, new IntegrationTestBulkLoad(), args);
    System.exit(status);
  }
}