Revert "HBASE-15572 Adding optional timestamp semantics to HBase-Spark (Weiqing Yang)"
[hbase.git] / hbase-spark / src / main / scala / org / apache / hadoop / hbase / spark / HBaseDStreamFunctions.scala
blobd563a29fa139174870340af33e5a0794811e9c2c
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17 package org.apache.hadoop.hbase.spark
19 import org.apache.hadoop.hbase.TableName
20 import org.apache.hadoop.hbase.client._
21 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
22 import org.apache.spark.streaming.dstream.DStream
24 import scala.reflect.ClassTag
/**
 * HBaseDStreamFunctions contains a set of implicit functions that can be
 * applied to a Spark DStream so that we can easily interact with HBase.
 */
30 object HBaseDStreamFunctions {
/**
 * These are implicit methods for a DStream that contains any type of
 * data.
 *
 * @param dStream This is for dStreams of any type
 * @tparam T Type T
 */
39 implicit class GenericHBaseDStreamFunctions[T](val dStream: DStream[T]) {
/**
 * Shortcut to HBaseContext's `streamBulkPut`: each DStream value is
 * converted into an HBase Put by `f` and written to `tableName`.
 * No new DStream is returned, so think of it like a DStream `foreach`.
 *
 * @param hc        The HBaseContext that identifies which HBase cluster
 *                  connection to use
 * @param tableName The table the Puts are sent to
 * @param f         Converts a DStream value into an HBase Put
 */
def hbaseBulkPut(hc: HBaseContext, tableName: TableName, f: (T) => Put): Unit =
  hc.streamBulkPut(dStream, tableName, f)
/**
 * Shortcut to HBaseContext's `streamBulkGet`, behaving like a DStream
 * `map`: every DStream value is turned into a Get by `f`, looked up in
 * HBase, and the Result is passed through `convertResult` to produce the
 * corresponding element of the returned DStream.
 *
 * @param hc            The HBaseContext that identifies which HBase cluster
 *                      connection to use
 * @param tableName     The table the Gets are sent to
 * @param batchSize     How many Gets to execute in a single batch
 * @param f             Converts a DStream value into an HBase Get
 * @param convertResult Converts an HBase Result into a value of the
 *                      resulting DStream
 * @tparam R            Element type of the resulting DStream
 * @return A new DStream of R values
 */
def hbaseBulkGet[R: ClassTag](
    hc: HBaseContext,
    tableName: TableName,
    batchSize: Int,
    f: (T) => Get,
    convertResult: (Result) => R): DStream[R] =
  hc.streamBulkGet[T, R](tableName, batchSize, dStream, f, convertResult)
/**
 * Shortcut to HBaseContext's `streamBulkGet` that keeps the raw HBase
 * output: every DStream value is turned into a Get by `f`, and each
 * returned Result is emitted paired with its row bytes (`Result.getRow`)
 * wrapped in an ImmutableBytesWritable.
 *
 * @param hc        The HBaseContext that identifies which HBase cluster
 *                  connection to use
 * @param tableName The table the Gets are sent to
 * @param batchSize How many Gets to execute in a single batch
 * @param f         Converts a DStream value into an HBase Get
 * @return A new DStream of (row bytes, Result) pairs
 */
def hbaseBulkGet(
    hc: HBaseContext,
    tableName: TableName,
    batchSize: Int,
    f: (T) => Get): DStream[(ImmutableBytesWritable, Result)] = {
  val keyByRow: Result => (ImmutableBytesWritable, Result) =
    res => (new ImmutableBytesWritable(res.getRow), res)
  hc.streamBulkGet[T, (ImmutableBytesWritable, Result)](
    tableName, batchSize, dStream, f, keyByRow)
}
/**
 * Shortcut to HBaseContext's `streamBulkDelete`: each DStream value is
 * converted into an HBase Delete by `f` and sent to `tableName` in batches
 * of `batchSize`. No new DStream is returned.
 *
 * @param hc        The HBaseContext that identifies which HBase cluster
 *                  connection to use
 * @param tableName The table the Deletes are sent to
 * @param f         Converts a DStream value into an HBase Delete
 * @param batchSize The number of Deletes to send in a single batch
 */
def hbaseBulkDelete(
    hc: HBaseContext,
    tableName: TableName,
    f: (T) => Delete,
    batchSize: Int): Unit =
  hc.streamBulkDelete(dStream, tableName, f, batchSize)
/**
 * Shortcut to HBaseContext's `streamForeachPartition`. This acts much like
 * a normal DStream `foreach`, except that `f` works on a whole partition
 * at a time and is also handed an HBase Connection to use while iterating.
 *
 * @param hc The HBaseContext that identifies which HBase cluster
 *           connection to use
 * @param f  Receives an iterator over one DStream partition together with
 *           a Connection to HBase
 */
def hbaseForeachPartition(
    hc: HBaseContext,
    f: (Iterator[T], Connection) => Unit): Unit =
  hc.streamForeachPartition(dStream, f)
/**
 * Shortcut to HBaseContext's `streamMapPartitions`. This acts much like a
 * normal DStream `mapPartitions`, except that `f` is also handed an HBase
 * Connection to use while iterating over the partition's values.
 *
 * @param hc The HBaseContext that identifies which HBase cluster
 *           connection to use
 * @param f  Receives an iterator over one DStream partition together with
 *           a Connection to HBase, and returns the values for the new DStream
 * @tparam R Element type of the resulting DStream
 * @return A new DStream of R values
 */
def hbaseMapPartitions[R: ClassTag](
    hc: HBaseContext,
    f: (Iterator[T], Connection) => Iterator[R]): DStream[R] =
  hc.streamMapPartitions(dStream, f)