/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.spark

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Connection, Delete, Get, Put, Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.api.java.function.{FlatMapFunction, Function, VoidFunction}
import org.apache.spark.streaming.api.java.JavaDStream

import scala.collection.JavaConversions._
import scala.reflect.ClassTag
/**
 * This is the Java wrapper over HBaseContext, which is written in
 * Scala. This class is intended for developers who want to
 * work with Spark or Spark Streaming in Java.
 *
 * @param jsc    This is the JavaSparkContext that we will wrap
 * @param config This is the config information for our HBase cluster
 */
class JavaHBaseContext(@transient jsc: JavaSparkContext,
                       @transient config: Configuration) extends Serializable {
  val hbaseContext = new HBaseContext(jsc.sc, config)
  /**
   * A simple enrichment of the traditional Spark javaRdd foreachPartition.
   * This function differs from the original in that it offers the
   * developer access to an already connected Connection object.
   *
   * Note: Do not close the Connection object. All Connection
   * management is handled outside this method.
   *
   * @param javaRdd Original javaRdd with data to iterate over
   * @param f       Function to be given an iterator to iterate through
   *                the RDD values and a Connection object to interact
   *                with HBase
   */
  def foreachPartition[T](javaRdd: JavaRDD[T],
                          f: VoidFunction[(java.util.Iterator[T], Connection)]) = {

    hbaseContext.foreachPartition(javaRdd.rdd,
      (it: Iterator[T], conn: Connection) => {
        f.call((it, conn))
      })
  }
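  // A minimal usage sketch for foreachPartition. The context, RDD, table name,
  // and column values below are illustrative assumptions, not part of this API;
  // Bytes is org.apache.hadoop.hbase.util.Bytes. The partition iterator and the
  // shared Connection are handed to the user's VoidFunction, and the Connection
  // itself must not be closed by the caller.
  //
  //   javaHBaseContext.foreachPartition(rdd,
  //     new VoidFunction[(java.util.Iterator[Array[Byte]], Connection)] {
  //       override def call(t: (java.util.Iterator[Array[Byte]], Connection)): Unit = {
  //         val table = t._2.getTable(TableName.valueOf("exampleTable"))
  //         while (t._1.hasNext) {
  //           table.put(new Put(t._1.next())
  //             .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v")))
  //         }
  //         table.close()
  //       }
  //     })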
  /**
   * A simple enrichment of the traditional Spark Streaming dStream foreach.
   * This function differs from the original in that it offers the
   * developer access to an already connected Connection object.
   *
   * Note: Do not close the Connection object. All Connection
   * management is handled outside this method.
   *
   * @param javaDstream Original DStream with data to iterate over
   * @param f           Function to be given an iterator to iterate through
   *                    the JavaDStream values and a Connection object to
   *                    interact with HBase
   */
  def foreachPartition[T](javaDstream: JavaDStream[T],
                          f: VoidFunction[(Iterator[T], Connection)]) = {
    hbaseContext.foreachPartition(javaDstream.dstream,
      (it: Iterator[T], conn: Connection) => f.call((it, conn)))
  }
  /**
   * A simple enrichment of the traditional Spark JavaRDD mapPartition.
   * This function differs from the original in that it offers the
   * developer access to an already connected Connection object.
   *
   * Note: Do not close the Connection object. All Connection
   * management is handled outside this method.
   *
   * Note: Make sure to partition correctly to avoid memory issues when
   * getting data from HBase.
   *
   * @param javaRdd Original JavaRdd with data to iterate over
   * @param f       Function to be given an iterator to iterate through
   *                the RDD values and a Connection object to interact
   *                with HBase
   * @return        Returns a new RDD generated by the user-defined
   *                function just like normal mapPartition
   */
  def mapPartitions[T, R](javaRdd: JavaRDD[T],
                          f: FlatMapFunction[(java.util.Iterator[T],
                            Connection), R]): JavaRDD[R] = {

    def fn = (it: Iterator[T], conn: Connection) =>
      asScalaIterator(
        f.call((asJavaIterator(it), conn)).iterator()
      )

    JavaRDD.fromRDD(hbaseContext.mapPartitions(javaRdd.rdd,
      (iterator: Iterator[T], connection: Connection) =>
        fn(iterator, connection))(fakeClassTag[R]))(fakeClassTag[R])
  }
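  // A hypothetical mapPartitions sketch (the context, RDD, table, and qualifier
  // names are made up): each partition issues its own Gets through the shared
  // Connection and emits one String per input row key.
  //
  //   val values: JavaRDD[String] = javaHBaseContext.mapPartitions(rdd,
  //     new FlatMapFunction[(java.util.Iterator[Array[Byte]], Connection), String] {
  //       override def call(t: (java.util.Iterator[Array[Byte]], Connection)): java.lang.Iterable[String] = {
  //         val table = t._2.getTable(TableName.valueOf("exampleTable"))
  //         val out = new java.util.ArrayList[String]()
  //         while (t._1.hasNext) {
  //           out.add(Bytes.toString(table.get(new Get(t._1.next())).value()))
  //         }
  //         table.close()
  //         out
  //       }
  //     })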
  /**
   * A simple enrichment of the traditional Spark Streaming JavaDStream
   * mapPartition.
   *
   * This function differs from the original in that it offers the
   * developer access to an already connected Connection object.
   *
   * Note: Do not close the Connection object. All Connection
   * management is handled outside this method.
   *
   * Note: Make sure to partition correctly to avoid memory issues when
   * getting data from HBase.
   *
   * @param javaDstream Original JavaDStream with data to iterate over
   * @param mp          Function to be given an iterator to iterate through
   *                    the JavaDStream values and a Connection object to
   *                    interact with HBase
   * @return            Returns a new JavaDStream generated by the user-defined
   *                    function just like normal mapPartition
   */
  def streamMap[T, U](javaDstream: JavaDStream[T],
                      mp: Function[(Iterator[T], Connection), Iterator[U]]):
  JavaDStream[U] = {
    JavaDStream.fromDStream(hbaseContext.streamMapPartitions(javaDstream.dstream,
      (it: Iterator[T], conn: Connection) =>
        mp.call((it, conn)))(fakeClassTag[U]))(fakeClassTag[U])
  }
  /**
   * A simple abstraction over the HBaseContext.foreachPartition method.
   *
   * It allows a user to take a JavaRDD, generate Puts from its
   * records, and send them to HBase.
   * The complexity of managing the Connection is
   * removed from the developer.
   *
   * @param javaRdd   Original JavaRDD with data to iterate over
   * @param tableName The name of the table to put into
   * @param f         Function to convert a value in the JavaRDD
   *                  to an HBase Put
   */
  def bulkPut[T](javaRdd: JavaRDD[T],
                 tableName: TableName,
                 f: Function[(T), Put]) {

    hbaseContext.bulkPut(javaRdd.rdd, tableName, (t: T) => f.call(t))
  }
  /**
   * A simple abstraction over the HBaseContext.streamMapPartition method.
   *
   * It allows a user to take a JavaDStream, generate Puts from its
   * records, and send them to HBase.
   *
   * The complexity of managing the Connection is
   * removed from the developer.
   *
   * @param javaDstream Original DStream with data to iterate over
   * @param tableName   The name of the table to put into
   * @param f           Function to convert a value in
   *                    the JavaDStream to an HBase Put
   */
  def streamBulkPut[T](javaDstream: JavaDStream[T],
                       tableName: TableName,
                       f: Function[T, Put]) = {
    hbaseContext.streamBulkPut(javaDstream.dstream,
      tableName,
      (t: T) => f.call(t))
  }
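  // A hedged streaming sketch: given a JavaDStream[Array[Byte]] of row keys
  // (the stream and the table name below are assumed, not defined here), every
  // batch of the stream is written to HBase as Puts.
  //
  //   javaHBaseContext.streamBulkPut(stream, TableName.valueOf("exampleTable"),
  //     new Function[Array[Byte], Put] {
  //       override def call(rowKey: Array[Byte]): Put =
  //         new Put(rowKey).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), rowKey)
  //     })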
  /**
   * A simple abstraction over the HBaseContext.foreachPartition method.
   *
   * It allows a user to take a JavaRDD, generate Deletes from its
   * records, and send them to HBase.
   *
   * The complexity of managing the Connection is
   * removed from the developer.
   *
   * @param javaRdd   Original JavaRDD with data to iterate over
   * @param tableName The name of the table to delete from
   * @param f         Function to convert a value in the JavaRDD to an
   *                  HBase Delete
   * @param batchSize The number of deletes to batch before sending to HBase
   */
  def bulkDelete[T](javaRdd: JavaRDD[T], tableName: TableName,
                    f: Function[T, Delete], batchSize: Integer) {
    hbaseContext.bulkDelete(javaRdd.rdd, tableName, (t: T) => f.call(t), batchSize)
  }
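  // A minimal bulkDelete sketch (illustrative names; the batch size of 4
  // deletes per round trip is arbitrary):
  //
  //   javaHBaseContext.bulkDelete(rdd, TableName.valueOf("exampleTable"),
  //     new Function[Array[Byte], Delete] {
  //       override def call(rowKey: Array[Byte]): Delete = new Delete(rowKey)
  //     }, 4)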
  /**
   * A simple abstraction over the HBaseContext.streamBulkMutation method.
   *
   * It allows a user to take a JavaDStream, generate Deletes from its
   * records, and send them to HBase.
   *
   * The complexity of managing the Connection is
   * removed from the developer.
   *
   * @param javaDStream Original DStream with data to iterate over
   * @param tableName   The name of the table to delete from
   * @param f           Function to convert a value in the JavaDStream to an
   *                    HBase Delete
   * @param batchSize   The number of deletes to be sent at once
   */
  def streamBulkDelete[T](javaDStream: JavaDStream[T],
                          tableName: TableName,
                          f: Function[T, Delete],
                          batchSize: Integer) = {
    hbaseContext.streamBulkDelete(javaDStream.dstream, tableName,
      (t: T) => f.call(t),
      batchSize)
  }
  /**
   * A simple abstraction over the HBaseContext.mapPartition method.
   *
   * It allows a user to take a JavaRDD and generate a
   * new RDD based on Gets and the results they bring back from HBase.
   *
   * @param tableName     The name of the table to get from
   * @param batchSize     Batch size of how many gets to retrieve in a single fetch
   * @param javaRdd       Original JavaRDD with data to iterate over
   * @param makeGet       Function to convert a value in the JavaRDD to an
   *                      HBase Get
   * @param convertResult This will convert the HBase Result object to
   *                      whatever the user wants to put in the resulting
   *                      JavaRDD
   * @return              New JavaRDD that is created by the Get to HBase
   */
  def bulkGet[T, U](tableName: TableName,
                    batchSize: Integer,
                    javaRdd: JavaRDD[T],
                    makeGet: Function[T, Get],
                    convertResult: Function[Result, U]): JavaRDD[U] = {

    JavaRDD.fromRDD(hbaseContext.bulkGet[T, U](tableName,
      batchSize,
      javaRdd.rdd,
      (t: T) => makeGet.call(t),
      (r: Result) => {
        convertResult.call(r)
      })(fakeClassTag[U]))(fakeClassTag[U])
  }
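  // A hypothetical bulkGet sketch: row keys from the RDD become Gets, and each
  // HBase Result is converted to a String (the table name and the batch size
  // of 2 are illustrative):
  //
  //   val values: JavaRDD[String] = javaHBaseContext.bulkGet(
  //     TableName.valueOf("exampleTable"), 2, rdd,
  //     new Function[Array[Byte], Get] {
  //       override def call(rowKey: Array[Byte]): Get = new Get(rowKey)
  //     },
  //     new Function[Result, String] {
  //       override def call(result: Result): String = Bytes.toString(result.getRow)
  //     })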
  /**
   * A simple abstraction over the HBaseContext.streamMap method.
   *
   * It allows a user to take a DStream and generate a new DStream
   * based on Gets and the results they bring back from HBase.
   *
   * @param tableName     The name of the table to get from
   * @param batchSize     The number of gets to be batched together
   * @param javaDStream   Original DStream with data to iterate over
   * @param makeGet       Function to convert a value in the JavaDStream to an
   *                      HBase Get
   * @param convertResult This will convert the HBase Result object to
   *                      whatever the user wants to put in the resulting
   *                      JavaDStream
   * @return              New JavaDStream that is created by the Get to HBase
   */
  def streamBulkGet[T, U](tableName: TableName,
                          batchSize: Integer,
                          javaDStream: JavaDStream[T],
                          makeGet: Function[T, Get],
                          convertResult: Function[Result, U]): JavaDStream[U] = {
    JavaDStream.fromDStream(hbaseContext.streamBulkGet(tableName,
      batchSize,
      javaDStream.dstream,
      (t: T) => makeGet.call(t),
      (r: Result) => convertResult.call(r))(fakeClassTag[U]))(fakeClassTag[U])
  }
  /**
   * This function will use the native HBase TableInputFormat with the
   * given scan object to generate a new JavaRDD.
   *
   * @param tableName The name of the table to scan
   * @param scans     The HBase scan object to use to read data from HBase
   * @param f         Function to convert a Result object from HBase into
   *                  what the user wants in the final generated JavaRDD
   * @return          New JavaRDD with results from scan
   */
  def hbaseRDD[U](tableName: TableName,
                  scans: Scan,
                  f: Function[(ImmutableBytesWritable, Result), U]):
  JavaRDD[U] = {
    JavaRDD.fromRDD(
      hbaseContext.hbaseRDD[U](tableName,
        scans,
        (v: (ImmutableBytesWritable, Result)) =>
          f.call((v._1, v._2)))(fakeClassTag[U]))(fakeClassTag[U])
  }
  /**
   * An overloaded version of HBaseContext hbaseRDD that defines the
   * type of the resulting JavaRDD.
   *
   * @param tableName The name of the table to scan
   * @param scans     The HBase scan object to use to read data from HBase
   * @return          New JavaRDD with results from scan
   */
  def hbaseRDD(tableName: TableName,
               scans: Scan):
  JavaRDD[(ImmutableBytesWritable, Result)] = {
    JavaRDD.fromRDD(hbaseContext.hbaseRDD(tableName, scans))
  }
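  // A minimal scan sketch using the simple hbaseRDD overload (the table name
  // and the scan caching value are illustrative): the result is a JavaRDD of
  // (ImmutableBytesWritable, Result) pairs, one per row returned by the scan.
  //
  //   val scan = new Scan()
  //   scan.setCaching(100)
  //   val scanRdd: JavaRDD[(ImmutableBytesWritable, Result)] =
  //     javaHBaseContext.hbaseRDD(TableName.valueOf("exampleTable"), scan)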
  /**
   * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
   *
   * This method is used to keep ClassTags out of the external Java API, as the Java compiler
   * cannot produce them automatically. While this ClassTag-faking does please the compiler,
   * it can cause problems at runtime if the Scala API relies on ClassTags for correctness.
   *
   * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior,
   * just worse performance or security issues.
   * For instance, an Array[AnyRef] can hold any type T,
   * but may lose primitive specialization.
   */
  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
}