/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.spark

import java.io.IOException
import java.net.InetSocketAddress
import java.util
import java.util.UUID
import javax.management.openmbean.KeyAlreadyExistsException

import org.apache.hadoop.hbase.fs.HFileSystem
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.io.compress.Compression
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFileContextBuilder, HFileWriterImpl}
import org.apache.hadoop.hbase.regionserver.{HStore, StoreFile, BloomType}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.client._
import scala.reflect.ClassTag
import org.apache.spark.{Logging, SerializableWritable, SparkContext}
import org.apache.hadoop.hbase.mapreduce.{TableMapReduceUtil,
  TableInputFormat, IdentityTableMapper}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.streaming.dstream.DStream
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
import org.apache.hadoop.fs.{Path, FileSystem}
import scala.collection.mutable
/**
 * HBaseContext is a façade for HBase operations
 * like bulk put, get, increment, delete, and scan
 *
 * HBaseContext will take the responsibilities
 * of disseminating the configuration information
 * to the workers and managing the life cycle of Connections.
 */
class HBaseContext(@transient sc: SparkContext,
                   @transient val config: Configuration,
                   val tmpHdfsConfgFile: String = null)
  extends Serializable with Logging {

  @transient var credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
  @transient var tmpHdfsConfiguration: Configuration = config
  @transient var appliedCredentials = false
  @transient val job = Job.getInstance(config)
  TableMapReduceUtil.initCredentials(job)
  val broadcastedConf = sc.broadcast(new SerializableWritable(config))
  val credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials))

  LatestHBaseContextCache.latest = this

  if (tmpHdfsConfgFile != null && config != null) {
    val fs = FileSystem.newInstance(config)
    val tmpPath = new Path(tmpHdfsConfgFile)
    if (!fs.exists(tmpPath)) {
      val outputStream = fs.create(tmpPath)
      config.write(outputStream)
      outputStream.close()
    } else {
      logWarning("tmpHdfsConfigDir " + tmpHdfsConfgFile + " already exists!!")
    }
  }
  /**
   * A simple enrichment of the traditional Spark RDD foreachPartition.
   * This function differs from the original in that it offers the
   * developer access to an already connected Connection object
   *
   * Note: Do not close the Connection object.  All Connection
   * management is handled outside this method
   *
   * @param rdd  Original RDD with data to iterate over
   * @param f    Function to be given an iterator to iterate through
   *             the RDD values and a Connection object to interact
   *             with HBase
   */
  def foreachPartition[T](rdd: RDD[T],
                          f: (Iterator[T], Connection) => Unit): Unit = {
    rdd.foreachPartition(
      it => hbaseForeachPartition(broadcastedConf, it, f))
  }
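
  /*
   * Example usage (sketch, not part of the original source): writing an RDD of
   * (rowKey, value) byte-array pairs with one Connection per partition. The table
   * "t1" and the column family/qualifier "c"/"q" are hypothetical.
   *
   *   hbaseContext.foreachPartition(rdd, (it, connection) => {
   *     val table = connection.getTable(TableName.valueOf("t1"))
   *     it.foreach(r => table.put(
   *       new Put(r._1).addColumn(Bytes.toBytes("c"), Bytes.toBytes("q"), r._2)))
   *     table.close()
   *   })
   */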
  /**
   * A simple enrichment of the traditional Spark Streaming DStream foreach.
   * This function differs from the original in that it offers the
   * developer access to an already connected Connection object
   *
   * Note: Do not close the Connection object.  All Connection
   * management is handled outside this method
   *
   * @param dstream  Original DStream with data to iterate over
   * @param f        Function to be given an iterator to iterate through
   *                 the DStream values and a Connection object to
   *                 interact with HBase
   */
  def foreachPartition[T](dstream: DStream[T],
                          f: (Iterator[T], Connection) => Unit): Unit = {
    dstream.foreachRDD((rdd, time) => {
      foreachPartition(rdd, f)
    })
  }
  /**
   * A simple enrichment of the traditional Spark RDD mapPartition.
   * This function differs from the original in that it offers the
   * developer access to an already connected Connection object
   *
   * Note: Do not close the Connection object.  All Connection
   * management is handled outside this method
   *
   * @param rdd  Original RDD with data to iterate over
   * @param mp   Function to be given an iterator to iterate through
   *             the RDD values and a Connection object to interact
   *             with HBase
   * @return     Returns a new RDD generated by the user definition
   *             function just like normal mapPartition
   */
  def mapPartitions[T, R: ClassTag](rdd: RDD[T],
                                    mp: (Iterator[T], Connection) => Iterator[R]): RDD[R] = {

    rdd.mapPartitions[R](it => hbaseMapPartition[T, R](broadcastedConf,
      it,
      mp))
  }
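
  /*
   * Example usage (sketch, not part of the original source): looking up each row key
   * in an RDD and returning the number of cells found. The table "t1" is hypothetical.
   * Results are materialized with toList before the function returns, because the
   * shared Connection is closed once the user function has returned.
   *
   *   val cellCounts = hbaseContext.mapPartitions[Array[Byte], Int](rdd, (it, connection) => {
   *     val table = connection.getTable(TableName.valueOf("t1"))
   *     val counts = it.map(rowKey => table.get(new Get(rowKey)).size()).toList
   *     table.close()
   *     counts.iterator
   *   })
   */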
  /**
   * A simple enrichment of the traditional Spark Streaming DStream
   * foreachPartition.
   *
   * This function differs from the original in that it offers the
   * developer access to an already connected Connection object
   *
   * Note: Do not close the Connection object.  All Connection
   * management is handled outside this method
   *
   * Note: Make sure to partition correctly to avoid memory issue when
   *       getting data from HBase
   *
   * @param dstream  Original DStream with data to iterate over
   * @param f        Function to be given an iterator to iterate through
   *                 the DStream values and a Connection object to
   *                 interact with HBase
   */
  def streamForeachPartition[T](dstream: DStream[T],
                                f: (Iterator[T], Connection) => Unit): Unit = {

    dstream.foreachRDD(rdd => this.foreachPartition(rdd, f))
  }
  /**
   * A simple enrichment of the traditional Spark Streaming DStream
   * mapPartition.
   *
   * This function differs from the original in that it offers the
   * developer access to an already connected Connection object
   *
   * Note: Do not close the Connection object.  All Connection
   * management is handled outside this method
   *
   * Note: Make sure to partition correctly to avoid memory issue when
   *       getting data from HBase
   *
   * @param dstream  Original DStream with data to iterate over
   * @param f        Function to be given an iterator to iterate through
   *                 the DStream values and a Connection object to
   *                 interact with HBase
   * @return         Returns a new DStream generated by the user
   *                 definition function just like normal mapPartition
   */
  def streamMapPartitions[T, U: ClassTag](dstream: DStream[T],
                                          f: (Iterator[T], Connection) => Iterator[U]):
  DStream[U] = {
    dstream.mapPartitions(it => hbaseMapPartition[T, U](
      broadcastedConf,
      it,
      f))
  }
  /**
   * A simple abstraction over the HBaseContext.foreachPartition method.
   *
   * It allows a user to take an RDD, generate Puts from it,
   * and send them to HBase.
   * The complexity of managing the Connection is
   * removed from the developer
   *
   * @param rdd       Original RDD with data to iterate over
   * @param tableName The name of the table to put into
   * @param f         Function to convert a value in the RDD to an HBase Put
   */
  def bulkPut[T](rdd: RDD[T], tableName: TableName, f: (T) => Put) {

    val tName = tableName.getName
    rdd.foreachPartition(
      it => hbaseForeachPartition[T](
        broadcastedConf,
        it,
        (iterator, connection) => {
          val m = connection.getBufferedMutator(TableName.valueOf(tName))
          iterator.foreach(T => m.mutate(f(T)))
          m.flush()
          m.close()
        }))
  }
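
  /*
   * Example usage (sketch, not part of the original source): bulk putting an RDD of
   * (rowKey, value) byte-array pairs. The table "t1" and family/qualifier "c"/"q"
   * are hypothetical.
   *
   *   hbaseContext.bulkPut[(Array[Byte], Array[Byte])](rdd,
   *     TableName.valueOf("t1"),
   *     t => new Put(t._1).addColumn(Bytes.toBytes("c"), Bytes.toBytes("q"), t._2))
   */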
  def applyCreds[T](): Unit = {
    credentials = SparkHadoopUtil.get.getCurrentUserCredentials()

    logDebug("appliedCredentials:" + appliedCredentials + ",credentials:" + credentials)

    if (!appliedCredentials && credentials != null) {
      appliedCredentials = true

      @transient val ugi = UserGroupInformation.getCurrentUser
      ugi.addCredentials(credentials)
      // specify that this is a proxy user
      ugi.setAuthenticationMethod(AuthenticationMethod.PROXY)

      ugi.addCredentials(credentialsConf.value.value)
    }
  }
  /**
   * A simple abstraction over the HBaseContext.streamMapPartition method.
   *
   * It allows a user to take a DStream, generate Puts from it,
   * and send them to HBase.
   *
   * The complexity of managing the Connection is
   * removed from the developer
   *
   * @param dstream    Original DStream with data to iterate over
   * @param tableName  The name of the table to put into
   * @param f          Function to convert a value in
   *                   the DStream to an HBase Put
   */
  def streamBulkPut[T](dstream: DStream[T],
                       tableName: TableName,
                       f: (T) => Put) = {
    val tName = tableName.getName
    dstream.foreachRDD((rdd, time) => {
      bulkPut(rdd, TableName.valueOf(tName), f)
    })
  }
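
  /*
   * Example usage (sketch, not part of the original source): streaming version of the
   * bulk put above, for a DStream of (rowKey, value) pairs. Names are hypothetical.
   *
   *   hbaseContext.streamBulkPut[(Array[Byte], Array[Byte])](dstream,
   *     TableName.valueOf("t1"),
   *     t => new Put(t._1).addColumn(Bytes.toBytes("c"), Bytes.toBytes("q"), t._2))
   */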
  /**
   * A simple abstraction over the HBaseContext.foreachPartition method.
   *
   * It allows a user to take an RDD, generate Deletes from it,
   * and send them to HBase.  The complexity of managing the Connection is
   * removed from the developer
   *
   * @param rdd       Original RDD with data to iterate over
   * @param tableName The name of the table to delete from
   * @param f         Function to convert a value in the RDD to an
   *                  HBase Delete
   * @param batchSize The number of Deletes to batch before sending to HBase
   */
  def bulkDelete[T](rdd: RDD[T], tableName: TableName,
                    f: (T) => Delete, batchSize: Integer) {
    bulkMutation(rdd, tableName, f, batchSize)
  }
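
  /*
   * Example usage (sketch, not part of the original source): deleting every row key in
   * an RDD from a hypothetical table "t1", batching four Deletes at a time.
   *
   *   hbaseContext.bulkDelete[Array[Byte]](rdd,
   *     TableName.valueOf("t1"),
   *     rowKey => new Delete(rowKey),
   *     4)
   */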
  /**
   * A simple abstraction over the HBaseContext.streamBulkMutation method.
   *
   * It allows a user to take a DStream, generate Deletes from it,
   * and send them to HBase.
   *
   * The complexity of managing the Connection is
   * removed from the developer
   *
   * @param dstream    Original DStream with data to iterate over
   * @param tableName  The name of the table to delete from
   * @param f          Function to convert a value in the DStream to an
   *                   HBase Delete
   * @param batchSize  The number of Deletes to batch before sending to HBase
   */
  def streamBulkDelete[T](dstream: DStream[T],
                          tableName: TableName,
                          f: (T) => Delete,
                          batchSize: Integer) = {
    streamBulkMutation(dstream, tableName, f, batchSize)
  }
  /**
   *  Underlying function to support all bulk mutations
   *
   *  May be opened up if requested
   */
  private def bulkMutation[T](rdd: RDD[T], tableName: TableName,
                              f: (T) => Mutation, batchSize: Integer) {

    val tName = tableName.getName
    rdd.foreachPartition(
      it => hbaseForeachPartition[T](
        broadcastedConf,
        it,
        (iterator, connection) => {
          val table = connection.getTable(TableName.valueOf(tName))
          val mutationList = new java.util.ArrayList[Mutation]
          iterator.foreach(T => {
            mutationList.add(f(T))
            if (mutationList.size >= batchSize) {
              table.batch(mutationList, null)
              mutationList.clear()
            }
          })
          if (mutationList.size() > 0) {
            table.batch(mutationList, null)
            mutationList.clear()
          }
          table.close()
        }))
  }
  /**
   *  Underlying function to support all bulk streaming mutations
   *
   *  May be opened up if requested
   */
  private def streamBulkMutation[T](dstream: DStream[T],
                                    tableName: TableName,
                                    f: (T) => Mutation,
                                    batchSize: Integer) = {
    val tName = tableName.getName
    dstream.foreachRDD((rdd, time) => {
      bulkMutation(rdd, TableName.valueOf(tName), f, batchSize)
    })
  }
  /**
   * A simple abstraction over the HBaseContext.mapPartition method.
   *
   * It allows a user to take an RDD and generate a
   * new RDD based on Gets and the results they bring back from HBase
   *
   * @param tableName     The name of the table to get from
   * @param batchSize     The number of Gets to be sent in a single batch
   * @param rdd           Original RDD with data to iterate over
   * @param makeGet       Function to convert a value in the RDD to an
   *                      HBase Get
   * @param convertResult This will convert the HBase Result object to
   *                      whatever the user wants to put in the resulting
   *                      RDD
   * @return              New RDD that is created by the Gets to HBase
   */
  def bulkGet[T, U: ClassTag](tableName: TableName,
                              batchSize: Integer,
                              rdd: RDD[T],
                              makeGet: (T) => Get,
                              convertResult: (Result) => U): RDD[U] = {

    val getMapPartition = new GetMapPartition(tableName,
      batchSize,
      makeGet,
      convertResult)

    rdd.mapPartitions[U](it =>
      hbaseMapPartition[T, U](
        broadcastedConf,
        it,
        getMapPartition.run))
  }
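
  /*
   * Example usage (sketch, not part of the original source): fetching rows for an RDD
   * of row keys from a hypothetical table "t1", two Gets per batch, converting each
   * Result to its row key as a String.
   *
   *   val rowStrings = hbaseContext.bulkGet[Array[Byte], String](
   *     TableName.valueOf("t1"),
   *     2,
   *     rdd,
   *     rowKey => new Get(rowKey),
   *     result => Bytes.toString(result.getRow))
   */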
  /**
   * A simple abstraction over the HBaseContext.streamMap method.
   *
   * It allows a user to take a DStream and
   * generate a new DStream based on Gets and the results
   * they bring back from HBase
   *
   * @param tableName     The name of the table to get from
   * @param batchSize     The number of Gets to be sent in a single batch
   * @param dStream       Original DStream with data to iterate over
   * @param makeGet       Function to convert a value in the DStream to an
   *                      HBase Get
   * @param convertResult This will convert the HBase Result object to
   *                      whatever the user wants to put in the resulting
   *                      DStream
   * @return              A new DStream that is created by the Gets to HBase
   */
  def streamBulkGet[T, U: ClassTag](tableName: TableName,
                                    batchSize: Integer,
                                    dStream: DStream[T],
                                    makeGet: (T) => Get,
                                    convertResult: (Result) => U): DStream[U] = {

    val getMapPartition = new GetMapPartition(tableName,
      batchSize,
      makeGet,
      convertResult)

    dStream.mapPartitions[U](it => hbaseMapPartition[T, U](
      broadcastedConf,
      it,
      getMapPartition.run))
  }
  /**
   * This function will use the native HBase TableInputFormat with the
   * given scan object to generate a new RDD
   *
   *  @param tableName the name of the table to scan
   *  @param scan      the HBase scan object to use to read data from HBase
   *  @param f         function to convert a Result object from HBase into
   *                   what the user wants in the final generated RDD
   *  @return          new RDD with results from scan
   */
  def hbaseRDD[U: ClassTag](tableName: TableName, scan: Scan,
                            f: ((ImmutableBytesWritable, Result)) => U): RDD[U] = {

    val job: Job = Job.getInstance(getConf(broadcastedConf))

    TableMapReduceUtil.initCredentials(job)
    TableMapReduceUtil.initTableMapperJob(tableName, scan,
      classOf[IdentityTableMapper], null, null, job)

    val jconf = new JobConf(job.getConfiguration)
    SparkHadoopUtil.get.addCredentials(jconf)
    new NewHBaseRDD(sc,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result],
      job.getConfiguration,
      this).map(f)
  }
  /**
   * An overloaded version of HBaseContext hbaseRDD that defines the
   * type of the resulting RDD
   *
   *  @param tableName the name of the table to scan
   *  @param scans     the HBase scan object to use to read data from HBase
   *  @return          New RDD with results from scan
   */
  def hbaseRDD(tableName: TableName, scans: Scan):
  RDD[(ImmutableBytesWritable, Result)] = {

    hbaseRDD[(ImmutableBytesWritable, Result)](
      tableName,
      scans,
      (r: (ImmutableBytesWritable, Result)) => r)
  }
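
  /*
   * Example usage (sketch, not part of the original source): turning a full scan of a
   * hypothetical table "t1", restricted to column family "c", into an RDD.
   *
   *   val scan = new Scan()
   *   scan.addFamily(Bytes.toBytes("c"))
   *   val scanRdd: RDD[(ImmutableBytesWritable, Result)] =
   *     hbaseContext.hbaseRDD(TableName.valueOf("t1"), scan)
   */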
  /**
   *  Underlying wrapper for all foreach functions in HBaseContext
   */
  private def hbaseForeachPartition[T](configBroadcast:
                                       Broadcast[SerializableWritable[Configuration]],
                                       it: Iterator[T],
                                       f: (Iterator[T], Connection) => Unit) = {

    val config = getConf(configBroadcast)

    applyCreds
    // specify that this is a proxy user
    val connection = ConnectionFactory.createConnection(config)
    f(it, connection)
    connection.close()
  }
  private def getConf(configBroadcast: Broadcast[SerializableWritable[Configuration]]):
  Configuration = {

    if (tmpHdfsConfiguration == null && tmpHdfsConfgFile != null) {
      val fs = FileSystem.newInstance(SparkHadoopUtil.get.conf)
      val inputStream = fs.open(new Path(tmpHdfsConfgFile))
      tmpHdfsConfiguration = new Configuration(false)
      tmpHdfsConfiguration.readFields(inputStream)
      inputStream.close()
    }

    if (tmpHdfsConfiguration == null) {
      try {
        tmpHdfsConfiguration = configBroadcast.value.value
      } catch {
        case ex: Exception => logError("Unable to getConfig from broadcast", ex)
      }
    }
    tmpHdfsConfiguration
  }
  /**
   *  Underlying wrapper for all mapPartition functions in HBaseContext
   */
  private def hbaseMapPartition[K, U](configBroadcast:
                                      Broadcast[SerializableWritable[Configuration]],
                                      it: Iterator[K],
                                      mp: (Iterator[K], Connection) =>
                                        Iterator[U]): Iterator[U] = {

    val config = getConf(configBroadcast)
    applyCreds

    val connection = ConnectionFactory.createConnection(config)
    val res = mp(it, connection)

    connection.close()
    res
  }
  /**
   *  Underlying wrapper for the get mapPartition functions in HBaseContext
   */
  private class GetMapPartition[T, U](tableName: TableName,
                                      batchSize: Integer,
                                      makeGet: (T) => Get,
                                      convertResult: (Result) => U)
    extends Serializable {

    val tName = tableName.getName

    def run(iterator: Iterator[T], connection: Connection): Iterator[U] = {
      val table = connection.getTable(TableName.valueOf(tName))

      val gets = new java.util.ArrayList[Get]()
      var res = List[U]()

      while (iterator.hasNext) {
        gets.add(makeGet(iterator.next()))

        if (gets.size() == batchSize) {
          val results = table.get(gets)
          res = res ++ results.map(convertResult)
          gets.clear()
        }
      }
      if (gets.size() > 0) {
        val results = table.get(gets)
        res = res ++ results.map(convertResult)
        gets.clear()
      }
      table.close()
      res.iterator
    }
  }
  /**
   * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
   *
   * This method is used to keep ClassTags out of the external Java API, as
   * the Java compiler cannot produce them automatically. While this
   * ClassTag-faking does please the compiler, it can cause problems at runtime
   * if the Scala API relies on ClassTags for correctness.
   *
   * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior,
   * just worse performance or security issues.
   * For instance, an Array of AnyRef can hold any type T, but may lose primitive
   * specialization.
   */
  private[spark]
  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
  /**
   * Spark Implementation of HBase Bulk load for wide rows or when
   * values are not already combined at the time of the map process
   *
   * This will take the content from an existing RDD then sort and shuffle
   * it with respect to region splits.  The result of that sort and shuffle
   * will be written to HFiles.
   *
   * After this function is executed the user will have to call
   * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
   *
   * Also note this version of bulk load is different from past versions in
   * that it includes the qualifier as part of the sort process. The
   * reason for this is to be able to support rows with a very large number
   * of columns.
   *
   * @param rdd                        The RDD we are bulk loading from
   * @param tableName                  The HBase table we are loading into
   * @param flatMap                    A flatMap function that will make every
   *                                   row in the RDD
   *                                   into N cells for the bulk load
   * @param stagingDir                 The location on the FileSystem to bulk load into
   * @param familyHFileWriteOptionsMap Options that will define how the HFile for a
   *                                   column family is written
   * @param compactionExclude          Compaction excluded for the HFiles
   * @param maxSize                    Max size for the HFiles before they roll
   * @tparam T                         The Type of values in the original RDD
   */
  def bulkLoad[T](rdd: RDD[T],
                  tableName: TableName,
                  flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
                  stagingDir: String,
                  familyHFileWriteOptionsMap:
                  util.Map[Array[Byte], FamilyHFileWriteOptions] =
                  new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
                  compactionExclude: Boolean = false,
                  maxSize: Long = HConstants.DEFAULT_MAX_FILE_SIZE):
  Unit = {
    val conn = ConnectionFactory.createConnection(config)
    val regionLocator = conn.getRegionLocator(tableName)
    val startKeys = regionLocator.getStartKeys
    val defaultCompressionStr = config.get("hfile.compression",
      Compression.Algorithm.NONE.getName)
    val hfileCompression = HFileWriterImpl
      .compressionByName(defaultCompressionStr)
    val nowTimeStamp = System.currentTimeMillis()
    val tableRawName = tableName.getName

    val familyHFileWriteOptionsMapInternal =
      new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]

    val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()

    while (entrySetIt.hasNext) {
      val entry = entrySetIt.next()
      familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
    }

    val regionSplitPartitioner =
      new BulkLoadPartitioner(startKeys)

    //This is where all the magic happens
    //Here we are going to do the following things
    // 1. FlatMap every row in the RDD into key column value tuples
    // 2. Then we are going to repartition sort and shuffle
    // 3. Finally we are going to write out our HFiles
    rdd.flatMap(r => flatMap(r)).
      repartitionAndSortWithinPartitions(regionSplitPartitioner).
      hbaseForeachPartition(this, (it, conn) => {

      val conf = broadcastedConf.value.value
      val fs = FileSystem.get(conf)
      val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
      var previousRow: Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
      var rollOverRequested = false
      val localTableName = TableName.valueOf(tableRawName)

      //Here is where we finally iterate through the data in this partition of the
      //RDD that has been sorted and partitioned
      it.foreach{ case (keyFamilyQualifier, cellValue: Array[Byte]) =>

        val wl = writeValueToHFile(keyFamilyQualifier.rowKey,
          keyFamilyQualifier.family,
          keyFamilyQualifier.qualifier,
          cellValue,
          nowTimeStamp,
          fs,
          conn,
          localTableName,
          conf,
          familyHFileWriteOptionsMapInternal,
          hfileCompression,
          writerMap,
          stagingDir)

        rollOverRequested = rollOverRequested || wl.written > maxSize

        //This will only roll if we have at least one column family file that is
        //bigger than maxSize and we have finished a given row key
        if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
          rollWriters(fs, writerMap,
            regionSplitPartitioner,
            previousRow,
            compactionExclude)
          rollOverRequested = false
        }

        previousRow = keyFamilyQualifier.rowKey
      }
      //We have finished all the data so lets close up the writers
      rollWriters(fs, writerMap,
        regionSplitPartitioner,
        previousRow,
        compactionExclude)
      rollOverRequested = false
    })
  }
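
  /*
   * Example usage (sketch, not part of the original source): wide-row bulk load from an
   * RDD of (rowKey, qualifier, value) byte-array tuples. The table "t1", column family
   * "c", and staging directory are hypothetical. After the call returns, run
   * LoadIncrementalHFiles.doBulkLoad(...) against the staging directory, as the
   * scaladoc above describes.
   *
   *   hbaseContext.bulkLoad[(Array[Byte], Array[Byte], Array[Byte])](rdd,
   *     TableName.valueOf("t1"),
   *     t => Iterator((new KeyFamilyQualifier(t._1, Bytes.toBytes("c"), t._2), t._3)),
   *     "/tmp/hbaseBulkLoadStagingDir")
   */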
  /**
   * Spark Implementation of HBase Bulk load for short rows, somewhere less than
   * a 1000 columns.  This bulk load should be faster for tables with thinner
   * rows than the other Spark implementation of bulk load that puts only one
   * value into a record going into a shuffle
   *
   * This will take the content from an existing RDD then sort and shuffle
   * it with respect to region splits.  The result of that sort and shuffle
   * will be written to HFiles.
   *
   * After this function is executed the user will have to call
   * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
   *
   * In this implementation, only the rowKey is given to the shuffle as the key
   * and all the columns are already linked to the RowKey before the shuffle
   * stage.  The sorting of the qualifier is done in memory outside of the
   * shuffle stage
   *
   * Also make sure that incoming RDDs only have one record for every row key.
   *
   * @param rdd                        The RDD we are bulk loading from
   * @param tableName                  The HBase table we are loading into
   * @param mapFunction                A function that will convert the RDD records to
   *                                   the key value format used for the shuffle to prep
   *                                   for writing to the bulk loaded HFiles
   * @param stagingDir                 The location on the FileSystem to bulk load into
   * @param familyHFileWriteOptionsMap Options that will define how the HFile for a
   *                                   column family is written
   * @param compactionExclude          Compaction excluded for the HFiles
   * @param maxSize                    Max size for the HFiles before they roll
   * @tparam T                         The Type of values in the original RDD
   */
  def bulkLoadThinRows[T](rdd: RDD[T],
                          tableName: TableName,
                          mapFunction: (T) =>
                            (ByteArrayWrapper, FamiliesQualifiersValues),
                          stagingDir: String,
                          familyHFileWriteOptionsMap:
                          util.Map[Array[Byte], FamilyHFileWriteOptions] =
                          new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
                          compactionExclude: Boolean = false,
                          maxSize: Long = HConstants.DEFAULT_MAX_FILE_SIZE):
  Unit = {
    val conn = ConnectionFactory.createConnection(config)
    val regionLocator = conn.getRegionLocator(tableName)
    val startKeys = regionLocator.getStartKeys
    val defaultCompressionStr = config.get("hfile.compression",
      Compression.Algorithm.NONE.getName)
    val defaultCompression = HFileWriterImpl
      .compressionByName(defaultCompressionStr)
    val nowTimeStamp = System.currentTimeMillis()
    val tableRawName = tableName.getName

    val familyHFileWriteOptionsMapInternal =
      new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]

    val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()

    while (entrySetIt.hasNext) {
      val entry = entrySetIt.next()
      familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
    }

    val regionSplitPartitioner =
      new BulkLoadPartitioner(startKeys)

    //This is where all the magic happens
    //Here we are going to do the following things
    // 1. Map every row in the RDD into (row key, families/qualifiers/values) tuples
    // 2. Then we are going to repartition sort and shuffle
    // 3. Finally we are going to write out our HFiles
    rdd.map(r => mapFunction(r)).
      repartitionAndSortWithinPartitions(regionSplitPartitioner).
      hbaseForeachPartition(this, (it, conn) => {

      val conf = broadcastedConf.value.value
      val fs = FileSystem.get(conf)
      val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
      var previousRow: Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
      var rollOverRequested = false
      val localTableName = TableName.valueOf(tableRawName)

      //Here is where we finally iterate through the data in this partition of the
      //RDD that has been sorted and partitioned
      it.foreach{ case (rowKey: ByteArrayWrapper,
      familiesQualifiersValues: FamiliesQualifiersValues) =>

        if (Bytes.compareTo(previousRow, rowKey.value) == 0) {
          throw new KeyAlreadyExistsException("The following key was sent to the " +
            "HFile load more than once: " + Bytes.toString(previousRow))
        }

        //The family map is a tree map so the families will be sorted
        val familyIt = familiesQualifiersValues.familyMap.entrySet().iterator()
        while (familyIt.hasNext) {
          val familyEntry = familyIt.next()

          val family = familyEntry.getKey.value

          val qualifierIt = familyEntry.getValue.entrySet().iterator()

          //The qualifier map is a tree map so the qualifiers will be sorted
          while (qualifierIt.hasNext) {

            val qualifierEntry = qualifierIt.next()
            val qualifier = qualifierEntry.getKey
            val cellValue = qualifierEntry.getValue

            writeValueToHFile(rowKey.value,
              family,
              qualifier.value, // qualifier
              cellValue, // value
              nowTimeStamp,
              fs,
              conn,
              localTableName,
              conf,
              familyHFileWriteOptionsMapInternal,
              defaultCompression,
              writerMap,
              stagingDir)
          }
        }

        previousRow = rowKey.value
      }

      writerMap.values.foreach( wl => {
        rollOverRequested = rollOverRequested || wl.written > maxSize

        //This will only roll if we have at least one column family file that is
        //bigger than maxSize and we have finished a given row key
        if (rollOverRequested) {
          rollWriters(fs, writerMap,
            regionSplitPartitioner,
            previousRow,
            compactionExclude)
          rollOverRequested = false
        }
      })

      //We have finished all the data so lets close up the writers
      rollWriters(fs, writerMap,
        regionSplitPartitioner,
        previousRow,
        compactionExclude)
      rollOverRequested = false
    })
  }
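
  /*
   * Example usage (sketch, not part of the original source): thin-row bulk load from an
   * RDD of (rowKey, Seq[(qualifier, value)]) records, one record per row key. The table
   * "t1", column family "c", and staging directory are hypothetical.
   *
   *   hbaseContext.bulkLoadThinRows[(Array[Byte], Seq[(Array[Byte], Array[Byte])])](rdd,
   *     TableName.valueOf("t1"),
   *     t => {
   *       val familiesQualifiersValues = new FamiliesQualifiersValues
   *       t._2.foreach { case (qualifier, value) =>
   *         familiesQualifiersValues += (Bytes.toBytes("c"), qualifier, value)
   *       }
   *       (new ByteArrayWrapper(t._1), familiesQualifiersValues)
   *     },
   *     "/tmp/hbaseBulkLoadStagingDir")
   */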
  /**
   * This will return a new HFile writer when requested
   *
   * @param family       column family
   * @param conf         configuration to connect to HBase
   * @param favoredNodes nodes that we would like to write to
   * @param fs           FileSystem object where we will be writing the HFiles to
   * @return WriterLength object
   */
  private def getNewHFileWriter(family: Array[Byte], conf: Configuration,
                                favoredNodes: Array[InetSocketAddress],
                                fs: FileSystem,
                                familydir: Path,
                                familyHFileWriteOptionsMapInternal:
                                util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions],
                                defaultCompression: Compression.Algorithm): WriterLength = {

    var familyOptions = familyHFileWriteOptionsMapInternal.get(new ByteArrayWrapper(family))

    if (familyOptions == null) {
      familyOptions = new FamilyHFileWriteOptions(defaultCompression.toString,
        BloomType.NONE.toString, HConstants.DEFAULT_BLOCKSIZE, DataBlockEncoding.NONE.toString)
      familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(family), familyOptions)
    }

    val tempConf = new Configuration(conf)
    tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)
    val contextBuilder = new HFileContextBuilder()
      .withCompression(Algorithm.valueOf(familyOptions.compression))
      .withChecksumType(HStore.getChecksumType(conf))
      .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
      .withBlockSize(familyOptions.blockSize)
    contextBuilder.withDataBlockEncoding(DataBlockEncoding.
      valueOf(familyOptions.dataBlockEncoding))
    val hFileContext = contextBuilder.build()

    //Add a '_' to the file name because this is an unfinished file.  A rename will happen
    // to remove the '_' when the file is closed.
    new WriterLength(0,
      new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
        .withBloomType(BloomType.valueOf(familyOptions.bloomType))
        .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
        .withFilePath(new Path(familydir, "_" + UUID.randomUUID.toString.replaceAll("-", "")))
        .withFavoredNodes(favoredNodes).build())
  }
  /**
   * Encompasses the logic to write a value to an HFile
   *
   * @param rowKey                             The RowKey for the record
   * @param family                             HBase column family for the record
   * @param qualifier                          HBase column qualifier for the record
   * @param cellValue                          HBase cell value
   * @param nowTimeStamp                       The cell time stamp
   * @param fs                                 Connection to the FileSystem for the HFile
   * @param conn                               Connection to HBase
   * @param tableName                          HBase TableName object
   * @param conf                               Configuration to be used when making a new HFile
   * @param familyHFileWriteOptionsMapInternal Extra configs for the HFile
   * @param hfileCompression                   The compression codec for the new HFile
   * @param writerMap                          HashMap of existing writers and their offsets
   * @param stagingDir                         The staging directory on the FileSystem to store
   *                                           the HFiles
   * @return                                   The writer for the given HFile that was written to
   */
  private def writeValueToHFile(rowKey: Array[Byte],
                                family: Array[Byte],
                                qualifier: Array[Byte],
                                cellValue: Array[Byte],
                                nowTimeStamp: Long,
                                fs: FileSystem,
                                conn: Connection,
                                tableName: TableName,
                                conf: Configuration,
                                familyHFileWriteOptionsMapInternal:
                                util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions],
                                hfileCompression: Compression.Algorithm,
                                writerMap: mutable.HashMap[ByteArrayWrapper, WriterLength],
                                stagingDir: String): WriterLength = {

    val wl = writerMap.getOrElseUpdate(new ByteArrayWrapper(family), {
      val familyDir = new Path(stagingDir, Bytes.toString(family))

      fs.mkdirs(familyDir)

      val loc: HRegionLocation = {
        try {
          val locator =
            conn.getRegionLocator(tableName)
          locator.getRegionLocation(rowKey)
        } catch {
          case e: Throwable =>
            logWarning("there's something wrong when locating rowkey: " +
              Bytes.toString(rowKey))
            null
        }
      }
      if (null == loc) {
        if (log.isTraceEnabled) {
          logTrace("failed to get region location, so use default writer: " +
            Bytes.toString(rowKey))
        }
        getNewHFileWriter(family = family,
          conf = conf,
          favoredNodes = null,
          fs = fs,
          familydir = familyDir,
          familyHFileWriteOptionsMapInternal,
          hfileCompression)
      } else {
        if (log.isDebugEnabled) {
          logDebug("first rowkey: [" + Bytes.toString(rowKey) + "]")
        }
        val initialIsa =
          new InetSocketAddress(loc.getHostname, loc.getPort)
        if (initialIsa.isUnresolved) {
          if (log.isTraceEnabled) {
            logTrace("failed to resolve bind address: " + loc.getHostname + ":"
              + loc.getPort + ", so use default writer")
          }
          getNewHFileWriter(family,
            conf,
            null,
            fs,
            familyDir,
            familyHFileWriteOptionsMapInternal,
            hfileCompression)
        } else {
          if (log.isDebugEnabled) {
            logDebug("use favored nodes writer: " + initialIsa.getHostString)
          }
          getNewHFileWriter(family,
            conf,
            Array[InetSocketAddress](initialIsa),
            fs,
            familyDir,
            familyHFileWriteOptionsMapInternal,
            hfileCompression)
        }
      }
    })

    val keyValue = new KeyValue(rowKey,
      family,
      qualifier,
      nowTimeStamp, cellValue)

    wl.writer.append(keyValue)
    wl.written += keyValue.getLength

    wl
  }
1015 * @param fs Hadoop FileSystem object
1016 * @param writerMap HashMap that contains all the writers
1017 * @param regionSplitPartitioner The partitioner with knowledge of how the
1018 * Region's are split by row key
1019 * @param previousRow The last row to fill the HFile ending range metadata
1020 * @param compactionExclude The exclude compaction metadata flag for the HFile
1022 private def rollWriters(fs
:FileSystem
,
1023 writerMap
:mutable
.HashMap
[ByteArrayWrapper
, WriterLength
],
1024 regionSplitPartitioner
: BulkLoadPartitioner
,
1025 previousRow
: Array
[Byte
],
1026 compactionExclude
: Boolean
): Unit
= {
1027 writerMap
.values
.foreach( wl
=> {
1028 if (wl
.writer
!= null) {
1029 logDebug("Writer=" + wl
.writer
.getPath
+
1030 (if (wl
.written
== 0) "" else ", wrote=" + wl
.written
))
1031 closeHFileWriter(fs
, wl
.writer
,
1032 regionSplitPartitioner
,
  /**
   * Function to close an HFile
   * @param fs                     Hadoop FileSystem object
   * @param w                      HFile Writer
   * @param regionSplitPartitioner The partitioner with knowledge of how the
   *                               Regions are split by row key
   * @param previousRow            The last row to fill the HFile ending range metadata
   * @param compactionExclude      The exclude compaction metadata flag for the HFile
   */
  private def closeHFileWriter(fs: FileSystem,
                               w: StoreFile.Writer,
                               regionSplitPartitioner: BulkLoadPartitioner,
                               previousRow: Array[Byte],
                               compactionExclude: Boolean): Unit = {
    if (w != null) {
      w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
        Bytes.toBytes(System.currentTimeMillis()))
      w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
        Bytes.toBytes(regionSplitPartitioner.getPartition(previousRow)))
      w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
        Bytes.toBytes(true))
      w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
        Bytes.toBytes(compactionExclude))
      w.appendTrackedTimestampsToMetadata()
      w.close()

      val srcPath = w.getPath

      //In the new path you will see that we are using substring.  This is to
      // remove the '_' character in front of the HFile name.  '_' is a character
      // that will tell HBase that this file shouldn't be included in the bulk load
      // This feature is to protect for unfinished HFiles being submitted to HBase
      val newPath = new Path(w.getPath.getParent, w.getPath.getName.substring(1))
      if (!fs.rename(srcPath, newPath)) {
        throw new IOException("Unable to rename '" + srcPath +
          "' to " + newPath)
      }
    }
  }
  /**
   * This is a wrapper class around StoreFile.Writer.  The reason for the
   * wrapper is to keep the length of the file along side the writer
   *
   * @param written The number of bytes written to the writer
   * @param writer  The writer to be wrapped
   */
  class WriterLength(var written: Long, val writer: StoreFile.Writer)
}
object LatestHBaseContextCache {
  var latest: HBaseContext = null
}