Revert "HBASE-15572 Adding optional timestamp semantics to HBase-Spark (Weiqing Yang)"
hbase.git: hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.spark

import java.net.InetSocketAddress
import java.util
import java.util.UUID
import javax.management.openmbean.KeyAlreadyExistsException

import org.apache.hadoop.hbase.fs.HFileSystem
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.io.compress.Compression
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFileContextBuilder, HFileWriterImpl}
import org.apache.hadoop.hbase.regionserver.{HStore, StoreFile, BloomType}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.client._
import scala.reflect.ClassTag
import org.apache.spark.{Logging, SerializableWritable, SparkContext}
import org.apache.hadoop.hbase.mapreduce.{TableMapReduceUtil, TableInputFormat, IdentityTableMapper}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.streaming.dstream.DStream
import java.io._
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
import org.apache.hadoop.fs.{Path, FileSystem}
import scala.collection.mutable
/**
 * HBaseContext is a façade for HBase operations
 * like bulk put, get, increment, delete, and scan.
 *
 * HBaseContext takes the responsibility
 * of disseminating the configuration information
 * to the workers and of managing the life cycle of Connections.
 */
class HBaseContext(@transient sc: SparkContext,
                   @transient val config: Configuration,
                   val tmpHdfsConfgFile: String = null)
  extends Serializable with Logging {
  @transient var credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
  @transient var tmpHdfsConfiguration: Configuration = config
  @transient var appliedCredentials = false
  @transient val job = Job.getInstance(config)
  TableMapReduceUtil.initCredentials(job)
  val broadcastedConf = sc.broadcast(new SerializableWritable(config))
  val credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials))

  LatestHBaseContextCache.latest = this
  if (tmpHdfsConfgFile != null && config != null) {
    val fs = FileSystem.newInstance(config)
    val tmpPath = new Path(tmpHdfsConfgFile)
    if (!fs.exists(tmpPath)) {
      val outputStream = fs.create(tmpPath)
      config.write(outputStream)
      outputStream.close()
    } else {
      logWarning("tmpHdfsConfgFile " + tmpHdfsConfgFile + " already exists")
    }
  }
  /**
   * A simple enrichment of the traditional Spark RDD foreachPartition.
   * This function differs from the original in that it offers the
   * developer access to an already connected Connection object.
   *
   * Note: Do not close the Connection object.  All Connection
   * management is handled outside this method.
   *
   * @param rdd  Original RDD with data to iterate over
   * @param f    Function to be given an iterator to iterate through
   *             the RDD values and a Connection object to interact
   *             with HBase
   */
  def foreachPartition[T](rdd: RDD[T],
                          f: (Iterator[T], Connection) => Unit): Unit = {
    rdd.foreachPartition(
      it => hbaseForeachPartition(broadcastedConf, it, f))
  }
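
  // Usage sketch (not part of the original source): issue per-partition client calls
  // against a hypothetical table "t1", assuming `hbaseContext` is an HBaseContext and
  // `rdd` is an RDD[Array[Byte]] of row keys.  The Connection passed to the function is
  // owned by HBaseContext and must not be closed by the caller.
  //
  //   hbaseContext.foreachPartition(rdd, (it, connection) => {
  //     val table = connection.getTable(TableName.valueOf("t1"))
  //     it.foreach(rowKey => table.get(new Get(rowKey)))
  //     table.close()
  //   })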
  /**
   * A simple enrichment of the traditional Spark Streaming DStream foreach.
   * This function differs from the original in that it offers the
   * developer access to an already connected Connection object.
   *
   * Note: Do not close the Connection object.  All Connection
   * management is handled outside this method.
   *
   * @param dstream  Original DStream with data to iterate over
   * @param f        Function to be given an iterator to iterate through
   *                 the DStream values and a Connection object to
   *                 interact with HBase
   */
  def foreachPartition[T](dstream: DStream[T],
                          f: (Iterator[T], Connection) => Unit): Unit = {
    dstream.foreachRDD((rdd, time) => {
      foreachPartition(rdd, f)
    })
  }
  /**
   * A simple enrichment of the traditional Spark RDD mapPartition.
   * This function differs from the original in that it offers the
   * developer access to an already connected Connection object.
   *
   * Note: Do not close the Connection object.  All Connection
   * management is handled outside this method.
   *
   * @param rdd  Original RDD with data to iterate over
   * @param mp   Function to be given an iterator to iterate through
   *             the RDD values and a Connection object to interact
   *             with HBase
   * @return     Returns a new RDD generated by the user-defined
   *             function just like normal mapPartition
   */
  def mapPartitions[T, R: ClassTag](rdd: RDD[T],
                                    mp: (Iterator[T], Connection) => Iterator[R]): RDD[R] = {

    rdd.mapPartitions[R](it => hbaseMapPartition[T, R](broadcastedConf,
      it,
      mp))
  }
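
  // Usage sketch (not part of the original source): map an RDD of row keys to the
  // String values stored in a hypothetical table "t1", family "c", qualifier "q".
  // Results are materialized before the table is closed.
  //
  //   val values = hbaseContext.mapPartitions[Array[Byte], String](rdd,
  //     (it, connection) => {
  //       val table = connection.getTable(TableName.valueOf("t1"))
  //       val fetched = it.map { rowKey =>
  //         val result = table.get(new Get(rowKey))
  //         Bytes.toString(result.getValue(Bytes.toBytes("c"), Bytes.toBytes("q")))
  //       }.toList
  //       table.close()
  //       fetched.iterator
  //     })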
  /**
   * A simple enrichment of the traditional Spark Streaming DStream
   * foreachPartition.
   *
   * This function differs from the original in that it offers the
   * developer access to an already connected Connection object.
   *
   * Note: Do not close the Connection object.  All Connection
   * management is handled outside this method.
   *
   * Note: Make sure to partition correctly to avoid memory issues when
   * getting data from HBase.
   *
   * @param dstream  Original DStream with data to iterate over
   * @param f        Function to be given an iterator to iterate through
   *                 the DStream values and a Connection object to
   *                 interact with HBase
   */
  def streamForeachPartition[T](dstream: DStream[T],
                                f: (Iterator[T], Connection) => Unit): Unit = {

    dstream.foreachRDD(rdd => this.foreachPartition(rdd, f))
  }
  /**
   * A simple enrichment of the traditional Spark Streaming DStream
   * mapPartition.
   *
   * This function differs from the original in that it offers the
   * developer access to an already connected Connection object.
   *
   * Note: Do not close the Connection object.  All Connection
   * management is handled outside this method.
   *
   * Note: Make sure to partition correctly to avoid memory issues when
   * getting data from HBase.
   *
   * @param dstream  Original DStream with data to iterate over
   * @param f        Function to be given an iterator to iterate through
   *                 the DStream values and a Connection object to
   *                 interact with HBase
   * @return         Returns a new DStream generated by the user-defined
   *                 function just like normal mapPartition
   */
  def streamMapPartitions[T, U: ClassTag](dstream: DStream[T],
                                          f: (Iterator[T], Connection) => Iterator[U]):
  DStream[U] = {
    dstream.mapPartitions(it => hbaseMapPartition[T, U](
      broadcastedConf,
      it,
      f))
  }
  /**
   * A simple abstraction over the HBaseContext.foreachPartition method.
   *
   * It allows a user to take an RDD, generate Puts from it,
   * and send them to HBase.
   * The complexity of managing the Connection is
   * removed from the developer.
   *
   * @param rdd        Original RDD with data to iterate over
   * @param tableName  The name of the table to put into
   * @param f          Function to convert a value in the RDD to a HBase Put
   */
  def bulkPut[T](rdd: RDD[T], tableName: TableName, f: (T) => Put) {

    val tName = tableName.getName
    rdd.foreachPartition(
      it => hbaseForeachPartition[T](
        broadcastedConf,
        it,
        (iterator, connection) => {
          val m = connection.getBufferedMutator(TableName.valueOf(tName))
          iterator.foreach(t => m.mutate(f(t)))
          m.flush()
          m.close()
        }))
  }
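
  // Usage sketch (not part of the original source): write an RDD of
  // (rowKey, value) pairs to a hypothetical table "t1" with column family "c".
  //
  //   val rdd = sc.parallelize(Seq(
  //     (Bytes.toBytes("1"), Bytes.toBytes("value1")),
  //     (Bytes.toBytes("2"), Bytes.toBytes("value2"))))
  //   hbaseContext.bulkPut[(Array[Byte], Array[Byte])](rdd,
  //     TableName.valueOf("t1"),
  //     (t) => {
  //       val put = new Put(t._1)
  //       put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("q"), t._2)
  //       put
  //     })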
  def applyCreds[T](): Unit = {
    credentials = SparkHadoopUtil.get.getCurrentUserCredentials()

    logDebug("appliedCredentials:" + appliedCredentials + ",credentials:" + credentials)

    if (!appliedCredentials && credentials != null) {
      appliedCredentials = true

      @transient val ugi = UserGroupInformation.getCurrentUser
      ugi.addCredentials(credentials)
      // specify that this is a proxy user
      ugi.setAuthenticationMethod(AuthenticationMethod.PROXY)

      ugi.addCredentials(credentialsConf.value.value)
    }
  }
  /**
   * A simple abstraction over the HBaseContext.streamMapPartition method.
   *
   * It allows a user to take a DStream, generate Puts from it,
   * and send them to HBase.
   *
   * The complexity of managing the Connection is
   * removed from the developer.
   *
   * @param dstream    Original DStream with data to iterate over
   * @param tableName  The name of the table to put into
   * @param f          Function to convert a value in
   *                   the DStream to a HBase Put
   */
  def streamBulkPut[T](dstream: DStream[T],
                       tableName: TableName,
                       f: (T) => Put) = {
    val tName = tableName.getName
    dstream.foreachRDD((rdd, time) => {
      bulkPut(rdd, TableName.valueOf(tName), f)
    })
  }
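
  // Usage sketch (not part of the original source): write a DStream of
  // (rowKey, value) pairs to a hypothetical table "t1", assuming `dstream`
  // comes from a StreamingContext.
  //
  //   hbaseContext.streamBulkPut[(Array[Byte], Array[Byte])](dstream,
  //     TableName.valueOf("t1"),
  //     (t) => {
  //       val put = new Put(t._1)
  //       put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("q"), t._2)
  //       put
  //     })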
  /**
   * A simple abstraction over the HBaseContext.foreachPartition method.
   *
   * It allows a user to take an RDD, generate Deletes from it,
   * and send them to HBase.  The complexity of managing the Connection is
   * removed from the developer.
   *
   * @param rdd        Original RDD with data to iterate over
   * @param tableName  The name of the table to delete from
   * @param f          Function to convert a value in the RDD to a
   *                   HBase Delete
   * @param batchSize  The number of Deletes to batch before sending to HBase
   */
  def bulkDelete[T](rdd: RDD[T], tableName: TableName,
                    f: (T) => Delete, batchSize: Integer) {
    bulkMutation(rdd, tableName, f, batchSize)
  }
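
  // Usage sketch (not part of the original source): delete an RDD of row keys from a
  // hypothetical table "t1", sending the Deletes to HBase in batches of 4.
  //
  //   hbaseContext.bulkDelete[Array[Byte]](rdd,
  //     TableName.valueOf("t1"),
  //     rowKey => new Delete(rowKey),
  //     4)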
  /**
   * A simple abstraction over the HBaseContext.streamBulkMutation method.
   *
   * It allows a user to take a DStream, generate Deletes from it,
   * and send them to HBase.
   *
   * The complexity of managing the Connection is
   * removed from the developer.
   *
   * @param dstream    Original DStream with data to iterate over
   * @param tableName  The name of the table to delete from
   * @param f          Function to convert a value in the DStream to a
   *                   HBase Delete
   * @param batchSize  The number of Deletes to batch before sending to HBase
   */
  def streamBulkDelete[T](dstream: DStream[T],
                          tableName: TableName,
                          f: (T) => Delete,
                          batchSize: Integer) = {
    streamBulkMutation(dstream, tableName, f, batchSize)
  }
  /**
   * Underlying function that supports all bulk mutations.
   *
   * May be opened up if requested.
   */
  private def bulkMutation[T](rdd: RDD[T], tableName: TableName,
                              f: (T) => Mutation, batchSize: Integer) {

    val tName = tableName.getName
    rdd.foreachPartition(
      it => hbaseForeachPartition[T](
        broadcastedConf,
        it,
        (iterator, connection) => {
          val table = connection.getTable(TableName.valueOf(tName))
          val mutationList = new java.util.ArrayList[Mutation]
          iterator.foreach(t => {
            mutationList.add(f(t))
            if (mutationList.size >= batchSize) {
              table.batch(mutationList, null)
              mutationList.clear()
            }
          })
          if (mutationList.size() > 0) {
            table.batch(mutationList, null)
            mutationList.clear()
          }
          table.close()
        }))
  }
  /**
   * Underlying function that supports all bulk streaming mutations.
   *
   * May be opened up if requested.
   */
  private def streamBulkMutation[T](dstream: DStream[T],
                                    tableName: TableName,
                                    f: (T) => Mutation,
                                    batchSize: Integer) = {
    val tName = tableName.getName
    dstream.foreachRDD((rdd, time) => {
      bulkMutation(rdd, TableName.valueOf(tName), f, batchSize)
    })
  }
  /**
   * A simple abstraction over the HBaseContext.mapPartition method.
   *
   * It allows a user to take an RDD and generate a
   * new RDD based on Gets and the results they bring back from HBase.
   *
   * @param tableName      The name of the table to get from
   * @param batchSize      The number of Gets to be sent in a single batch
   * @param rdd            Original RDD with data to iterate over
   * @param makeGet        Function to convert a value in the RDD to a
   *                       HBase Get
   * @param convertResult  This will convert the HBase Result object to
   *                       whatever the user wants to put in the resulting
   *                       RDD
   * @return               New RDD that is created by the Gets to HBase
   */
  def bulkGet[T, U: ClassTag](tableName: TableName,
                              batchSize: Integer,
                              rdd: RDD[T],
                              makeGet: (T) => Get,
                              convertResult: (Result) => U): RDD[U] = {

    val getMapPartition = new GetMapPartition(tableName,
      batchSize,
      makeGet,
      convertResult)

    rdd.mapPartitions[U](it =>
      hbaseMapPartition[T, U](
        broadcastedConf,
        it,
        getMapPartition.run))
  }
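
  // Usage sketch (not part of the original source): convert an RDD of row keys into an
  // RDD of row-key strings fetched from a hypothetical table "t1", batching two Gets
  // per round trip.
  //
  //   val fetched = hbaseContext.bulkGet[Array[Byte], String](
  //     TableName.valueOf("t1"),
  //     2,                                   // batchSize
  //     rdd,                                 // RDD of row keys
  //     rowKey => new Get(rowKey),
  //     (result: Result) => Bytes.toString(result.getRow))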
  /**
   * A simple abstraction over the HBaseContext.streamMap method.
   *
   * It allows a user to take a DStream and
   * generate a new DStream based on Gets and the results
   * they bring back from HBase.
   *
   * @param tableName      The name of the table to get from
   * @param batchSize      The number of Gets to be sent in a single batch
   * @param dStream        Original DStream with data to iterate over
   * @param makeGet        Function to convert a value in the DStream to a
   *                       HBase Get
   * @param convertResult  This will convert the HBase Result object to
   *                       whatever the user wants to put in the resulting
   *                       DStream
   * @return               A new DStream that is created by the Gets to HBase
   */
  def streamBulkGet[T, U: ClassTag](tableName: TableName,
                                    batchSize: Integer,
                                    dStream: DStream[T],
                                    makeGet: (T) => Get,
                                    convertResult: (Result) => U): DStream[U] = {

    val getMapPartition = new GetMapPartition(tableName,
      batchSize,
      makeGet,
      convertResult)

    dStream.mapPartitions[U](it => hbaseMapPartition[T, U](
      broadcastedConf,
      it,
      getMapPartition.run))
  }
  /**
   * This function will use the native HBase TableInputFormat with the
   * given scan object to generate a new RDD.
   *
   * @param tableName  The name of the table to scan
   * @param scan       The HBase scan object to use to read data from HBase
   * @param f          Function to convert a Result object from HBase into
   *                   what the user wants in the final generated RDD
   * @return           New RDD with results from scan
   */
  def hbaseRDD[U: ClassTag](tableName: TableName, scan: Scan,
                            f: ((ImmutableBytesWritable, Result)) => U): RDD[U] = {

    val job: Job = Job.getInstance(getConf(broadcastedConf))

    TableMapReduceUtil.initCredentials(job)
    TableMapReduceUtil.initTableMapperJob(tableName, scan,
      classOf[IdentityTableMapper], null, null, job)

    val jconf = new JobConf(job.getConfiguration)
    SparkHadoopUtil.get.addCredentials(jconf)
    new NewHBaseRDD(sc,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result],
      job.getConfiguration,
      this).map(f)
  }
  /**
   * An overloaded version of HBaseContext hbaseRDD that defines the
   * type of the resulting RDD.
   *
   * @param tableName  The name of the table to scan
   * @param scans      The HBase scan object to use to read data from HBase
   * @return           New RDD with results from scan
   */
  def hbaseRDD(tableName: TableName, scans: Scan):
  RDD[(ImmutableBytesWritable, Result)] = {

    hbaseRDD[(ImmutableBytesWritable, Result)](
      tableName,
      scans,
      (r: (ImmutableBytesWritable, Result)) => r)
  }
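
  // Usage sketch (not part of the original source): scan a hypothetical table "t1"
  // and collect its row keys as strings.
  //
  //   val scan = new Scan()
  //   scan.setCaching(100)
  //   val scanRdd = hbaseContext.hbaseRDD(TableName.valueOf("t1"), scan)
  //   val rowKeys = scanRdd.map(r => Bytes.toString(r._2.getRow)).collect()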
  /**
   * Underlying wrapper for all foreach functions in HBaseContext.
   */
  private def hbaseForeachPartition[T](configBroadcast:
                                       Broadcast[SerializableWritable[Configuration]],
                                       it: Iterator[T],
                                       f: (Iterator[T], Connection) => Unit) = {

    val config = getConf(configBroadcast)

    applyCreds
    val connection = ConnectionFactory.createConnection(config)
    f(it, connection)
    connection.close()
  }
  private def getConf(configBroadcast: Broadcast[SerializableWritable[Configuration]]):
  Configuration = {

    if (tmpHdfsConfiguration == null && tmpHdfsConfgFile != null) {
      val fs = FileSystem.newInstance(SparkHadoopUtil.get.conf)
      val inputStream = fs.open(new Path(tmpHdfsConfgFile))
      tmpHdfsConfiguration = new Configuration(false)
      tmpHdfsConfiguration.readFields(inputStream)
      inputStream.close()
    }

    if (tmpHdfsConfiguration == null) {
      try {
        tmpHdfsConfiguration = configBroadcast.value.value
      } catch {
        case ex: Exception => logError("Unable to getConfig from broadcast", ex)
      }
    }
    tmpHdfsConfiguration
  }
  /**
   * Underlying wrapper for all mapPartition functions in HBaseContext.
   */
  private def hbaseMapPartition[K, U](
                                       configBroadcast:
                                       Broadcast[SerializableWritable[Configuration]],
                                       it: Iterator[K],
                                       mp: (Iterator[K], Connection) =>
                                         Iterator[U]): Iterator[U] = {

    val config = getConf(configBroadcast)
    applyCreds

    val connection = ConnectionFactory.createConnection(config)
    val res = mp(it, connection)
    connection.close()
    res
  }
  /**
   * Underlying wrapper for all get mapPartition functions in HBaseContext.
   */
  private class GetMapPartition[T, U](tableName: TableName,
                                      batchSize: Integer,
                                      makeGet: (T) => Get,
                                      convertResult: (Result) => U)
    extends Serializable {

    val tName = tableName.getName

    def run(iterator: Iterator[T], connection: Connection): Iterator[U] = {
      val table = connection.getTable(TableName.valueOf(tName))

      val gets = new java.util.ArrayList[Get]()
      var res = List[U]()

      while (iterator.hasNext) {
        gets.add(makeGet(iterator.next()))

        if (gets.size() == batchSize) {
          val results = table.get(gets)
          res = res ++ results.map(convertResult)
          gets.clear()
        }
      }
      if (gets.size() > 0) {
        val results = table.get(gets)
        res = res ++ results.map(convertResult)
        gets.clear()
      }
      table.close()
      res.iterator
    }
  }
  /**
   * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
   *
   * This method is used to keep ClassTags out of the external Java API, as
   * the Java compiler cannot produce them automatically.  While this
   * ClassTag-faking does please the compiler, it can cause problems at runtime
   * if the Scala API relies on ClassTags for correctness.
   *
   * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior,
   * just worse performance or security issues.
   * For instance, an Array of AnyRef can hold any type T, but may lose primitive
   * specialization.
   */
  private[spark]
  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
  /**
   * Spark implementation of HBase bulk load for wide rows or when
   * values are not already combined at the time of the map process.
   *
   * This will take the content from an existing RDD then sort and shuffle
   * it with respect to region splits.  The result of that sort and shuffle
   * will be written to HFiles.
   *
   * After this function is executed the user will have to call
   * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase.
   *
   * Also note this version of bulk load is different from past versions in
   * that it includes the qualifier as part of the sort process.  The
   * reason for this is to be able to support rows with a very large number
   * of columns.
   *
   * @param rdd                         The RDD we are bulk loading from
   * @param tableName                   The HBase table we are loading into
   * @param flatMap                     A flatMap function that will make every
   *                                    row in the RDD
   *                                    into N cells for the bulk load
   * @param stagingDir                  The location on the FileSystem to bulk load into
   * @param familyHFileWriteOptionsMap  Options that will define how the HFile for a
   *                                    column family is written
   * @param compactionExclude           Compaction excluded for the HFiles
   * @param maxSize                     Max size for the HFiles before they roll
   * @tparam T                          The type of values in the original RDD
   */
  def bulkLoad[T](rdd: RDD[T],
                  tableName: TableName,
                  flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
                  stagingDir: String,
                  familyHFileWriteOptionsMap:
                  util.Map[Array[Byte], FamilyHFileWriteOptions] =
                  new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
                  compactionExclude: Boolean = false,
                  maxSize: Long = HConstants.DEFAULT_MAX_FILE_SIZE):
  Unit = {
    val conn = ConnectionFactory.createConnection(config)
    val regionLocator = conn.getRegionLocator(tableName)
    val startKeys = regionLocator.getStartKeys
    val defaultCompressionStr = config.get("hfile.compression",
      Compression.Algorithm.NONE.getName)
    val hfileCompression = HFileWriterImpl
      .compressionByName(defaultCompressionStr)
    val nowTimeStamp = System.currentTimeMillis()
    val tableRawName = tableName.getName

    val familyHFileWriteOptionsMapInternal =
      new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]

    val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()

    while (entrySetIt.hasNext) {
      val entry = entrySetIt.next()
      familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
    }

    val regionSplitPartitioner =
      new BulkLoadPartitioner(startKeys)

    // This is where all the magic happens
    // Here we are going to do the following things
    //  1. FlatMap every row in the RDD into key column value tuples
    //  2. Then we are going to repartition, sort, and shuffle
    //  3. Finally we are going to write out our HFiles
    rdd.flatMap( r => flatMap(r)).
      repartitionAndSortWithinPartitions(regionSplitPartitioner).
      hbaseForeachPartition(this, (it, conn) => {
      val conf = broadcastedConf.value.value
      val fs = FileSystem.get(conf)
      val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
      var previousRow: Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
      var rollOverRequested = false
      val localTableName = TableName.valueOf(tableRawName)

      // Here is where we finally iterate through the data in this partition of the
      // RDD that has been sorted and partitioned
      it.foreach{ case (keyFamilyQualifier, cellValue: Array[Byte]) =>

        val wl = writeValueToHFile(keyFamilyQualifier.rowKey,
          keyFamilyQualifier.family,
          keyFamilyQualifier.qualifier,
          cellValue,
          nowTimeStamp,
          fs,
          conn,
          localTableName,
          conf,
          familyHFileWriteOptionsMapInternal,
          hfileCompression,
          writerMap,
          stagingDir)

        rollOverRequested = rollOverRequested || wl.written > maxSize

        // This will only roll if we have at least one column family file that is
        // bigger than maxSize and we have finished a given row key
        if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
          rollWriters(fs, writerMap,
            regionSplitPartitioner,
            previousRow,
            compactionExclude)
          rollOverRequested = false
        }

        previousRow = keyFamilyQualifier.rowKey
      }
      // We have finished all the data so let's close up the writers
      rollWriters(fs, writerMap,
        regionSplitPartitioner,
        previousRow,
        compactionExclude)
      rollOverRequested = false
    })
  }
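
  // Usage sketch (not part of the original source): bulk load an RDD of
  // (rowKey, (family, qualifier, value)) tuples into a hypothetical table "t1",
  // then move the generated HFiles into HBase with LoadIncrementalHFiles
  // (org.apache.hadoop.hbase.mapreduce).  The staging path is an assumption.
  //
  //   hbaseContext.bulkLoad[(Array[Byte], (Array[Byte], Array[Byte], Array[Byte]))](
  //     rdd,
  //     TableName.valueOf("t1"),
  //     t => {
  //       val (rowKey, (family, qualifier, value)) = t
  //       Seq((new KeyFamilyQualifier(rowKey, family, qualifier), value)).iterator
  //     },
  //     "/tmp/hbaseBulkLoadStaging")
  //
  //   val load = new LoadIncrementalHFiles(config)
  //   load.doBulkLoad(new Path("/tmp/hbaseBulkLoadStaging"), conn.getAdmin,
  //     conn.getTable(TableName.valueOf("t1")),
  //     conn.getRegionLocator(TableName.valueOf("t1")))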
  /**
   * Spark implementation of HBase bulk load for short rows, somewhere less than
   * a 1000 columns.  This bulk load should be faster for tables with thinner
   * rows than the other Spark implementation of bulk load that puts only one
   * value into a record going into a shuffle.
   *
   * This will take the content from an existing RDD then sort and shuffle
   * it with respect to region splits.  The result of that sort and shuffle
   * will be written to HFiles.
   *
   * After this function is executed the user will have to call
   * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase.
   *
   * In this implementation, only the rowKey is given to the shuffle as the key
   * and all the columns are already linked to the rowKey before the shuffle
   * stage.  The sorting of the qualifier is done in memory outside of the
   * shuffle stage.
   *
   * Also make sure that incoming RDDs only have one record for every row key.
   *
   * @param rdd                         The RDD we are bulk loading from
   * @param tableName                   The HBase table we are loading into
   * @param mapFunction                 A function that will convert the RDD records to
   *                                    the key value format used for the shuffle to prep
   *                                    for writing to the bulk loaded HFiles
   * @param stagingDir                  The location on the FileSystem to bulk load into
   * @param familyHFileWriteOptionsMap  Options that will define how the HFile for a
   *                                    column family is written
   * @param compactionExclude           Compaction excluded for the HFiles
   * @param maxSize                     Max size for the HFiles before they roll
   * @tparam T                          The type of values in the original RDD
   */
  def bulkLoadThinRows[T](rdd: RDD[T],
                          tableName: TableName,
                          mapFunction: (T) =>
                            (ByteArrayWrapper, FamiliesQualifiersValues),
                          stagingDir: String,
                          familyHFileWriteOptionsMap:
                          util.Map[Array[Byte], FamilyHFileWriteOptions] =
                          new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
                          compactionExclude: Boolean = false,
                          maxSize: Long = HConstants.DEFAULT_MAX_FILE_SIZE):
  Unit = {
    val conn = ConnectionFactory.createConnection(config)
    val regionLocator = conn.getRegionLocator(tableName)
    val startKeys = regionLocator.getStartKeys
    val defaultCompressionStr = config.get("hfile.compression",
      Compression.Algorithm.NONE.getName)
    val defaultCompression = HFileWriterImpl
      .compressionByName(defaultCompressionStr)
    val nowTimeStamp = System.currentTimeMillis()
    val tableRawName = tableName.getName

    val familyHFileWriteOptionsMapInternal =
      new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]

    val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()

    while (entrySetIt.hasNext) {
      val entry = entrySetIt.next()
      familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
    }

    val regionSplitPartitioner =
      new BulkLoadPartitioner(startKeys)

    // This is where all the magic happens
    // Here we are going to do the following things
    //  1. Map every row in the RDD into key column value tuples
    //  2. Then we are going to repartition, sort, and shuffle
    //  3. Finally we are going to write out our HFiles
    rdd.map( r => mapFunction(r)).
      repartitionAndSortWithinPartitions(regionSplitPartitioner).
      hbaseForeachPartition(this, (it, conn) => {
      val conf = broadcastedConf.value.value
      val fs = FileSystem.get(conf)
      val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
      var previousRow: Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
      var rollOverRequested = false
      val localTableName = TableName.valueOf(tableRawName)

      // Here is where we finally iterate through the data in this partition of the
      // RDD that has been sorted and partitioned
      it.foreach{ case (rowKey: ByteArrayWrapper,
        familiesQualifiersValues: FamiliesQualifiersValues) =>

        if (Bytes.compareTo(previousRow, rowKey.value) == 0) {
          throw new KeyAlreadyExistsException("The following key was sent to the " +
            "HFile load more than once: " + Bytes.toString(previousRow))
        }

        // The family map is a tree map so the families will be sorted
        val familyIt = familiesQualifiersValues.familyMap.entrySet().iterator()
        while (familyIt.hasNext) {
          val familyEntry = familyIt.next()

          val family = familyEntry.getKey.value

          val qualifierIt = familyEntry.getValue.entrySet().iterator()

          // The qualifier map is a tree map so the qualifiers will be sorted
          while (qualifierIt.hasNext) {

            val qualifierEntry = qualifierIt.next()
            val qualifier = qualifierEntry.getKey
            val cellValue = qualifierEntry.getValue

            writeValueToHFile(rowKey.value,
              family,
              qualifier.value, // qualifier
              cellValue, // value
              nowTimeStamp,
              fs,
              conn,
              localTableName,
              conf,
              familyHFileWriteOptionsMapInternal,
              defaultCompression,
              writerMap,
              stagingDir)

            previousRow = rowKey.value
          }
        }

        writerMap.values.foreach( wl => {
          rollOverRequested = rollOverRequested || wl.written > maxSize

          // This will only roll if we have at least one column family file that is
          // bigger than maxSize and we have finished a given row key
          if (rollOverRequested) {
            rollWriters(fs, writerMap,
              regionSplitPartitioner,
              previousRow,
              compactionExclude)
            rollOverRequested = false
          }
        })
      }

      // We have finished all the data so let's close up the writers
      rollWriters(fs, writerMap,
        regionSplitPartitioner,
        previousRow,
        compactionExclude)
      rollOverRequested = false
    })
  }
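
  // Usage sketch (not part of the original source): bulk load thin rows from an RDD of
  // (rowKeyString, Iterable[(family, qualifier, value)]) records, one record per row key,
  // into a hypothetical table "t1".  The staging path and the FamiliesQualifiersValues
  // `+=` call are assumptions based on the hbase-spark helper classes.
  //
  //   hbaseContext.bulkLoadThinRows[(String, Iterable[(Array[Byte], Array[Byte], Array[Byte])])](
  //     rdd,
  //     TableName.valueOf("t1"),
  //     t => {
  //       val familyQualifiersValues = new FamiliesQualifiersValues
  //       t._2.foreach { case (family, qualifier, value) =>
  //         familyQualifiersValues += (family, qualifier, value)
  //       }
  //       (new ByteArrayWrapper(Bytes.toBytes(t._1)), familyQualifiersValues)
  //     },
  //     "/tmp/hbaseBulkLoadStaging")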
  /**
   * This will return a new HFile writer when requested.
   *
   * @param family        Column family
   * @param conf          Configuration to connect to HBase
   * @param favoredNodes  Nodes that we would like to write to
   * @param fs            FileSystem object where we will be writing the HFiles to
   * @return              WriterLength object
   */
  private def getNewHFileWriter(family: Array[Byte], conf: Configuration,
                                favoredNodes: Array[InetSocketAddress],
                                fs: FileSystem,
                                familydir: Path,
                                familyHFileWriteOptionsMapInternal:
                                util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions],
                                defaultCompression: Compression.Algorithm): WriterLength = {

    var familyOptions = familyHFileWriteOptionsMapInternal.get(new ByteArrayWrapper(family))

    if (familyOptions == null) {
      familyOptions = new FamilyHFileWriteOptions(defaultCompression.toString,
        BloomType.NONE.toString, HConstants.DEFAULT_BLOCKSIZE, DataBlockEncoding.NONE.toString)
      familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(family), familyOptions)
    }

    val tempConf = new Configuration(conf)
    tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)
    val contextBuilder = new HFileContextBuilder()
      .withCompression(Algorithm.valueOf(familyOptions.compression))
      .withChecksumType(HStore.getChecksumType(conf))
      .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
      .withBlockSize(familyOptions.blockSize)
    contextBuilder.withDataBlockEncoding(DataBlockEncoding.
      valueOf(familyOptions.dataBlockEncoding))
    val hFileContext = contextBuilder.build()

    // Add a '_' to the file name because this is an unfinished file.  A rename will happen
    // to remove the '_' when the file is closed.
    new WriterLength(0,
      new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
        .withBloomType(BloomType.valueOf(familyOptions.bloomType))
        .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
        .withFilePath(new Path(familydir, "_" + UUID.randomUUID.toString.replaceAll("-", "")))
        .withFavoredNodes(favoredNodes).build())
  }
  /**
   * Encompasses the logic to write a value to an HFile.
   *
   * @param rowKey                              The rowKey for the record
   * @param family                              HBase column family for the record
   * @param qualifier                           HBase column qualifier for the record
   * @param cellValue                           HBase cell value
   * @param nowTimeStamp                        The cell time stamp
   * @param fs                                  Connection to the FileSystem for the HFile
   * @param conn                                Connection to HBase
   * @param tableName                           HBase TableName object
   * @param conf                                Configuration to be used when making a new HFile
   * @param familyHFileWriteOptionsMapInternal  Extra configs for the HFile
   * @param hfileCompression                    The compression codec for the new HFile
   * @param writerMap                           HashMap of existing writers and their offsets
   * @param stagingDir                          The staging directory on the FileSystem to store
   *                                            the HFiles
   * @return                                    The writer for the given HFile that was written to
   */
  private def writeValueToHFile(rowKey: Array[Byte],
                                family: Array[Byte],
                                qualifier: Array[Byte],
                                cellValue: Array[Byte],
                                nowTimeStamp: Long,
                                fs: FileSystem,
                                conn: Connection,
                                tableName: TableName,
                                conf: Configuration,
                                familyHFileWriteOptionsMapInternal:
                                util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions],
                                hfileCompression: Compression.Algorithm,
                                writerMap: mutable.HashMap[ByteArrayWrapper, WriterLength],
                                stagingDir: String
                               ): WriterLength = {
    val wl = writerMap.getOrElseUpdate(new ByteArrayWrapper(family), {
      val familyDir = new Path(stagingDir, Bytes.toString(family))

      fs.mkdirs(familyDir)

      val loc: HRegionLocation = {
        try {
          val locator =
            conn.getRegionLocator(tableName)
          locator.getRegionLocation(rowKey)
        } catch {
          case e: Throwable =>
            logWarning("there's something wrong when locating rowkey: " +
              Bytes.toString(rowKey))
            null
        }
      }
      if (null == loc) {
        if (log.isTraceEnabled) {
          logTrace("failed to get region location, so use default writer: " +
            Bytes.toString(rowKey))
        }
        getNewHFileWriter(family = family,
          conf = conf,
          favoredNodes = null,
          fs = fs,
          familydir = familyDir,
          familyHFileWriteOptionsMapInternal,
          hfileCompression)
      } else {
        if (log.isDebugEnabled) {
          logDebug("first rowkey: [" + Bytes.toString(rowKey) + "]")
        }
        val initialIsa =
          new InetSocketAddress(loc.getHostname, loc.getPort)
        if (initialIsa.isUnresolved) {
          if (log.isTraceEnabled) {
            logTrace("failed to resolve bind address: " + loc.getHostname + ":"
              + loc.getPort + ", so use default writer")
          }
          getNewHFileWriter(family,
            conf,
            null,
            fs,
            familyDir,
            familyHFileWriteOptionsMapInternal,
            hfileCompression)
        } else {
          if (log.isDebugEnabled) {
            logDebug("use favored nodes writer: " + initialIsa.getHostString)
          }
          getNewHFileWriter(family,
            conf,
            Array[InetSocketAddress](initialIsa),
            fs,
            familyDir,
            familyHFileWriteOptionsMapInternal,
            hfileCompression)
        }
      }
    })

    val keyValue = new KeyValue(rowKey,
      family,
      qualifier,
      nowTimeStamp, cellValue)

    wl.writer.append(keyValue)
    wl.written += keyValue.getLength

    wl
  }
  /**
   * This will roll all writers.
   *
   * @param fs                      Hadoop FileSystem object
   * @param writerMap               HashMap that contains all the writers
   * @param regionSplitPartitioner  The partitioner with knowledge of how the
   *                                regions are split by row key
   * @param previousRow             The last row to fill the HFile ending range metadata
   * @param compactionExclude       The exclude compaction metadata flag for the HFile
   */
  private def rollWriters(fs: FileSystem,
                          writerMap: mutable.HashMap[ByteArrayWrapper, WriterLength],
                          regionSplitPartitioner: BulkLoadPartitioner,
                          previousRow: Array[Byte],
                          compactionExclude: Boolean): Unit = {
    writerMap.values.foreach( wl => {
      if (wl.writer != null) {
        logDebug("Writer=" + wl.writer.getPath +
          (if (wl.written == 0) "" else ", wrote=" + wl.written))
        closeHFileWriter(fs, wl.writer,
          regionSplitPartitioner,
          previousRow,
          compactionExclude)
      }
    })
    writerMap.clear()
  }
  /**
   * Function to close an HFile.
   *
   * @param fs                      Hadoop FileSystem object
   * @param w                       HFile Writer
   * @param regionSplitPartitioner  The partitioner with knowledge of how the
   *                                regions are split by row key
   * @param previousRow             The last row to fill the HFile ending range metadata
   * @param compactionExclude       The exclude compaction metadata flag for the HFile
   */
  private def closeHFileWriter(fs: FileSystem,
                               w: StoreFile.Writer,
                               regionSplitPartitioner: BulkLoadPartitioner,
                               previousRow: Array[Byte],
                               compactionExclude: Boolean): Unit = {
    if (w != null) {
      w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
        Bytes.toBytes(System.currentTimeMillis()))
      w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
        Bytes.toBytes(regionSplitPartitioner.getPartition(previousRow)))
      w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
        Bytes.toBytes(true))
      w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
        Bytes.toBytes(compactionExclude))
      w.appendTrackedTimestampsToMetadata()
      w.close()

      val srcPath = w.getPath

      // In the new path you will see that we are using substring.  This is to
      // remove the '_' character in front of the HFile name.  '_' is a character
      // that will tell HBase that this file shouldn't be included in the bulk load.
      // This feature is to protect against unfinished HFiles being submitted to HBase.
      val newPath = new Path(w.getPath.getParent, w.getPath.getName.substring(1))
      if (!fs.rename(srcPath, newPath)) {
        throw new IOException("Unable to rename '" + srcPath +
          "' to " + newPath)
      }
    }
  }
  /**
   * This is a wrapper class around StoreFile.Writer.  The reason for the
   * wrapper is to keep the length of the file alongside the writer.
   *
   * @param written  The number of bytes written to the writer
   * @param writer   The wrapped StoreFile.Writer
   */
  class WriterLength(var written: Long, val writer: StoreFile.Writer)
}
object LatestHBaseContextCache {
  var latest: HBaseContext = null
}