/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertEquals;

import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Test Bulk Load and MR on a distributed cluster.
 * It starts an MR job that creates linked chains.
 *
 * The format of rows is like this:
 * L:<< Chain Id >> -> Row Key of the next link in the chain
 * S:<< Chain Id >> -> The step in the chain that this link is.
 * D:<< Chain Id >> -> Random Data.
 *
 * All chains start on row 0.
 *
 * After creating the linked lists they are walked over using a TableMapper based Mapreduce Job.
 *
 * There are a few options exposed:
 *
 * hbase.IntegrationTestBulkLoad.chainLength
 * The number of rows that will be part of each and every chain.
 *
 * hbase.IntegrationTestBulkLoad.numMaps
 * The number of mappers that will be run. Each mapper creates one linked list chain.
 *
 * hbase.IntegrationTestBulkLoad.numImportRounds
 * How many jobs will be run to create linked lists.
 *
 * hbase.IntegrationTestBulkLoad.tableName
 * The name of the table.
 *
 * hbase.IntegrationTestBulkLoad.replicaCount
 * How many region replicas to configure for the table under test.
 *
 * See the comment above main() at the bottom of this file for an example invocation that
 * sets these properties.
 */
@Category(IntegrationTests.class)
public class IntegrationTestBulkLoad extends IntegrationTestBase {
  private static final Log LOG = LogFactory.getLog(IntegrationTestBulkLoad.class);

  private static final byte[] CHAIN_FAM = Bytes.toBytes("L");
  private static final byte[] SORT_FAM = Bytes.toBytes("S");
  private static final byte[] DATA_FAM = Bytes.toBytes("D");

  private static String CHAIN_LENGTH_KEY = "hbase.IntegrationTestBulkLoad.chainLength";
  private static int CHAIN_LENGTH = 500000;

  private static String NUM_MAPS_KEY = "hbase.IntegrationTestBulkLoad.numMaps";
  private static int NUM_MAPS = 1;

  private static String NUM_IMPORT_ROUNDS_KEY = "hbase.IntegrationTestBulkLoad.numImportRounds";
  private static int NUM_IMPORT_ROUNDS = 1;

  private static String ROUND_NUM_KEY = "hbase.IntegrationTestBulkLoad.roundNum";

  private static String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName";
  private static String TABLE_NAME = "IntegrationTestBulkLoad";

  private static String NUM_REPLICA_COUNT_KEY = "hbase.IntegrationTestBulkLoad.replicaCount";
  private static int NUM_REPLICA_COUNT_DEFAULT = 1;

  private static final String OPT_LOAD = "load";
  private static final String OPT_CHECK = "check";

  private boolean load = false;
  private boolean check = false;
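
  /**
   * Coprocessor that injects artificial delays into scanner open/next calls on the primary
   * replica (replica id 0), so that timeline-consistent reads get a chance to go to a
   * region replica instead.
   */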
  public static class SlowMeCoproScanOperations implements RegionObserver {
    static final AtomicLong sleepTime = new AtomicLong(2000);
    Random r = new Random();
    AtomicLong countOfNext = new AtomicLong(0);
    AtomicLong countOfOpen = new AtomicLong(0);
    public SlowMeCoproScanOperations() {}

    public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Scan scan, final RegionScanner s) throws IOException {
      if (countOfOpen.incrementAndGet() == 2) { //slowdown openScanner randomly
        slowdownCode(e);
      }
      return s;
    }

    public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
        final InternalScanner s, final List<Result> results,
        final int limit, final boolean hasMore) throws IOException {
      //this will slow down a certain next operation if the conditions are met. The slowness
      //will allow the call to go to a replica
      countOfNext.incrementAndGet();
      if (countOfNext.get() == 0 || countOfNext.get() == 4) {
        slowdownCode(e);
      }
      return true;
    }

    protected void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
        try {
          if (sleepTime.get() > 0) {
            LOG.info("Sleeping for " + sleepTime.get() + " ms");
            Thread.sleep(sleepTime.get());
          }
        } catch (InterruptedException e1) {
          LOG.error(e1);
        }
      }
    }
  }
  /**
   * Modify table {@code getTableName()} to carry {@link SlowMeCoproScanOperations}.
   */
  private void installSlowingCoproc() throws IOException, InterruptedException {
    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;

    TableName t = getTablename();
    Admin admin = util.getAdmin();
    HTableDescriptor desc = admin.getTableDescriptor(t);
    desc.addCoprocessor(SlowMeCoproScanOperations.class.getName());
    HBaseTestingUtility.modifyTableSync(admin, desc);
  }
  @Test
  public void testBulkLoad() throws Exception {
    runLoad();
    installSlowingCoproc();
    runCheckWithRetry();
  }
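
  /**
   * Runs the configured number of import rounds; each round runs one linked-list creation
   * job against the test table.
   */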
  public void runLoad() throws Exception {
    setupTable();
    int numImportRounds = getConf().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
    LOG.info("Running load with numIterations:" + numImportRounds);
    for (int i = 0; i < numImportRounds; i++) {
      runLinkedListMRJob(i);
    }
  }
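
  /**
   * Returns split points that divide the row key space from 0 to Long.MAX_VALUE uniformly
   * into {@code numRegions} ranges.
   */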
  private byte[][] getSplits(int numRegions) {
    RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit();
    split.setFirstRow(Bytes.toBytes(0L));
    split.setLastRow(Bytes.toBytes(Long.MAX_VALUE));
    return split.split(numRegions);
  }
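
  /**
   * Drops any existing test table and recreates it with the chain, sort and data column
   * families, pre-split across the row key space, plus the configured number of region
   * replicas when that differs from the default.
   */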
  private void setupTable() throws IOException, InterruptedException {
    if (util.getAdmin().tableExists(getTablename())) {
      util.deleteTable(getTablename());
    }

    util.createTable(
        getTablename(),
        new byte[][]{CHAIN_FAM, SORT_FAM, DATA_FAM},
        getSplits(16) // number of initial regions; the exact count here is an assumption
    );

    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;

    TableName t = getTablename();
    HBaseTestingUtility.setReplicas(util.getAdmin(), t, replicaCount);
  }
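
  /**
   * Runs one round of the load: a map-only job writes linked-list chains as HFiles via
   * HFileOutputFormat2, and the resulting files are then bulk loaded into the table.
   */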
  private void runLinkedListMRJob(int iteration) throws Exception {
    String jobName = IntegrationTestBulkLoad.class.getSimpleName() + " - " +
        EnvironmentEdgeManager.currentTime();
    Configuration conf = new Configuration(util.getConfiguration());
    Path p;
    if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
      p = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration);
    } else {
      p = new Path(conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY));
    }

    conf.setBoolean("mapreduce.map.speculative", false);
    conf.setBoolean("mapreduce.reduce.speculative", false);
    conf.setInt(ROUND_NUM_KEY, iteration);

    Job job = new Job(conf);

    job.setJobName(jobName);

    // set the input format so that we can create map tasks with no data input.
    job.setInputFormatClass(ITBulkLoadInputFormat.class);

    // Set the mapper classes.
    job.setMapperClass(LinkedListCreationMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);

    // Use the identity reducer
    // So nothing to do here.

    // Ship this class's jar with the job.
    job.setJarByClass(getClass());

    // Set where to place the hfiles.
    FileOutputFormat.setOutputPath(job, p);

    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        Table table = conn.getTable(getTablename());
        RegionLocator regionLocator = conn.getRegionLocator(getTablename())) {

      // Configure the partitioner and other things needed for HFileOutputFormat.
      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);

      // Run the job making sure it works.
      assertEquals(true, job.waitForCompletion(true));

      // Create a new loader.
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);

      // Load the HFiles in.
      loader.doBulkLoad(p, admin, table, regionLocator);
    }

    // Delete the temporary output files.
    util.getTestFileSystem().delete(p, true);
  }
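
  /**
   * An InputSplit that carries no data. It exists only so that map tasks can be created
   * without reading any real input; see {@link ITBulkLoadInputFormat}.
   */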
  public static class EmptySplit extends InputSplit implements Writable {
    @Override
    public void write(DataOutput out) throws IOException { }
    @Override
    public void readFields(DataInput in) throws IOException { }
    @Override
    public long getLength() { return 0L; }
    @Override
    public String[] getLocations() { return new String[0]; }
  }
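
  /**
   * RecordReader that simply replays a fixed, in-memory array of keys and values.
   */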
  public static class FixedRecordReader<K, V> extends RecordReader<K, V> {
    private int index = -1;

    private K[] keys;
    private V[] values;

    public FixedRecordReader(K[] keys, V[] values) {
      this.keys = keys;
      this.values = values;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
        InterruptedException { }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      return ++index < keys.length;
    }

    @Override
    public K getCurrentKey() throws IOException, InterruptedException {
      return keys[index];
    }

    @Override
    public V getCurrentValue() throws IOException, InterruptedException {
      return values[index];
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      return (float)index / keys.length;
    }

    @Override
    public void close() throws IOException {
      // No resources to release.
    }
  }
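
  /**
   * InputFormat that creates one {@link EmptySplit} per requested map task and hands each
   * map a single record: a randomly generated chainId, used as both key and value.
   */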
  public static class ITBulkLoadInputFormat extends InputFormat<LongWritable, LongWritable> {
    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
      int numSplits = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);

      ArrayList<InputSplit> ret = new ArrayList<>(numSplits);
      for (int i = 0; i < numSplits; ++i) {
        ret.add(new EmptySplit());
      }
      return ret;
    }

    @Override
    public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit split,
        TaskAttemptContext context)
            throws IOException, InterruptedException {
      int taskId = context.getTaskAttemptID().getTaskID().getId();
      int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
      int numIterations =
          context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
      int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0);

      taskId = taskId + iteration * numMapTasks;
      numMapTasks = numMapTasks * numIterations;

      long chainId = Math.abs(new Random().nextLong());
      // ensure that chainId is unique per task and across iterations
      chainId = chainId - (chainId % numMapTasks) + taskId;

      LongWritable[] keys = new LongWritable[] {new LongWritable(chainId)};

      return new FixedRecordReader<>(keys, keys);
    }
  }
  /**
   * Mapper that creates a linked list of KeyValues.
   *
   * Each map task generates one linked list.
   * All lists start on row key 0L.
   * All lists should be CHAIN_LENGTH long.
   */
  public static class LinkedListCreationMapper
      extends Mapper<LongWritable, LongWritable, ImmutableBytesWritable, KeyValue> {

    private Random rand = new Random();

    @Override
    protected void map(LongWritable key, LongWritable value, Context context)
        throws IOException, InterruptedException {
      long chainId = value.get();
      LOG.info("Starting mapper with chainId:" + chainId);

      byte[] chainIdArray = Bytes.toBytes(chainId);
      long currentRow = 0;

      long chainLength = context.getConfiguration().getLong(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
      long nextRow = getNextRow(0, chainLength);

      for (long i = 0; i < chainLength; i++) {
        byte[] rk = Bytes.toBytes(currentRow);

        // Next link in the chain.
        KeyValue linkKv = new KeyValue(rk, CHAIN_FAM, chainIdArray, Bytes.toBytes(nextRow));
        // What link in the chain this is.
        KeyValue sortKv = new KeyValue(rk, SORT_FAM, chainIdArray, Bytes.toBytes(i));
        // Added data so that large stores are created.
        KeyValue dataKv = new KeyValue(rk, DATA_FAM, chainIdArray,
          Bytes.toBytes(RandomStringUtils.randomAlphabetic(50))
        );

        // Emit the key values.
        context.write(new ImmutableBytesWritable(rk), linkKv);
        context.write(new ImmutableBytesWritable(rk), sortKv);
        context.write(new ImmutableBytesWritable(rk), dataKv);
        // Move to the next row.
        currentRow = nextRow;
        nextRow = getNextRow(i + 1, chainLength);
      }
    }

    /** Returns a unique row id within this chain for this index */
    private long getNextRow(long index, long chainLength) {
      long nextRow = Math.abs(rand.nextLong());
      // use significant bits from the random number, but pad with index to ensure it is unique
      // this also ensures that we do not reuse row = 0
      // row collisions from multiple mappers are fine, since we guarantee unique chainIds
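      // Worked example with illustrative numbers: for chainLength = 500000 and index = 3,
      // a random draw of 1234567 becomes 1234567 - (1234567 % 500000) + 3 = 1000003.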
      nextRow = nextRow - (nextRow % chainLength) + index;
      return nextRow;
    }
  }
  /**
   * Writable class used as the key to group links in the linked list.
   *
   * Used as the key emitted from a pass over the table.
   */
  public static class LinkKey implements WritableComparable<LinkKey> {

    private Long chainId;
    private Long order;

    public Long getOrder() {
      return order;
    }

    public Long getChainId() {
      return chainId;
    }

    public LinkKey() {}

    public LinkKey(long chainId, long order) {
      this.chainId = chainId;
      this.order = order;
    }

    @Override
    public int compareTo(LinkKey linkKey) {
      int res = getChainId().compareTo(linkKey.getChainId());
      if (res == 0) {
        res = getOrder().compareTo(linkKey.getOrder());
      }
      return res;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
      WritableUtils.writeVLong(dataOutput, chainId);
      WritableUtils.writeVLong(dataOutput, order);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
      chainId = WritableUtils.readVLong(dataInput);
      order = WritableUtils.readVLong(dataInput);
    }
  }
  /**
   * Writable used as the value emitted from a pass over the hbase table.
   */
  public static class LinkChain implements WritableComparable<LinkChain> {

    private Long rk;
    private Long next;

    public Long getNext() {
      return next;
    }

    public Long getRk() {
      return rk;
    }

    public LinkChain() {}

    public LinkChain(Long rk, Long next) {
      this.rk = rk;
      this.next = next;
    }

    @Override
    public int compareTo(LinkChain linkChain) {
      int res = getRk().compareTo(linkChain.getRk());
      if (res == 0) {
        res = getNext().compareTo(linkChain.getNext());
      }
      return res;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
      WritableUtils.writeVLong(dataOutput, rk);
      WritableUtils.writeVLong(dataOutput, next);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
      rk = WritableUtils.readVLong(dataInput);
      next = WritableUtils.readVLong(dataInput);
    }
  }
  /**
   * Class to figure out what partition to send a link in the chain to. This is based upon
   * the linkKey's ChainId.
   */
  public static class NaturalKeyPartitioner extends Partitioner<LinkKey, LinkChain> {
    @Override
    public int getPartition(LinkKey linkKey,
                            LinkChain linkChain,
                            int numPartitions) {
      int hash = linkKey.getChainId().hashCode();
      return Math.abs(hash % numPartitions);
    }
  }
  /**
   * Comparator used to figure out if a linkKey should be grouped together. This is based upon the
   * linkKey's ChainId.
   */
  public static class NaturalKeyGroupingComparator extends WritableComparator {

    protected NaturalKeyGroupingComparator() {
      super(LinkKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      LinkKey k1 = (LinkKey) w1;
      LinkKey k2 = (LinkKey) w2;

      return k1.getChainId().compareTo(k2.getChainId());
    }
  }
  /**
   * Comparator used to order linkKeys so that they are passed to a reducer in order. This is based
   * upon linkKey ChainId and Order.
   */
  public static class CompositeKeyComparator extends WritableComparator {

    protected CompositeKeyComparator() {
      super(LinkKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      LinkKey k1 = (LinkKey) w1;
      LinkKey k2 = (LinkKey) w2;

      return k1.compareTo(k2);
    }
  }
  /**
   * Mapper to pass over the table.
   *
   * For every row there could be multiple chains that landed on this row. So emit a linkKey
   * and value for each.
   */
  public static class LinkedListCheckingMapper extends TableMapper<LinkKey, LinkChain> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      long longRk = Bytes.toLong(value.getRow());

      for (Map.Entry<byte[], byte[]> entry : value.getFamilyMap(CHAIN_FAM).entrySet()) {
        long chainId = Bytes.toLong(entry.getKey());
        long next = Bytes.toLong(entry.getValue());
        Cell c = value.getColumnCells(SORT_FAM, entry.getKey()).get(0);
        long order = Bytes.toLong(CellUtil.cloneValue(c));
        context.write(new LinkKey(chainId, order), new LinkChain(longRk, next));
      }
    }
  }
  /**
   * Class that does the actual checking of the links.
   *
   * All links in the chain should be grouped and sorted when sent to this class. Then the chain
   * will be traversed making sure that no link is missing and that the chain is the correct length.
   *
   * This will throw an exception if anything is not correct. That causes the job to fail if any
   * data is missing or corrupt.
   */
  public static class LinkedListCheckingReducer
      extends Reducer<LinkKey, LinkChain, NullWritable, NullWritable> {
    @Override
    protected void reduce(LinkKey key, Iterable<LinkChain> values, Context context)
        throws java.io.IOException, java.lang.InterruptedException {
      long next = -1L;
      long prev = -1L;
      long count = 0L;

      for (LinkChain lc : values) {

        if (next == -1) {
          if (lc.getRk() != 0L) {
            String msg = "Chains should all start at rk 0, but read rk " + lc.getRk()
                + ". Chain:" + key.chainId + ", order:" + key.order;
            logError(msg, context);
            throw new RuntimeException(msg);
          }
          next = lc.getNext();
        } else {
          if (next != lc.getRk()) {
            String msg = "Missing a link in the chain. Prev rk " + prev + " was, expecting "
                + next + " but got " + lc.getRk() + ". Chain:" + key.chainId
                + ", order:" + key.order;
            logError(msg, context);
            throw new RuntimeException(msg);
          }
          prev = lc.getRk();
          next = lc.getNext();
        }
        count++;
      }

      int expectedChainLen = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
      if (count != expectedChainLen) {
        String msg = "Chain wasn't the correct length. Expected " + expectedChainLen + " got "
            + count + ". Chain:" + key.chainId + ", order:" + key.order;
        logError(msg, context);
        throw new RuntimeException(msg);
      }
    }
    private static void logError(String msg, Context context) throws IOException {
      TableName table = getTableName(context.getConfiguration());

      LOG.error("Failure in chain verification: " + msg);
      try (Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
          Admin admin = connection.getAdmin()) {
        LOG.error("cluster status:\n" + admin.getClusterStatus());
        LOG.error("table regions:\n"
            + Joiner.on("\n").join(admin.getTableRegions(table)));
      }
    }
  }
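
  /**
   * Runs the check job and, if it fails, runs it once more so that an ephemeral read problem
   * can be told apart from real data loss; the original failure is rethrown either way.
   */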
  private void runCheckWithRetry() throws IOException, ClassNotFoundException, InterruptedException {
    try {
      runCheck();
    } catch (Throwable t) {
      LOG.warn("Received " + StringUtils.stringifyException(t));
      LOG.warn("Running the check MR Job again to see whether an ephemeral problem or not");
      runCheck();
      throw t; // we should still fail the test even if second retry succeeds
    }
  }
  /**
   * After adding data to the table, start an MR job to check the linked lists.
   * @throws IOException
   * @throws ClassNotFoundException
   * @throws InterruptedException
   */
  private void runCheck() throws IOException, ClassNotFoundException, InterruptedException {
    LOG.info("Running check");
    Configuration conf = getConf();
    String jobName = getTablename() + "_check" + EnvironmentEdgeManager.currentTime();
    Path p = util.getDataTestDirOnTestFS(jobName);

    Job job = new Job(conf);
    job.setJarByClass(getClass());
    job.setJobName(jobName);

    job.setPartitionerClass(NaturalKeyPartitioner.class);
    job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
    job.setSortComparatorClass(CompositeKeyComparator.class);

    Scan scan = new Scan();
    scan.addFamily(CHAIN_FAM);
    scan.addFamily(SORT_FAM);
    scan.setMaxVersions(1);
    scan.setCacheBlocks(false);

    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
      scan.setConsistency(Consistency.TIMELINE);
    }

    TableMapReduceUtil.initTableMapperJob(
        getTablename().getName(),
        scan,
        LinkedListCheckingMapper.class,
        LinkKey.class,
        LinkChain.class,
        job
    );

    job.setReducerClass(LinkedListCheckingReducer.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);

    FileOutputFormat.setOutputPath(job, p);

    assertEquals(true, job.waitForCompletion(true));

    // Delete the temporary output files.
    util.getTestFileSystem().delete(p, true);
  }
  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    util.initializeCluster(1);
    int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (LOG.isDebugEnabled() && replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
      LOG.debug("Region Replicas enabled: " + replicaCount);
    }

    // Scale this up on a real cluster
    if (util.isDistributedCluster()) {
      util.getConfiguration().setIfUnset(NUM_MAPS_KEY,
          Integer.toString(util.getAdmin().getClusterStatus().getServersSize() * 10)
      );
      util.getConfiguration().setIfUnset(NUM_IMPORT_ROUNDS_KEY, "5");
    } else {
      util.startMiniMapReduceCluster();
    }
  }
  @Override
  protected void addOptions() {
    super.addOptions();
    super.addOptNoArg(OPT_CHECK, "Run check only");
    super.addOptNoArg(OPT_LOAD, "Run load only");
  }
  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);
    check = cmd.hasOption(OPT_CHECK);
    load = cmd.hasOption(OPT_LOAD);
  }
  @Override
  public int runTestFromCommandLine() throws Exception {
    if (load) {
      runLoad();
    } else if (check) {
      installSlowingCoproc();
      runCheckWithRetry();
    } else {
      testBulkLoad();
    }
    return 0;
  }
  @Override
  public TableName getTablename() {
    return getTableName(getConf());
  }
  public static TableName getTableName(Configuration conf) {
    return TableName.valueOf(conf.get(TABLE_NAME_KEY, TABLE_NAME));
  }
  @Override
  protected Set<String> getColumnFamilies() {
    return Sets.newHashSet(Bytes.toString(CHAIN_FAM), Bytes.toString(DATA_FAM),
        Bytes.toString(SORT_FAM));
  }
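
  // Example command-line invocation (a sketch: it assumes the standard "hbase <classname>"
  // launcher and the ToolRunner entry point below, and simply restates the default values of
  // the exposed properties; adjust for your deployment):
  //
  //   hbase org.apache.hadoop.hbase.mapreduce.IntegrationTestBulkLoad \
  //       -Dhbase.IntegrationTestBulkLoad.chainLength=500000 \
  //       -Dhbase.IntegrationTestBulkLoad.numMaps=1 \
  //       -Dhbase.IntegrationTestBulkLoad.numImportRounds=1 \
  //       -Dhbase.IntegrationTestBulkLoad.tableName=IntegrationTestBulkLoad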
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int status = ToolRunner.run(conf, new IntegrationTestBulkLoad(), args);
    System.exit(status);
  }
}