////
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
////

[[spark]]
= HBase and Spark

link:https://spark.apache.org/[Apache Spark] is a software framework that is used
to process data in memory in a distributed manner, and is replacing MapReduce in
many use cases.

Spark itself is out of scope of this document; please refer to the Spark site for
more information on the Spark project and subprojects. This document will focus
on four main interaction points between Spark and HBase. Those interaction points are:

Basic Spark::
  The ability to have an HBase Connection at any point in your Spark DAG.
Spark Streaming::
  The ability to have an HBase Connection at any point in your Spark Streaming
  application.
Spark Bulk Load::
  The ability to write directly to HBase HFiles for bulk insertion into HBase.
SparkSQL/DataFrames::
  The ability to write SparkSQL that draws on tables that are represented in HBase.

The following sections will walk through examples of all these interaction points.

== Basic Spark

This section discusses Spark HBase integration at the lowest and simplest levels.
All the other interaction points are built upon the concepts that will be described
here.

At the root of all Spark and HBase integration is the HBaseContext. The HBaseContext
takes in HBase configurations and pushes them to the Spark executors. This allows
us to have an HBase Connection per Spark Executor in a static location.

For reference, Spark Executors can be on the same nodes as the Region Servers or
on different nodes; there is no dependence on co-location. Think of every Spark
Executor as a multi-threaded client application. This allows any Spark Tasks
running on the executors to access the shared Connection object.

.HBaseContext Usage Example
This example shows how HBaseContext can be used to do a `foreachPartition` on an RDD
in Scala:

[source, scala]
----
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()

val hbaseContext = new HBaseContext(sc, config)

// rdd is assumed to be an RDD of (rowKey, Array[(family, qualifier, value)]) records
rdd.hbaseForeachPartition(hbaseContext, (it, conn) => {
  val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1"))
  it.foreach((putRecord) => {
    val put = new Put(putRecord._1)
    putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
    bufferedMutator.mutate(put)
  })
  bufferedMutator.flush()
  bufferedMutator.close()
})
----

Here is the same example implemented in Java:

[source, java]
----
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

try {
  List<byte[]> list = new ArrayList<>();
  list.add(Bytes.toBytes("1"));
  list.add(Bytes.toBytes("5"));

  JavaRDD<byte[]> rdd = jsc.parallelize(list);
  Configuration conf = HBaseConfiguration.create();

  JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

  hbaseContext.foreachPartition(rdd,
      new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
    public void call(Tuple2<Iterator<byte[]>, Connection> t) throws Exception {
      Table table = t._2().getTable(TableName.valueOf(tableName));
      BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
      while (t._1().hasNext()) {
        byte[] b = t._1().next();
        Result r = table.get(new Get(b));
        if (r.getExists()) {
          mutator.mutate(new Put(b));
        }
      }
      mutator.flush();
      mutator.close();
      table.close();
    }
  });
} finally {
  jsc.stop();
}
----

All functionality between Spark and HBase will be supported both in Scala and in
Java, with the exception of SparkSQL which will support any language that is
supported by Spark. For the remainder of this documentation we will focus on
Scala examples.

The examples above illustrate how to do a `foreachPartition` with a connection. A
number of other Spark base functions are supported out of the box:

// tag::spark_base_functions[]
`bulkPut`:: For massively parallel sending of puts to HBase
`bulkDelete`:: For massively parallel sending of deletes to HBase
`bulkGet`:: For massively parallel sending of gets to HBase to create a new RDD
`mapPartition`:: To do a Spark Map function with a Connection object to allow full
access to HBase
`hbaseRDD`:: To simplify a distributed scan to create an RDD
// end::spark_base_functions[]

For examples of all these functions, see the
link:https://github.com/apache/hbase-connectors/tree/master/spark[hbase-spark integration]
in the link:https://github.com/apache/hbase-connectors[hbase-connectors] repository
(the hbase-spark connectors live outside the HBase core in a related repository
maintained by the Apache HBase project); a minimal `bulkPut` sketch follows.
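
The sketch below mirrors the DStream `bulkPut` example later in this chapter. The record
layout (row key plus an array of family/qualifier/value tuples) and the table name `t1`
are assumptions made for illustration, not requirements of the API.

[source, scala]
----
val sc = new SparkContext("local", "test")
val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())

// Each record is assumed to be (rowKey, Array[(family, qualifier, value)])
val rdd = sc.parallelize(Array(
  (Bytes.toBytes("1"), Array((Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("foo1")))),
  (Bytes.toBytes("2"), Array((Bytes.toBytes("cf1"), Bytes.toBytes("b"), Bytes.toBytes("foo2"))))))

// Convert each record into a Put; the connector sends the Puts in parallel per partition
rdd.hbaseBulkPut(hbaseContext, TableName.valueOf("t1"), (putRecord) => {
  val put = new Put(putRecord._1)
  putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
  put
})
----
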
== Spark Streaming

https://spark.apache.org/streaming/[Spark Streaming] is a micro batching stream
processing framework built on top of Spark. HBase and Spark Streaming make great
companions in that HBase can help serve the following benefits alongside Spark
Streaming:

* A place to grab reference data or profile data on the fly
* A place to store counts or aggregates in a way that supports Spark Streaming's
promise of _only once processing_.

The link:https://github.com/apache/hbase-connectors/tree/master/spark[hbase-spark integration]
with Spark Streaming is similar to its normal Spark integration points, in that the following
commands are possible straight off a Spark Streaming DStream.

include::spark.adoc[tags=spark_base_functions]

.`bulkPut` Example with DStreams
Below is an example of `bulkPut` with DStreams. It is very close in feel to the RDD
bulk put.

[source, scala]
----
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()
val hbaseContext = new HBaseContext(sc, config)
val ssc = new StreamingContext(sc, Milliseconds(200))

// rdd1 is assumed to be an RDD of (rowKey, Array[(family, qualifier, value)]) records
val queue = mutable.Queue[RDD[(Array[Byte], Array[(Array[Byte],
    Array[Byte], Array[Byte])])]]()
queue += rdd1

val dStream = ssc.queueStream(queue)

dStream.hbaseBulkPut(
  hbaseContext,
  TableName.valueOf(tableName),
  (putRecord) => {
    val put = new Put(putRecord._1)
    putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
    put
  })
----

There are three inputs to the `hbaseBulkPut` function: the `hbaseContext`, which carries
the broadcast configuration and links us to the HBase Connections in the executors; the
name of the table we are putting data into; and a function that will convert a record in
the DStream into an HBase Put object.

== Bulk Load

There are two options for bulk loading data into HBase with Spark. There is the
basic bulk load functionality that will work for cases where your rows have
millions of columns and cases where your columns are not consolidated and
partitioned before the map side of the Spark bulk load process.

There is also a thin record bulk load option with Spark. This second option is
designed for tables that have fewer than 10k columns per row. The advantage
of this second option is higher throughput and less overall load on the Spark
shuffle operation.

Both implementations work more or less like the MapReduce bulk load process in
that a partitioner partitions the rowkeys based on region splits and
the row keys are sent to the reducers in order, so that HFiles can be written
out directly from the reduce phase.

In Spark terms, the bulk load will be implemented around a Spark
`repartitionAndSortWithinPartitions` followed by a Spark `foreachPartition`.

First let's look at an example of using the basic bulk load functionality.

.Bulk Loading Example
The following example shows bulk loading with Spark.

[source, scala]
----
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()
val hbaseContext = new HBaseContext(sc, config)

val stagingFolder = ...
// Each record is (rowKey, Array[(family, qualifier, value)])
val rdd = sc.parallelize(Array(
  (Bytes.toBytes("1"),
    Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1")))),
  (Bytes.toBytes("3"),
    Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))))))

rdd.hbaseBulkLoad(hbaseContext,
  TableName.valueOf(tableName),
  t => {
    val rowKey = t._1
    val family:Array[Byte] = t._2(0)._1
    val qualifier = t._2(0)._2
    val value = t._2(0)._3

    val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)

    Seq((keyFamilyQualifier, value)).iterator
  },
  stagingFolder.getPath)

// conn is assumed to be an open HBase Connection and table the target Table
val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(new Path(stagingFolder.getPath),
  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
----

The `hbaseBulkLoad` function takes three required parameters:

. The table name of the table we intend to bulk load to

. A function that will convert a record in the RDD to a tuple key-value pair, with
the tuple key being a KeyFamilyQualifier object and the value being the cell value.
The KeyFamilyQualifier object will hold the RowKey, Column Family, and Column Qualifier.
The shuffle will partition on the RowKey but will sort by all three values.

. The temporary path for the HFile to be written out to

Following the Spark bulk load command, use HBase's LoadIncrementalHFiles object
to load the newly created HFiles into HBase, as sketched below.
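
The completion step might look like the following minimal sketch. It assumes the
`config`, `tableName`, and `stagingFolder` values from the example above, and obtains the
`conn` and `table` referenced there from a `ConnectionFactory`:

[source, scala]
----
// Open a Connection to drive the HFile load; close it when the load is done
val conn = ConnectionFactory.createConnection(config)
try {
  val table = conn.getTable(TableName.valueOf(tableName))
  val regionLocator = conn.getRegionLocator(TableName.valueOf(tableName))

  val load = new LoadIncrementalHFiles(config)
  load.doBulkLoad(new Path(stagingFolder.getPath), conn.getAdmin, table, regionLocator)
} finally {
  conn.close()
}
----
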
.Additional Parameters for Bulk Loading with Spark
You can set the following attributes with additional parameter options on `hbaseBulkLoad`.

* Max file size of the HFiles
* A flag to exclude HFiles from compactions
* Column Family settings for compression, bloomType, blockSize, and dataBlockEncoding

.Using Additional Parameters
[source, scala]
----
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()
val hbaseContext = new HBaseContext(sc, config)

val stagingFolder = ...
// Each record is (rowKey, Array[(family, qualifier, value)])
val rdd = sc.parallelize(Array(
  (Bytes.toBytes("1"),
    Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1")))),
  (Bytes.toBytes("3"),
    Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))))))

// Per-family HFile write options: compression, bloom type, block size, data block encoding
val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX")

familyHBaseWriterOptions.put(Bytes.toBytes("columnFamily1"), f1Options)

rdd.hbaseBulkLoad(hbaseContext,
  TableName.valueOf(tableName),
  t => {
    val rowKey = t._1
    val family:Array[Byte] = t._2(0)._1
    val qualifier = t._2(0)._2
    val value = t._2(0)._3

    val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)

    Seq((keyFamilyQualifier, value)).iterator
  },
  stagingFolder.getPath,
  familyHBaseWriterOptions,
  compactionExclude = false,
  HConstants.DEFAULT_MAX_FILE_SIZE)

val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(new Path(stagingFolder.getPath),
  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
----

Now let's look at how you would call the thin record bulk load implementation.

.Using thin record bulk load
[source, scala]
----
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()
val hbaseContext = new HBaseContext(sc, config)

val stagingFolder = ...
// Each record is (rowKey, Array[(family, qualifier, value)])
val rdd = sc.parallelize(Array(
  ("1",
    Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1")))),
  ("3",
    Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))))))

rdd.hbaseBulkLoadThinRows(hbaseContext,
  TableName.valueOf(tableName),
  t => {
    val rowKey = t._1

    // Collect every cell of the row into a single FamiliesQualifiersValues object
    val familyQualifiersValues = new FamiliesQualifiersValues
    t._2.foreach(f => {
      val family:Array[Byte] = f._1
      val qualifier = f._2
      val value:Array[Byte] = f._3

      familyQualifiersValues +=(family, qualifier, value)
    })
    (new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues)
  },
  stagingFolder.getPath,
  new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions],
  compactionExclude = false,
  HConstants.DEFAULT_MAX_FILE_SIZE)

val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(new Path(stagingFolder.getPath),
  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
----

Note that the big difference in using bulk load for thin rows is that the function
returns a tuple with the first value being the row key and the second value
being an object of FamiliesQualifiersValues, which will contain all the
values for this row for all column families.

== SparkSQL/DataFrames

The link:https://github.com/apache/hbase-connectors/tree/master/spark[hbase-spark integration]
leverages the
link:https://databricks.com/blog/2015/01/09/spark-sql-data-sources-api-unified-data-access-for-the-spark-platform.html[DataSource API]
(link:https://issues.apache.org/jira/browse/SPARK-3247[SPARK-3247])
introduced in Spark-1.2.0, which bridges the gap between the simple HBase key-value store and complex
relational SQL queries and enables users to perform complex data analytical work
on top of HBase using Spark. An HBase DataFrame is a standard Spark DataFrame, and is able to
interact with any other data sources such as Hive, ORC, Parquet, JSON, etc.
The link:https://github.com/apache/hbase-connectors/tree/master/spark[hbase-spark integration]
applies critical techniques such as partition pruning, column pruning,
predicate pushdown and data locality.

To use the
link:https://github.com/apache/hbase-connectors/tree/master/spark[hbase-spark integration]
connector, users need to define the Catalog for the schema mapping
between HBase and Spark tables, prepare the data and populate the HBase table,
then load the HBase DataFrame. After that, users can do integrated queries and access records
in HBase tables with SQL queries. The following illustrates the basic procedure.

417 |"table":{"namespace":"default", "name":"table1"},
420 |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
421 |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
422 |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
423 |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
424 |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
425 |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
426 |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
427 |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
428 |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
Catalog defines a mapping between HBase and Spark tables. There are two critical parts of this catalog.
One is the rowkey definition and the other is the mapping between a table column in Spark and
the column family and column qualifier in HBase. The above defines a schema for an HBase table
named table1, with row key as key and a number of columns (col1 `-` col8). Note that the rowkey
also has to be defined in detail as a column (col0), which has a specific cf (rowkey).

=== Save the DataFrame

[source, scala]
----
case class HBaseRecord(col0: String, col1: Boolean, col2: Double, col3: Float,
  col4: Int, col5: Long, col6: Short, col7: String, col8: Byte)

object HBaseRecord {
  def apply(i: Int, t: String): HBaseRecord = {
    val s = s"""row${"%03d".format(i)}"""
    HBaseRecord(s, i % 2 == 0, i.toDouble, i.toFloat, i, i.toLong, i.toShort, s"String$i: $t", i.toByte)
  }
}

val data = (0 to 255).map { i => HBaseRecord(i, "extra") }

sc.parallelize(data).toDF.write.options(
  Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
  .format("org.apache.hadoop.hbase.spark")
  .save()
----

`data` prepared by the user is a local Scala collection which has 256 HBaseRecord objects.
The `sc.parallelize(data)` function distributes `data` to form an RDD. `toDF` returns a DataFrame.
The `write` function returns a DataFrameWriter used to write the DataFrame to external storage
systems (e.g. HBase here). Given a DataFrame with specified schema `catalog`, the `save` function
will create an HBase table with 5 regions and save the DataFrame inside.

=== Load the DataFrame

[source, scala]
----
def withCatalog(cat: String): DataFrame = {
  sqlContext
    .read
    .options(Map(HBaseTableCatalog.tableCatalog -> cat))
    .format("org.apache.hadoop.hbase.spark")
    .load()
}
val df = withCatalog(catalog)
----

In the `withCatalog` function, `sqlContext` is a variable of SQLContext, which is the entry point
for working with structured data (rows and columns) in Spark.
`read` returns a DataFrameReader that can be used to read data in as a DataFrame.
The `option` function adds input options for the underlying data source to the DataFrameReader,
and the `format` function specifies the input data source format for the DataFrameReader.
The `load()` function loads input in as a DataFrame. The data frame `df` returned
by the `withCatalog` function could be used to access the HBase table, as in the
language-integrated query and SQL query examples below.

=== Language Integrated Query

[source, scala]
----
val s = df.filter(($"col0" <= "row050" && $"col0" > "row040") ||
  $"col0" === "row005" ||
  $"col0" <= "row005")
  .select("col0", "col1", "col4")
s.show
----

A DataFrame can do various operations, such as join, sort, select, filter, orderBy and so on.
`df.filter` above filters rows using the given SQL expression. `select` selects a set of columns:
`col0`, `col1` and `col4`. A couple of further operations are sketched below.
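
The sketch below applies two more of those operations to the same data, using only standard
Spark DataFrame calls; the column names simply follow the catalog defined above.

[source, scala]
----
// Sort the previously selected rows by col4, descending
s.orderBy($"col4".desc).show()

// Group by the boolean column col1 and count the rows in each group
df.groupBy("col1").count().show()
----
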
=== SQL Query

[source, scala]
----
df.registerTempTable("table1")
sqlContext.sql("select count(col1) from table1").show
----

`registerTempTable` registers the `df` DataFrame as a temporary table using the table name `table1`.
The lifetime of this temporary table is tied to the SQLContext that was used to create `df`.
The `sqlContext.sql` function allows the user to execute SQL queries.

.Query with different timestamps
In HBaseSparkConf, four parameters related to timestamp can be set. They are TIMESTAMP,
MIN_TIMESTAMP, MAX_TIMESTAMP and MAX_VERSIONS respectively. Users can query records with
different timestamps or time ranges with MIN_TIMESTAMP and MAX_TIMESTAMP. Meanwhile,
use concrete values instead of tsSpecified and oldMs in the examples below.

The example below shows how to load the `df` DataFrame with different timestamps.
`tsSpecified` is specified by the user.
`HBaseTableCatalog` defines the schema mapping between the HBase table and the Spark relation.
`writeCatalog` defines the catalog for the schema mapping.

[source, scala]
----
val df = sqlContext.read
  .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMESTAMP -> tsSpecified.toString))
  .format("org.apache.hadoop.hbase.spark")
  .load()
----

The example below shows how to load the `df` DataFrame with different time ranges.
`oldMs` is specified by the user.

[source, scala]
----
val df = sqlContext.read
  .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
    HBaseSparkConf.MAX_TIMESTAMP -> oldMs.toString))
  .format("org.apache.hadoop.hbase.spark")
  .load()
----

After loading the `df` DataFrame, users can query the data.

[source, scala]
----
df.registerTempTable("table")
sqlContext.sql("select count(col1) from table").show
----
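
The fourth parameter, MAX_VERSIONS, is passed the same way as the options above. The sketch
below assumes the constant is named `HBaseSparkConf.MAX_VERSIONS`, following the naming pattern
of the other three parameters; check the HBaseSparkConf in your connector version for the exact name.

[source, scala]
----
// Request up to 3 versions per cell (constant name assumed, see note above)
val df = sqlContext.read
  .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog,
    HBaseSparkConf.MAX_VERSIONS -> "3"))
  .format("org.apache.hadoop.hbase.spark")
  .load()
----
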
.Native Avro support
The link:https://github.com/apache/hbase-connectors/tree/master/spark[hbase-spark integration]
connector supports different data formats like Avro, JSON, etc. The use case below
shows how Spark supports Avro. Users can persist the Avro record into HBase directly. Internally,
the Avro schema is converted to a native Spark Catalyst data type automatically.
Note that both key-value parts in an HBase table can be defined in Avro format.

1) Define catalog for the schema mapping:

586 |"table":{"namespace":"default", "name":"Avrotable"},
589 |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
590 |"col1":{"cf":"cf1", "col":"col1", "type":"binary"}
`catalog` is a schema for an HBase table named `Avrotable`, with row key as key and
one column, col1. The rowkey also has to be defined in detail as a column (col0),
which has a specific cf (rowkey).

2) Prepare the Data:

[source, scala]
----
case class AvroHBaseRecord(col0: String, col1: Array[Byte])

object AvroHBaseRecord {
  val schemaString =
    s"""{"namespace": "example.avro",
       | "type": "record", "name": "User",
       | "fields": [
       |   {"name": "name", "type": "string"},
       |   {"name": "favorite_number", "type": ["int", "null"]},
       |   {"name": "favorite_color", "type": ["string", "null"]},
       |   {"name": "favorite_array", "type": {"type": "array", "items": "string"}},
       |   {"name": "favorite_map", "type": {"type": "map", "values": "int"}}
       | ]}""".stripMargin

  val avroSchema: Schema = {
    val p = new Schema.Parser
    p.parse(schemaString)
  }

  def apply(i: Int): AvroHBaseRecord = {
    val user = new GenericData.Record(avroSchema)
    user.put("name", s"name${"%03d".format(i)}")
    user.put("favorite_number", i)
    user.put("favorite_color", s"color${"%03d".format(i)}")
    val favoriteArray = new GenericData.Array[String](2, avroSchema.getField("favorite_array").schema())
    favoriteArray.add(s"number${i}")
    favoriteArray.add(s"number${i+1}")
    user.put("favorite_array", favoriteArray)
    import collection.JavaConverters._
    val favoriteMap = Map[String, Int](("key1" -> i), ("key2" -> (i+1))).asJava
    user.put("favorite_map", favoriteMap)
    // Serialize the Avro record into the bytes stored in the binary column col1
    val avroByte = AvroSedes.serialize(user, avroSchema)
    AvroHBaseRecord(s"name${"%03d".format(i)}", avroByte)
  }
}

val data = (0 to 255).map { i =>
  AvroHBaseRecord(i)
}
----

`schemaString` is defined first, then it is parsed to get `avroSchema`. `avroSchema` is used to
generate `AvroHBaseRecord`. `data` prepared by users is a local Scala collection
which has 256 `AvroHBaseRecord` objects.

3) Save DataFrame:

[source, scala]
----
sc.parallelize(data).toDF.write.options(
  Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save()
----

Given a data frame with the specified schema `catalog`, the code above will create an HBase table
with 5 regions and save the data frame inside.

4) Load the DataFrame:

[source, scala]
----
def avroCatalog = s"""{
     |"table":{"namespace":"default", "name":"avrotable"},
     |"rowkey":"key",
     |"columns":{
       |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
       |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
     |}
   |}""".stripMargin

def withCatalog(cat: String): DataFrame = {
  sqlContext
    .read
    .options(Map("avroSchema" -> AvroHBaseRecord.schemaString, HBaseTableCatalog.tableCatalog -> avroCatalog))
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .load()
}
val df = withCatalog(catalog)
----

In the `withCatalog` function, `read` returns a DataFrameReader that can be used to read data in as a DataFrame.
The `option` function adds input options for the underlying data source to the DataFrameReader.
There are two options: one is to set `avroSchema` as `AvroHBaseRecord.schemaString`, and the other is to
set `HBaseTableCatalog.tableCatalog` as `avroCatalog`. The `load()` function loads input in as a DataFrame.
The data frame `df` returned by the `withCatalog` function could be used to access the HBase table.

[source, scala]
----
df.registerTempTable("avrotable")
val c = sqlContext.sql("select count(1) from avrotable")
c.show
----

After loading the `df` DataFrame, users can query the data. `registerTempTable` registers the `df` DataFrame
as a temporary table using the table name `avrotable`. The `sqlContext.sql` function allows the
user to execute SQL queries.