Revert "HBASE-15572 Adding optional timestamp semantics to HBase-Spark (Weiqing Yang)"
[hbase.git] / hbase-spark / src / main / scala / org / apache / hadoop / hbase / spark / JavaHBaseContext.scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.spark

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Connection, Delete, Get, Put, Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.api.java.function.{FlatMapFunction, Function, VoidFunction}
import org.apache.spark.streaming.api.java.JavaDStream

import scala.collection.JavaConversions._
import scala.reflect.ClassTag
/**
 * This is the Java wrapper over HBaseContext, which is written in
 * Scala. This class will be used by developers who want to
 * work with Spark or Spark Streaming in Java.
 *
 * @param jsc    This is the JavaSparkContext that we will wrap
 * @param config This is the config information for our HBase cluster
 */
class JavaHBaseContext(@transient jsc: JavaSparkContext,
                       @transient config: Configuration) extends Serializable {
  val hbaseContext = new HBaseContext(jsc.sc, config)

  /**
   * A simple enrichment of the traditional Spark javaRdd foreachPartition.
   * This function differs from the original in that it offers the
   * developer access to an already connected Connection object.
   *
   * Note: Do not close the Connection object. All Connection
   * management is handled outside this method.
   *
   * @param javaRdd Original javaRdd with data to iterate over
   * @param f       Function to be given an iterator to iterate through
   *                the RDD values and a Connection object to interact
   *                with HBase
   */
  def foreachPartition[T](javaRdd: JavaRDD[T],
                          f: VoidFunction[(java.util.Iterator[T], Connection)]) = {

    hbaseContext.foreachPartition(javaRdd.rdd,
      (it: Iterator[T], conn: Connection) => {
        f.call((it, conn))
      })
  }
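
  // Usage sketch (illustrative comment only, not part of this wrapper): one way a caller
  // might drive the foreachPartition overload above. The table name, row keys, and the
  // "exampleTable" name are assumptions made up for the example; it also assumes imports
  // for SparkConf, HBaseConfiguration and Bytes in the caller's file.
  //
  //   val jsc = new JavaSparkContext(new SparkConf().setAppName("JavaHBaseContextExample"))
  //   val hc = new JavaHBaseContext(jsc, HBaseConfiguration.create())
  //   val rowKeys = jsc.parallelize(java.util.Arrays.asList(Bytes.toBytes("row1"), Bytes.toBytes("row2")))
  //
  //   hc.foreachPartition(rowKeys,
  //     new VoidFunction[(java.util.Iterator[Array[Byte]], Connection)] {
  //       override def call(t: (java.util.Iterator[Array[Byte]], Connection)): Unit = {
  //         val table = t._2.getTable(TableName.valueOf("exampleTable"))
  //         try {
  //           while (t._1.hasNext) {
  //             table.get(new Get(t._1.next()))   // per-partition work using the shared Connection
  //           }
  //         } finally {
  //           table.close()   // close the Table, but never the Connection itself
  //         }
  //       }
  //     })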

  /**
   * A simple enrichment of the traditional Spark Streaming dStream foreach.
   * This function differs from the original in that it offers the
   * developer access to an already connected Connection object.
   *
   * Note: Do not close the Connection object. All Connection
   * management is handled outside this method.
   *
   * @param javaDstream Original DStream with data to iterate over
   * @param f           Function to be given an iterator to iterate through
   *                    the JavaDStream values and a Connection object to
   *                    interact with HBase
   */
  def foreachPartition[T](javaDstream: JavaDStream[T],
                          f: VoidFunction[(Iterator[T], Connection)]) = {
    hbaseContext.foreachPartition(javaDstream.dstream,
      (it: Iterator[T], conn: Connection) => f.call((it, conn)))
  }

  /**
   * A simple enrichment of the traditional Spark JavaRDD mapPartitions.
   * This function differs from the original in that it offers the
   * developer access to an already connected Connection object.
   *
   * Note: Do not close the Connection object. All Connection
   * management is handled outside this method.
   *
   * Note: Make sure to partition correctly to avoid memory issues when
   * getting data from HBase.
   *
   * @param javaRdd Original JavaRdd with data to iterate over
   * @param f       Function to be given an iterator to iterate through
   *                the RDD values and a Connection object to interact
   *                with HBase
   * @return        Returns a new RDD generated by the user-defined
   *                function, just like a normal mapPartitions
   */
  def mapPartitions[T, R](javaRdd: JavaRDD[T],
                          f: FlatMapFunction[(java.util.Iterator[T],
                            Connection), R]): JavaRDD[R] = {

    def fn = (it: Iterator[T], conn: Connection) =>
      asScalaIterator(
        f.call((asJavaIterator(it), conn)).iterator()
      )

    JavaRDD.fromRDD(hbaseContext.mapPartitions(javaRdd.rdd,
      (iterator: Iterator[T], connection: Connection) =>
        fn(iterator, connection))(fakeClassTag[R]))(fakeClassTag[R])
  }
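
  // Usage sketch (illustrative comment only): mapping each partition of an RDD of row keys
  // to the number of cells returned for that row. Table name is an assumption; `jsc` and
  // `hc` are the JavaSparkContext and JavaHBaseContext from the earlier sketch.
  //
  //   val keyRdd: JavaRDD[Array[Byte]] = jsc.parallelize(java.util.Arrays.asList(Bytes.toBytes("row1")))
  //   val cellCounts: JavaRDD[Integer] = hc.mapPartitions(keyRdd,
  //     new FlatMapFunction[(java.util.Iterator[Array[Byte]], Connection), Integer] {
  //       override def call(t: (java.util.Iterator[Array[Byte]], Connection)): java.lang.Iterable[Integer] = {
  //         val table = t._2.getTable(TableName.valueOf("exampleTable"))
  //         val out = new java.util.ArrayList[Integer]()
  //         try {
  //           while (t._1.hasNext) {
  //             out.add(Integer.valueOf(table.get(new Get(t._1.next())).size()))
  //           }
  //         } finally {
  //           table.close()
  //         }
  //         out   // one count per fetched row, flattened into the resulting RDD
  //       }
  //     })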

  /**
   * A simple enrichment of the traditional Spark Streaming JavaDStream
   * mapPartitions.
   *
   * This function differs from the original in that it offers the
   * developer access to an already connected Connection object.
   *
   * Note: Do not close the Connection object. All Connection
   * management is handled outside this method.
   *
   * Note: Make sure to partition correctly to avoid memory issues when
   * getting data from HBase.
   *
   * @param javaDstream Original JavaDStream with data to iterate over
   * @param mp          Function to be given an iterator to iterate through
   *                    the JavaDStream values and a Connection object to
   *                    interact with HBase
   * @return            Returns a new JavaDStream generated by the user-defined
   *                    function, just like a normal mapPartitions
   */
  def streamMap[T, U](javaDstream: JavaDStream[T],
                      mp: Function[(Iterator[T], Connection), Iterator[U]]):
  JavaDStream[U] = {
    JavaDStream.fromDStream(hbaseContext.streamMapPartitions(javaDstream.dstream,
      (it: Iterator[T], conn: Connection) =>
        mp.call((it, conn)))(fakeClassTag[U]))(fakeClassTag[U])
  }

  /**
   * A simple abstraction over the HBaseContext.foreachPartition method.
   *
   * It allows a user to take a JavaRDD,
   * generate Puts, and send them to HBase.
   * The complexity of managing the Connection is
   * removed from the developer.
   *
   * @param javaRdd   Original JavaRDD with data to iterate over
   * @param tableName The name of the table to put into
   * @param f         Function to convert a value in the JavaRDD
   *                  to an HBase Put
   */
  def bulkPut[T](javaRdd: JavaRDD[T],
                 tableName: TableName,
                 f: Function[T, Put]) {

    hbaseContext.bulkPut(javaRdd.rdd, tableName, (t: T) => f.call(t))
  }
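
  // Usage sketch (illustrative comment only): writing an RDD of (rowKey, value) pairs with
  // bulkPut. The table, column family and qualifier names are made-up assumptions; `jsc`
  // and `hc` are as in the earlier sketch.
  //
  //   val pairs = jsc.parallelize(java.util.Arrays.asList(("row1", "v1"), ("row2", "v2")))
  //   hc.bulkPut(pairs, TableName.valueOf("exampleTable"),
  //     new Function[(String, String), Put] {
  //       override def call(kv: (String, String)): Put = {
  //         val put = new Put(Bytes.toBytes(kv._1))
  //         put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(kv._2))
  //         put
  //       }
  //     })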

  /**
   * A simple abstraction over the HBaseContext.streamMapPartition method.
   *
   * It allows a user to take a JavaDStream,
   * generate Puts, and send them to HBase.
   *
   * The complexity of managing the Connection is
   * removed from the developer.
   *
   * @param javaDstream Original DStream with data to iterate over
   * @param tableName   The name of the table to put into
   * @param f           Function to convert a value in
   *                    the JavaDStream to an HBase Put
   */
  def streamBulkPut[T](javaDstream: JavaDStream[T],
                       tableName: TableName,
                       f: Function[T, Put]) = {
    hbaseContext.streamBulkPut(javaDstream.dstream,
      tableName,
      (t: T) => f.call(t))
  }
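
  // Usage sketch (illustrative comment only): the streaming counterpart, writing each line
  // received on a socket as a Put. Host, port, batch interval and table name are assumptions;
  // `jsc` and `hc` are as in the earlier sketch.
  //
  //   val ssc = new JavaStreamingContext(jsc, Durations.seconds(1))
  //   val lines: JavaDStream[String] = ssc.socketTextStream("localhost", 9999)
  //   hc.streamBulkPut(lines, TableName.valueOf("exampleTable"),
  //     new Function[String, Put] {
  //       override def call(v: String): Put = {
  //         val put = new Put(Bytes.toBytes(v))
  //         put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(v))
  //         put
  //       }
  //     })
  //   ssc.start()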

  /**
   * A simple abstraction over the HBaseContext.foreachPartition method.
   *
   * It allows a user to take a JavaRDD,
   * generate Deletes, and send them to HBase.
   *
   * The complexity of managing the Connection is
   * removed from the developer.
   *
   * @param javaRdd   Original JavaRDD with data to iterate over
   * @param tableName The name of the table to delete from
   * @param f         Function to convert a value in the JavaRDD to an
   *                  HBase Delete
   * @param batchSize The number of Deletes to batch before sending to HBase
   */
  def bulkDelete[T](javaRdd: JavaRDD[T], tableName: TableName,
                    f: Function[T, Delete], batchSize: Integer) {
    hbaseContext.bulkDelete(javaRdd.rdd, tableName, (t: T) => f.call(t), batchSize)
  }
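
  // Usage sketch (illustrative comment only): deleting a batch of rows by key. The table
  // name and row keys are assumptions, and the batch size of 4 is arbitrary; `jsc` and `hc`
  // are as in the earlier sketch.
  //
  //   val toDelete = jsc.parallelize(java.util.Arrays.asList(Bytes.toBytes("row1"), Bytes.toBytes("row2")))
  //   hc.bulkDelete(toDelete, TableName.valueOf("exampleTable"),
  //     new Function[Array[Byte], Delete] {
  //       override def call(row: Array[Byte]): Delete = new Delete(row)
  //     },
  //     4)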

  /**
   * A simple abstraction over the HBaseContext.streamBulkMutation method.
   *
   * It allows a user to take a JavaDStream,
   * generate Deletes, and send them to HBase.
   *
   * The complexity of managing the Connection is
   * removed from the developer.
   *
   * @param javaDStream Original DStream with data to iterate over
   * @param tableName   The name of the table to delete from
   * @param f           Function to convert a value in the JavaDStream to an
   *                    HBase Delete
   * @param batchSize   The number of Deletes to be sent at once
   */
  def streamBulkDelete[T](javaDStream: JavaDStream[T],
                          tableName: TableName,
                          f: Function[T, Delete],
                          batchSize: Integer) = {
    hbaseContext.streamBulkDelete(javaDStream.dstream, tableName,
      (t: T) => f.call(t),
      batchSize)
  }

  /**
   * A simple abstraction over the HBaseContext.mapPartition method.
   *
   * It allows a user to take a JavaRDD and generate a
   * new RDD based on Gets and the results they bring back from HBase.
   *
   * @param tableName     The name of the table to get from
   * @param batchSize     Batch size of how many Gets to retrieve in a single fetch
   * @param javaRdd       Original JavaRDD with data to iterate over
   * @param makeGet       Function to convert a value in the JavaRDD to an
   *                      HBase Get
   * @param convertResult This will convert the HBase Result object to
   *                      whatever the user wants to put in the resulting
   *                      JavaRDD
   * @return              New JavaRDD that is created by the Get to HBase
   */
  def bulkGet[T, U](tableName: TableName,
                    batchSize: Integer,
                    javaRdd: JavaRDD[T],
                    makeGet: Function[T, Get],
                    convertResult: Function[Result, U]): JavaRDD[U] = {

    JavaRDD.fromRDD(hbaseContext.bulkGet[T, U](tableName,
      batchSize,
      javaRdd.rdd,
      (t: T) => makeGet.call(t),
      (r: Result) => {
        convertResult.call(r)
      })(fakeClassTag[U]))(fakeClassTag[U])
  }
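
  // Usage sketch (illustrative comment only): fetching rows by key and converting each
  // Result to the length of one column's value. Table, family and qualifier names are
  // assumptions; `jsc` and `hc` are as in the earlier sketch.
  //
  //   val keys = jsc.parallelize(java.util.Arrays.asList(Bytes.toBytes("row1"), Bytes.toBytes("row2")))
  //   val lengths: JavaRDD[Integer] = hc.bulkGet[Array[Byte], Integer](
  //     TableName.valueOf("exampleTable"),
  //     2,
  //     keys,
  //     new Function[Array[Byte], Get] {
  //       override def call(row: Array[Byte]): Get = new Get(row)
  //     },
  //     new Function[Result, Integer] {
  //       override def call(r: Result): Integer = {
  //         val v = r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col"))
  //         Integer.valueOf(if (v == null) 0 else v.length)
  //       }
  //     })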

  /**
   * A simple abstraction over the HBaseContext.streamMap method.
   *
   * It allows a user to take a DStream and
   * generate a new DStream based on Gets and the results
   * they bring back from HBase.
   *
   * @param tableName     The name of the table to get from
   * @param batchSize     The number of Gets to be batched together
   * @param javaDStream   Original DStream with data to iterate over
   * @param makeGet       Function to convert a value in the JavaDStream to an
   *                      HBase Get
   * @param convertResult This will convert the HBase Result object to
   *                      whatever the user wants to put in the resulting
   *                      JavaDStream
   * @return              New JavaDStream that is created by the Get to HBase
   */
  def streamBulkGet[T, U](tableName: TableName,
                          batchSize: Integer,
                          javaDStream: JavaDStream[T],
                          makeGet: Function[T, Get],
                          convertResult: Function[Result, U]): JavaDStream[U] = {
    JavaDStream.fromDStream(hbaseContext.streamBulkGet(tableName,
      batchSize,
      javaDStream.dstream,
      (t: T) => makeGet.call(t),
      (r: Result) => convertResult.call(r))(fakeClassTag[U]))(fakeClassTag[U])
  }

  /**
   * This function will use the native HBase TableInputFormat with the
   * given scan object to generate a new JavaRDD.
   *
   * @param tableName The name of the table to scan
   * @param scans     The HBase scan object to use to read data from HBase
   * @param f         Function to convert a Result object from HBase into
   *                  what the user wants in the final generated JavaRDD
   * @return          New JavaRDD with results from scan
   */
  def hbaseRDD[U](tableName: TableName,
                  scans: Scan,
                  f: Function[(ImmutableBytesWritable, Result), U]):
  JavaRDD[U] = {
    JavaRDD.fromRDD(
      hbaseContext.hbaseRDD[U](tableName,
        scans,
        (v: (ImmutableBytesWritable, Result)) =>
          f.call((v._1, v._2)))(fakeClassTag[U]))(fakeClassTag[U])
  }
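
  // Usage sketch (illustrative comment only): running a full-table Scan and mapping each
  // (rowKey, Result) pair to the row key as a String. The table name and caching value are
  // assumptions; `hc` is the JavaHBaseContext from the earlier sketch.
  //
  //   val scan = new Scan()
  //   scan.setCaching(100)
  //   val rowKeyRdd: JavaRDD[String] = hc.hbaseRDD[String](
  //     TableName.valueOf("exampleTable"),
  //     scan,
  //     new Function[(ImmutableBytesWritable, Result), String] {
  //       override def call(v: (ImmutableBytesWritable, Result)): String =
  //         Bytes.toString(v._1.get())
  //     })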

  /**
   * An overloaded version of HBaseContext hbaseRDD that defines the
   * type of the resulting JavaRDD.
   *
   * @param tableName The name of the table to scan
   * @param scans     The HBase scan object to use to read data from HBase
   * @return          New JavaRDD with results from scan
   */
  def hbaseRDD(tableName: TableName,
               scans: Scan):
  JavaRDD[(ImmutableBytesWritable, Result)] = {
    JavaRDD.fromRDD(hbaseContext.hbaseRDD(tableName, scans))
  }

  /**
   * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
   *
   * This method is used to keep ClassTags out of the external Java API, as the Java compiler
   * cannot produce them automatically. While this ClassTag-faking does please the compiler,
   * it can cause problems at runtime if the Scala API relies on ClassTags for correctness.
   *
   * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior,
   * just worse performance or security issues.
   * For instance, an Array[AnyRef] can hold any type T,
   * but may lose primitive specialization.
   */
  private[spark]
  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
}