HBASE-26921 Rewrite the counting cells part in TestMultiVersions (#4316)
[hbase.git] / src / main / asciidoc / _chapters / spark.adoc
blob207528f057d0b6cf1aba3f1206635ffc4aacb44b
1 ////
2 /**
3  *
4  * Licensed to the Apache Software Foundation (ASF) under one
5  * or more contributor license agreements. See the NOTICE file
6  * distributed with this work for additional information
7  * regarding copyright ownership. The ASF licenses this file
8  * to you under the Apache License, Version 2.0 (the
9  * "License"); you may not use this file except in compliance
10  * with the License. You may obtain a copy of the License at
11  *
12  . . http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 ////
22 [[spark]]
23 = HBase and Spark
24 :doctype: book
25 :numbered:
26 :toc: left
27 :icons: font
28 :experimental:
30 link:https://spark.apache.org/[Apache Spark] is a software framework that is used
31 to process data in memory in a distributed manner, and is replacing MapReduce in
32 many use cases.
34 Spark itself is out of scope of this document, please refer to the Spark site for
35 more information on the Spark project and subprojects. This document will focus
36 on 4 main interaction points between Spark and HBase. Those interaction points are:
38 Basic Spark::
39   The ability to have an HBase Connection at any point in your Spark DAG.
40 Spark Streaming::
41   The ability to have an HBase Connection at any point in your Spark Streaming
42   application.
43 Spark Bulk Load::
44   The ability to write directly to HBase HFiles for bulk insertion into HBase
45 SparkSQL/DataFrames::
46   The ability to write SparkSQL that draws on tables that are represented in HBase.
48 The following sections will walk through examples of all these interaction points.
50 == Basic Spark
52 This section discusses Spark HBase integration at the lowest and simplest levels.
53 All the other interaction points are built upon the concepts that will be described
54 here.
56 At the root of all Spark and HBase integration is the HBaseContext. The HBaseContext
57 takes in HBase configurations and pushes them to the Spark executors. This allows
58 us to have an HBase Connection per Spark Executor in a static location.
60 For reference, Spark Executors can be on the same nodes as the Region Servers or
61 on different nodes, there is no dependence on co-location. Think of every Spark
62 Executor as a multi-threaded client application. This allows any Spark Tasks
63 running on the executors to access the shared Connection object.
65 .HBaseContext Usage Example
66 ====
68 This example shows how HBaseContext can be used to do a `foreachPartition` on a RDD
69 in Scala:
71 [source, scala]
72 ----
73 val sc = new SparkContext("local", "test")
74 val config = new HBaseConfiguration()
76 ...
78 val hbaseContext = new HBaseContext(sc, config)
80 rdd.hbaseForeachPartition(hbaseContext, (it, conn) => {
81  val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1"))
82  it.foreach((putRecord) => {
83 . val put = new Put(putRecord._1)
84 . putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
85 . bufferedMutator.mutate(put)
86  })
87  bufferedMutator.flush()
88  bufferedMutator.close()
90 ----
92 Here is the same example implemented in Java:
94 [source, java]
95 ----
96 JavaSparkContext jsc = new JavaSparkContext(sparkConf);
98 try {
99   List<byte[]> list = new ArrayList<>();
100   list.add(Bytes.toBytes("1"));
101   ...
102   list.add(Bytes.toBytes("5"));
104   JavaRDD<byte[]> rdd = jsc.parallelize(list);
105   Configuration conf = HBaseConfiguration.create();
107   JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
109   hbaseContext.foreachPartition(rdd,
110       new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
111    public void call(Tuple2<Iterator<byte[]>, Connection> t)
112         throws Exception {
113     Table table = t._2().getTable(TableName.valueOf(tableName));
114     BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
115     while (t._1().hasNext()) {
116       byte[] b = t._1().next();
117       Result r = table.get(new Get(b));
118       if (r.getExists()) {
119        mutator.mutate(new Put(b));
120       }
121     }
123     mutator.flush();
124     mutator.close();
125     table.close();
126    }
127   });
128 } finally {
129   jsc.stop();
131 ----
132 ====
134 All functionality between Spark and HBase will be supported both in Scala and in
135 Java, with the exception of SparkSQL which will support any language that is
136 supported by Spark. For the remaining of this documentation we will focus on
137 Scala examples.
139 The examples above illustrate how to do a foreachPartition with a connection. A
140 number of other Spark base functions  are supported out of the box:
142 // tag::spark_base_functions[]
143 `bulkPut`:: For massively parallel sending of puts to HBase
144 `bulkDelete`:: For massively parallel sending of deletes to HBase
145 `bulkGet`:: For massively parallel sending of gets to HBase to create a new RDD
146 `mapPartition`:: To do a Spark Map function with a Connection object to allow full
147 access to HBase
148 `hbaseRDD`:: To simplify a distributed scan to create a RDD
149 // end::spark_base_functions[]
151 For examples of all these functionalities, see the
152 link:https://github.com/apache/hbase-connectors/tree/master/spark[hbase-spark integration]
153 in the link:https://github.com/apache/hbase-connectors[hbase-connectors] repository
154 (the hbase-spark connectors live outside hbase core in a related,
155 Apache HBase project maintained, associated repo).
157 == Spark Streaming
158 https://spark.apache.org/streaming/[Spark Streaming] is a micro batching stream
159 processing framework built on top of Spark. HBase and Spark Streaming make great
160 companions in that HBase can help serve the following benefits alongside Spark
161 Streaming.
163 * A place to grab reference data or profile data on the fly
164 * A place to store counts or aggregates in a way that supports Spark Streaming's
165 promise of _only once processing_.
167 The link:https://github.com/apache/hbase-connectors/tree/master/spark[hbase-spark integration]
168 with Spark Streaming is similar to its normal Spark integration points, in that the following
169 commands are possible straight off a Spark Streaming DStream.
171 include::spark.adoc[tags=spark_base_functions]
173 .`bulkPut` Example with DStreams
174 ====
176 Below is an example of bulkPut with DStreams. It is very close in feel to the RDD
177 bulk put.
179 [source, scala]
180 ----
181 val sc = new SparkContext("local", "test")
182 val config = new HBaseConfiguration()
184 val hbaseContext = new HBaseContext(sc, config)
185 val ssc = new StreamingContext(sc, Milliseconds(200))
187 val rdd1 = ...
188 val rdd2 = ...
190 val queue = mutable.Queue[RDD[(Array[Byte], Array[(Array[Byte],
191     Array[Byte], Array[Byte])])]]()
193 queue += rdd1
194 queue += rdd2
196 val dStream = ssc.queueStream(queue)
198 dStream.hbaseBulkPut(
199   hbaseContext,
200   TableName.valueOf(tableName),
201   (putRecord) => {
202    val put = new Put(putRecord._1)
203    putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
204    put
205   })
206 ----
208 There are three inputs to the `hbaseBulkPut` function.
209 The hbaseContext that carries the configuration broadcast information link
210 to the HBase Connections in the executor, the table name of the table we are
211 putting data into, and a function that will convert a record in the DStream
212 into an HBase Put object.
213 ====
215 == Bulk Load
217 There are two options for bulk loading data into HBase with Spark.  There is the
218 basic bulk load functionality that will work for cases where your rows have
219 millions of columns and cases where your columns are not consolidated and
220 partitioned before the map side of the Spark bulk load process.
222 There is also a thin record bulk load option with Spark. This second option is
223 designed for tables that have less then 10k columns per row.  The advantage
224 of this second option is higher throughput and less over-all load on the Spark
225 shuffle operation.
227 Both implementations work more or less like the MapReduce bulk load process in
228 that a partitioner partitions the rowkeys based on region splits and
229 the row keys are sent to the reducers in order, so that HFiles can be written
230 out directly from the reduce phase.
232 In Spark terms, the bulk load will be implemented around a Spark
233 `repartitionAndSortWithinPartitions` followed by a Spark `foreachPartition`.
235 First lets look at an example of using the basic bulk load functionality
237 .Bulk Loading Example
238 ====
240 The following example shows bulk loading with Spark.
242 [source, scala]
243 ----
244 val sc = new SparkContext("local", "test")
245 val config = new HBaseConfiguration()
247 val hbaseContext = new HBaseContext(sc, config)
249 val stagingFolder = ...
250 val rdd = sc.parallelize(Array(
251       (Bytes.toBytes("1"),
252         (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
253       (Bytes.toBytes("3"),
254         (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...
256 rdd.hbaseBulkLoad(TableName.valueOf(tableName),
257   t => {
258    val rowKey = t._1
259    val family:Array[Byte] = t._2(0)._1
260    val qualifier = t._2(0)._2
261    val value = t._2(0)._3
263    val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)
265    Seq((keyFamilyQualifier, value)).iterator
266   },
267   stagingFolder.getPath)
269 val load = new LoadIncrementalHFiles(config)
270 load.doBulkLoad(new Path(stagingFolder.getPath),
271   conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
272 ----
273 ====
275 The `hbaseBulkLoad` function takes three required parameters:
277 . The table name of the table we intend to bulk load too
279 . A function that will convert a record in the RDD to a tuple key value par. With
280 the tuple key being a KeyFamilyQualifer object and the value being the cell value.
281 The KeyFamilyQualifer object will hold the RowKey, Column Family, and Column Qualifier.
282 The shuffle will partition on the RowKey but will sort by all three values.
284 . The temporary path for the HFile to be written out too
286 Following the Spark bulk load command,  use the HBase's LoadIncrementalHFiles object
287 to load the newly created HFiles into HBase.
289 .Additional Parameters for Bulk Loading with Spark
291 You can set the following attributes with additional parameter options on hbaseBulkLoad.
293 * Max file size of the HFiles
294 * A flag to exclude HFiles from compactions
295 * Column Family settings for compression, bloomType, blockSize, and dataBlockEncoding
297 .Using Additional Parameters
298 ====
300 [source, scala]
301 ----
302 val sc = new SparkContext("local", "test")
303 val config = new HBaseConfiguration()
305 val hbaseContext = new HBaseContext(sc, config)
307 val stagingFolder = ...
308 val rdd = sc.parallelize(Array(
309       (Bytes.toBytes("1"),
310         (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
311       (Bytes.toBytes("3"),
312         (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...
314 val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
315 val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX")
317 familyHBaseWriterOptions.put(Bytes.toBytes("columnFamily1"), f1Options)
319 rdd.hbaseBulkLoad(TableName.valueOf(tableName),
320   t => {
321    val rowKey = t._1
322    val family:Array[Byte] = t._2(0)._1
323    val qualifier = t._2(0)._2
324    val value = t._2(0)._3
326    val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)
328    Seq((keyFamilyQualifier, value)).iterator
329   },
330   stagingFolder.getPath,
331   familyHBaseWriterOptions,
332   compactionExclude = false,
333   HConstants.DEFAULT_MAX_FILE_SIZE)
335 val load = new LoadIncrementalHFiles(config)
336 load.doBulkLoad(new Path(stagingFolder.getPath),
337   conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
338 ----
339 ====
341 Now lets look at how you would call the thin record bulk load implementation
343 .Using thin record bulk load
344 ====
346 [source, scala]
347 ----
348 val sc = new SparkContext("local", "test")
349 val config = new HBaseConfiguration()
351 val hbaseContext = new HBaseContext(sc, config)
353 val stagingFolder = ...
354 val rdd = sc.parallelize(Array(
355       ("1",
356         (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
357       ("3",
358         (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...
360 rdd.hbaseBulkLoadThinRows(hbaseContext,
361       TableName.valueOf(tableName),
362       t => {
363         val rowKey = t._1
365         val familyQualifiersValues = new FamiliesQualifiersValues
366         t._2.foreach(f => {
367           val family:Array[Byte] = f._1
368           val qualifier = f._2
369           val value:Array[Byte] = f._3
371           familyQualifiersValues +=(family, qualifier, value)
372         })
373         (new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues)
374       },
375       stagingFolder.getPath,
376       new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions],
377       compactionExclude = false,
378       20)
380 val load = new LoadIncrementalHFiles(config)
381 load.doBulkLoad(new Path(stagingFolder.getPath),
382   conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
383 ----
384 ====
386 Note that the big difference in using bulk load for thin rows is the function
387 returns a tuple with the first value being the row key and the second value
388 being an object of FamiliesQualifiersValues, which will contain all the
389 values for this row for all column families.
391 == SparkSQL/DataFrames
393 The link:https://github.com/apache/hbase-connectors/tree/master/spark[hbase-spark integration]
394 leverages
395 link:https://databricks.com/blog/2015/01/09/spark-sql-data-sources-api-unified-data-access-for-the-spark-platform.html[DataSource API]
396 (link:https://issues.apache.org/jira/browse/SPARK-3247[SPARK-3247])
397 introduced in Spark-1.2.0, which bridges the gap between simple HBase KV store and complex
398 relational SQL queries and enables users to perform complex data analytical work
399 on top of HBase using Spark. HBase Dataframe is a standard Spark Dataframe, and is able to
400 interact with any other data sources such as Hive, Orc, Parquet, JSON, etc.
401 The link:https://github.com/apache/hbase-connectors/tree/master/spark[hbase-spark integration]
402 applies critical techniques such as partition pruning, column pruning,
403 predicate pushdown and data locality.
405 To use the
406 link:https://github.com/apache/hbase-connectors/tree/master/spark[hbase-spark integration]
407 connector, users need to define the Catalog for the schema mapping
408 between HBase and Spark tables, prepare the data and populate the HBase table,
409 then load the HBase DataFrame. After that, users can do integrated query and access records
410 in HBase tables with SQL query. The following illustrates the basic procedure.
412 === Define catalog
414 [source, scala]
415 ----
416 def catalog = s"""{
417        |"table":{"namespace":"default", "name":"table1"},
418        |"rowkey":"key",
419        |"columns":{
420          |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
421          |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
422          |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
423          |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
424          |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
425          |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
426          |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
427          |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
428          |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
429        |}
430      |}""".stripMargin
431 ----
433 Catalog defines a mapping between HBase and Spark tables. There are two critical parts of this catalog.
434 One is the rowkey definition and the other is the mapping between table column in Spark and
435 the column family and column qualifier in HBase. The above defines a schema for a HBase table
436 with name as table1, row key as key and a number of columns (col1 `-` col8). Note that the rowkey
437 also has to be defined in details as a column (col0), which has a specific cf (rowkey).
439 === Save the DataFrame
441 [source, scala]
442 ----
443 case class HBaseRecord(
444    col0: String,
445    col1: Boolean,
446    col2: Double,
447    col3: Float,
448    col4: Int,       
449    col5: Long,
450    col6: Short,
451    col7: String,
452    col8: Byte)
454 object HBaseRecord
455 {                                                                                                             
456    def apply(i: Int, t: String): HBaseRecord = {
457       val s = s"""row${"%03d".format(i)}"""       
458       HBaseRecord(s,
459       i % 2 == 0,
460       i.toDouble,
461       i.toFloat,  
462       i,
463       i.toLong,
464       i.toShort,  
465       s"String$i: $t",      
466       i.toByte)
467   }
470 val data = (0 to 255).map { i =>  HBaseRecord(i, "extra")}
472 sc.parallelize(data).toDF.write.options(
473  Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
474  .format("org.apache.hadoop.hbase.spark ")
475  .save()
477 ----
478 `data` prepared by the user is a local Scala collection which has 256 HBaseRecord objects.
479 `sc.parallelize(data)` function distributes `data` to form an RDD. `toDF` returns a DataFrame.
480 `write` function returns a DataFrameWriter used to write the DataFrame to external storage
481 systems (e.g. HBase here). Given a DataFrame with specified schema `catalog`, `save` function
482 will create an HBase table with 5 regions and save the DataFrame inside.
484 === Load the DataFrame
486 [source, scala]
487 ----
488 def withCatalog(cat: String): DataFrame = {
489   sqlContext
490   .read
491   .options(Map(HBaseTableCatalog.tableCatalog->cat))
492   .format("org.apache.hadoop.hbase.spark")
493   .load()
495 val df = withCatalog(catalog)
496 ----
497 In ‘withCatalog’ function, sqlContext is a variable of SQLContext, which is the entry point
498 for working with structured data (rows and columns) in Spark.
499 `read` returns a DataFrameReader that can be used to read data in as a DataFrame.
500 `option` function adds input options for the underlying data source to the DataFrameReader,
501 and `format` function specifies the input data source format for the DataFrameReader.
502 The `load()` function loads input in as a DataFrame. The date frame `df` returned
503 by `withCatalog` function could be used to access HBase table, such as 4.4 and 4.5.
505 === Language Integrated Query
507 [source, scala]
508 ----
509 val s = df.filter(($"col0" <= "row050" && $"col0" > "row040") ||
510   $"col0" === "row005" ||
511   $"col0" <= "row005")
512   .select("col0", "col1", "col4")
513 s.show
514 ----
515 DataFrame can do various operations, such as join, sort, select, filter, orderBy and so on.
516 `df.filter` above filters rows using the given SQL expression. `select` selects a set of columns:
517 `col0`, `col1` and `col4`.
519 === SQL Query
521 [source, scala]
522 ----
523 df.registerTempTable("table1")
524 sqlContext.sql("select count(col1) from table1").show
525 ----
527 `registerTempTable` registers `df` DataFrame as a temporary table using the table name `table1`.
528 The lifetime of this temporary table is tied to the SQLContext that was used to create `df`.
529 `sqlContext.sql` function allows the user to execute SQL queries.
531 === Others
533 .Query with different timestamps
534 ====
535 In HBaseSparkConf, four parameters related to timestamp can be set. They are TIMESTAMP,
536 MIN_TIMESTAMP, MAX_TIMESTAMP and MAX_VERSIONS respectively. Users can query records with
537 different timestamps or time ranges with MIN_TIMESTAMP and MAX_TIMESTAMP. In the meantime,
538 use concrete value instead of tsSpecified and oldMs in the examples below.
540 The example below shows how to load df DataFrame with different timestamps.
541 tsSpecified is specified by the user.
542 HBaseTableCatalog defines the HBase and Relation relation schema.
543 writeCatalog defines catalog for the schema mapping.
545 [source, scala]
546 ----
547 val df = sqlContext.read
548       .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMESTAMP -> tsSpecified.toString))
549       .format("org.apache.hadoop.hbase.spark")
550       .load()
551 ----
553 The example below shows how to load df DataFrame with different time ranges.
554 oldMs is specified by the user.
556 [source, scala]
557 ----
558 val df = sqlContext.read
559       .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
560         HBaseSparkConf.MAX_TIMESTAMP -> oldMs.toString))
561       .format("org.apache.hadoop.hbase.spark")
562       .load()
563 ----
564 After loading df DataFrame, users can query data.
566 [source, scala]
567 ----
568 df.registerTempTable("table")
569 sqlContext.sql("select count(col1) from table").show
570 ----
571 ====
573 .Native Avro support
574 ====
575 The link:https://github.com/apache/hbase-connectors/tree/master/spark[hbase-spark integration]
576 connector supports different data formats like Avro, JSON, etc. The use case below
577 shows how spark supports Avro. Users can persist the Avro record into HBase directly. Internally,
578 the Avro schema is converted to a native Spark Catalyst data type automatically.
579 Note that both key-value parts in an HBase table can be defined in Avro format.
581 1) Define catalog for the schema mapping:
583 [source, scala]
584 ----
585 def catalog = s"""{
586                      |"table":{"namespace":"default", "name":"Avrotable"},
587                       |"rowkey":"key",
588                       |"columns":{
589                       |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
590                       |"col1":{"cf":"cf1", "col":"col1", "type":"binary"}
591                       |}
592                       |}""".stripMargin
593 ----
595 `catalog` is a schema for a HBase table named `Avrotable`. row key as key and
596 one column col1. The rowkey also has to be defined in details as a column (col0),
597 which has a specific cf (rowkey).
599 2) Prepare the Data:
601 [source, scala]
602 ----
603  object AvroHBaseRecord {
604    val schemaString =
605      s"""{"namespace": "example.avro",
606          |   "type": "record",      "name": "User",
607          |    "fields": [
608          |        {"name": "name", "type": "string"},
609          |        {"name": "favorite_number",  "type": ["int", "null"]},
610          |        {"name": "favorite_color", "type": ["string", "null"]},
611          |        {"name": "favorite_array", "type": {"type": "array", "items": "string"}},
612          |        {"name": "favorite_map", "type": {"type": "map", "values": "int"}}
613          |      ]    }""".stripMargin
615    val avroSchema: Schema = {
616      val p = new Schema.Parser
617      p.parse(schemaString)
618    }
620    def apply(i: Int): AvroHBaseRecord = {
621      val user = new GenericData.Record(avroSchema);
622      user.put("name", s"name${"%03d".format(i)}")
623      user.put("favorite_number", i)
624      user.put("favorite_color", s"color${"%03d".format(i)}")
625      val favoriteArray = new GenericData.Array[String](2, avroSchema.getField("favorite_array").schema())
626      favoriteArray.add(s"number${i}")
627      favoriteArray.add(s"number${i+1}")
628      user.put("favorite_array", favoriteArray)
629      import collection.JavaConverters._
630      val favoriteMap = Map[String, Int](("key1" -> i), ("key2" -> (i+1))).asJava
631      user.put("favorite_map", favoriteMap)
632      val avroByte = AvroSedes.serialize(user, avroSchema)
633      AvroHBaseRecord(s"name${"%03d".format(i)}", avroByte)
634    }
637  val data = (0 to 255).map { i =>
638     AvroHBaseRecord(i)
640 ----
642 `schemaString` is defined first, then it is parsed to get `avroSchema`. `avroSchema` is used to
643 generate `AvroHBaseRecord`. `data` prepared by users is a local Scala collection
644 which has 256 `AvroHBaseRecord` objects.
646 3) Save DataFrame:
648 [source, scala]
649 ----
650  sc.parallelize(data).toDF.write.options(
651      Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
652      .format("org.apache.spark.sql.execution.datasources.hbase")
653      .save()
654 ----
656 Given a data frame with specified schema `catalog`, above will create an HBase table with 5
657 regions and save the data frame inside.
659 4) Load the DataFrame
661 [source, scala]
662 ----
663 def avroCatalog = s"""{
664             |"table":{"namespace":"default", "name":"avrotable"},
665             |"rowkey":"key",
666             |"columns":{
667               |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
668               |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
669             |}
670           |}""".stripMargin
672  def withCatalog(cat: String): DataFrame = {
673      sqlContext
674          .read
675          .options(Map("avroSchema" -> AvroHBaseRecord.schemaString, HBaseTableCatalog.tableCatalog -> avroCatalog))
676          .format("org.apache.spark.sql.execution.datasources.hbase")
677          .load()
679  val df = withCatalog(catalog)
680 ----
682 In `withCatalog` function, `read` returns a DataFrameReader that can be used to read data in as a DataFrame.
683 The `option` function adds input options for the underlying data source to the DataFrameReader.
684 There are two options: one is to set `avroSchema` as `AvroHBaseRecord.schemaString`, and one is to
685 set `HBaseTableCatalog.tableCatalog` as `avroCatalog`. The `load()` function loads input in as a DataFrame.
686 The date frame `df` returned by `withCatalog` function could be used to access the HBase table.
688 5) SQL Query
690 [source, scala]
691 ----
692  df.registerTempTable("avrotable")
693  val c = sqlContext.sql("select count(1) from avrotable").
694 ----
696 After loading df DataFrame, users can query data. registerTempTable registers df DataFrame
697 as a temporary table using the table name avrotable. `sqlContext.sql` function allows the
698 user to execute SQL queries.
699 ====