2 * This is a common dao with basic CRUD operations and is not limited to any
3 * persistent layer implementation
5 * Copyright (C) 2010 Imran M Yousuf (imyousuf@smartitengineering.com)
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 3 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 package com
.smartitengineering
.dao
.impl
.hbase
;
21 import com
.smartitengineering
.dao
.common
.CommonReadDao
;
22 import com
.smartitengineering
.dao
.common
.CommonWriteDao
;
23 import com
.smartitengineering
.dao
.common
.queryparam
.BasicCompoundQueryParameter
;
24 import com
.smartitengineering
.dao
.common
.queryparam
.BiOperandQueryParameter
;
25 import com
.smartitengineering
.dao
.common
.queryparam
.MatchMode
;
26 import com
.smartitengineering
.dao
.common
.queryparam
.OperatorType
;
27 import com
.smartitengineering
.dao
.common
.queryparam
.ParameterType
;
28 import com
.smartitengineering
.dao
.common
.queryparam
.QueryParameter
;
29 import com
.smartitengineering
.dao
.common
.queryparam
.QueryParameterCastHelper
;
30 import com
.smartitengineering
.dao
.common
.queryparam
.QueryParameterWithOperator
;
31 import com
.smartitengineering
.dao
.common
.queryparam
.QueryParameterWithPropertyName
;
32 import com
.smartitengineering
.dao
.common
.queryparam
.QueryParameterWithValue
;
33 import com
.smartitengineering
.dao
.common
.queryparam
.ValueOnlyQueryParameter
;
34 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.AsyncExecutorService
;
35 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.Callback
;
36 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.FilterConfig
;
37 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.ObjectRowConverter
;
38 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.SchemaInfoProvider
;
39 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.impl
.BinarySuffixComparator
;
40 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.impl
.RangeComparator
;
41 import com
.smartitengineering
.domain
.PersistentDTO
;
42 import java
.util
.ArrayList
;
43 import java
.util
.Arrays
;
44 import java
.util
.Collection
;
45 import java
.util
.Collections
;
46 import java
.util
.LinkedHashMap
;
47 import java
.util
.LinkedHashSet
;
48 import java
.util
.List
;
51 import java
.util
.concurrent
.Future
;
52 import org
.apache
.hadoop
.hbase
.client
.Delete
;
53 import org
.apache
.hadoop
.hbase
.client
.Get
;
54 import org
.apache
.hadoop
.hbase
.client
.HTableInterface
;
55 import org
.apache
.hadoop
.hbase
.client
.Put
;
56 import org
.apache
.hadoop
.hbase
.client
.Result
;
57 import org
.apache
.hadoop
.hbase
.client
.ResultScanner
;
58 import org
.apache
.hadoop
.hbase
.client
.Scan
;
59 import org
.apache
.hadoop
.hbase
.filter
.BinaryComparator
;
60 import org
.apache
.hadoop
.hbase
.filter
.BinaryPrefixComparator
;
61 import org
.apache
.hadoop
.hbase
.filter
.CompareFilter
.CompareOp
;
62 import org
.apache
.hadoop
.hbase
.filter
.Filter
;
63 import org
.apache
.hadoop
.hbase
.filter
.FilterList
;
64 import org
.apache
.hadoop
.hbase
.filter
.FilterList
.Operator
;
65 import org
.apache
.hadoop
.hbase
.filter
.SingleColumnValueExcludeFilter
;
66 import org
.apache
.hadoop
.hbase
.filter
.SkipFilter
;
67 import org
.apache
.hadoop
.hbase
.filter
.SubstringComparator
;
68 import org
.apache
.hadoop
.hbase
.filter
.WritableByteArrayComparable
;
69 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
72 * A common DAO implementation for HBase. Please note that all parameters for reading (i.e. Scan) assumes that the
73 * toString() method returns the string representation of the value to be compared in byte[] form.
76 public class CommonDao
<Template
extends PersistentDTO
> implements CommonReadDao
<Template
>,
77 CommonWriteDao
<Template
> {
79 public static final int DEFAULT_MAX_ROWS
= 1000;
80 private ObjectRowConverter
<Template
> converter
;
81 private SchemaInfoProvider infoProvider
;
82 private AsyncExecutorService executorService
;
83 private int maxRows
= -1;
85 public AsyncExecutorService
getExecutorService() {
86 return executorService
;
89 public void setExecutorService(AsyncExecutorService executorService
) {
90 this.executorService
= executorService
;
93 public int getMaxRows() {
97 public void setMaxRows(int maxRows
) {
98 this.maxRows
= maxRows
;
101 public ObjectRowConverter
<Template
> getConverter() {
105 public void setConverter(ObjectRowConverter
<Template
> converter
) {
106 this.converter
= converter
;
109 public SchemaInfoProvider
getInfoProvider() {
113 public void setInfoProvider(SchemaInfoProvider infoProvider
) {
114 this.infoProvider
= infoProvider
;
117 protected int getMaxScanRows() {
118 return getMaxRows() > 0 ?
getMaxRows() : DEFAULT_MAX_ROWS
;
121 protected int getMaxScanRows(List
<QueryParameter
> params
) {
122 if (params
!= null && !params
.isEmpty()) {
123 for (QueryParameter param
: params
) {
124 if (ParameterType
.PARAMETER_TYPE_MAX_RESULT
.equals(param
.getParameterType())) {
125 ValueOnlyQueryParameter
<Integer
> queryParameter
= QueryParameterCastHelper
.VALUE_PARAM_HELPER
.cast(param
);
126 return queryParameter
.getValue();
130 return getMaxScanRows();
138 * Unsupported read operations
141 public Set
<Template
> getAll() {
142 throw new UnsupportedOperationException("Not supported yet.");
146 public <OtherTemplate
> OtherTemplate
getOther(List
<QueryParameter
> query
) {
147 throw new UnsupportedOperationException("Not supported yet.");
151 public <OtherTemplate
> List
<OtherTemplate
> getOtherList(List
<QueryParameter
> query
) {
152 throw new UnsupportedOperationException("Not supported yet.");
156 * Supported read operations
159 public Set
<Template
> getByIds(List
<Integer
> ids
) {
160 LinkedHashSet
<Future
<Template
>> set
= new LinkedHashSet
<Future
<Template
>>(ids
.size());
161 LinkedHashSet
<Template
> resultSet
= new LinkedHashSet
<Template
>(ids
.size());
162 for (Integer id
: ids
) {
163 set
.add(executorService
.executeAsynchronously(null, getByIdCallback(id
)));
165 for (Future
<Template
> future
: set
) {
167 resultSet
.add(future
.get());
169 catch (Exception ex
) {
170 ex
.printStackTrace();
177 public Template
getById(final Integer id
) {
178 return executorService
.execute("", getByIdCallback(id
));
181 protected Callback
<Template
> getByIdCallback(final Integer id
) {
182 return new Callback
<Template
>() {
185 public Template
call(HTableInterface tableInterface
) throws Exception
{
186 Get get
= new Get(Bytes
.toBytes(id
));
187 Result result
= tableInterface
.get(get
);
188 return getConverter().rowsToObject(result
, executorService
);
194 public Template
getSingle(final List
<QueryParameter
> query
) {
195 return executorService
.execute("", new Callback
<Template
>() {
198 public Template
call(HTableInterface tableInterface
) throws Exception
{
199 ResultScanner scanner
= tableInterface
.getScanner(formScan(query
));
201 Result result
= scanner
.next();
202 if (result
== null) {
206 return getConverter().rowsToObject(result
, executorService
);
210 if (scanner
!= null) {
219 public List
<Template
> getList(final List
<QueryParameter
> query
) {
220 return executorService
.execute(null, new Callback
<List
<Template
>>() {
223 public List
<Template
> call(HTableInterface tableInterface
) throws Exception
{
224 ResultScanner scanner
= tableInterface
.getScanner(formScan(query
));
226 Result
[] results
= scanner
.next(getMaxScanRows(query
));
227 if (results
== null) {
228 return Collections
.emptyList();
231 ArrayList
<Template
> templates
= new ArrayList
<Template
>(results
.length
);
232 for (Result result
: results
) {
233 templates
.add(getConverter().rowsToObject(result
, executorService
));
239 if (scanner
!= null) {
247 protected Scan
formScan(List
<QueryParameter
> query
) {
248 Scan scan
= new Scan();
249 final Filter filter
= getFilter(query
);
250 if (filter
!= null) {
251 scan
.setFilter(filter
);
256 protected Filter
getFilter(Collection
<QueryParameter
> queryParams
) {
257 return getFilter(queryParams
, Operator
.MUST_PASS_ALL
);
260 protected Filter
getFilter(Collection
<QueryParameter
> queryParams
, Operator operator
) {
262 if (queryParams
!= null && !queryParams
.isEmpty()) {
263 List
<Filter
> filters
= new ArrayList
<Filter
>(queryParams
.size());
264 for (QueryParameter param
: queryParams
) {
265 switch (param
.getParameterType()) {
266 case PARAMETER_TYPE_CONJUNCTION
: {
267 BasicCompoundQueryParameter queryParameter
=
268 QueryParameterCastHelper
.BASIC_COMPOUND_PARAM_HELPER
.cast(param
);
269 Collection
<QueryParameter
> nestedParameters
= queryParameter
.getNestedParameters();
270 filters
.add(getFilter(nestedParameters
, Operator
.MUST_PASS_ALL
));
273 case PARAMETER_TYPE_DISJUNCTION
: {
274 BasicCompoundQueryParameter queryParameter
=
275 QueryParameterCastHelper
.BASIC_COMPOUND_PARAM_HELPER
.cast(param
);
276 Collection
<QueryParameter
> nestedParameters
= queryParameter
.getNestedParameters();
277 filters
.add(getFilter(nestedParameters
, Operator
.MUST_PASS_ONE
));
280 case PARAMETER_TYPE_PROPERTY
: {
281 handlePropertyParam(param
, filters
);
288 if (!filters
.isEmpty()) {
289 FilterList filterList
= new FilterList(operator
, filters
);
302 protected void handlePropertyParam(QueryParameter queryParameter
,
303 List
<Filter
> filters
) {
304 OperatorType operator
= getOperator(queryParameter
);
305 Object parameter
= getValue(queryParameter
);
306 FilterConfig filterConfig
= getInfoProvider().getFilterConfig(getPropertyName(queryParameter
));
308 case OPERATOR_EQUAL
: {
309 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
, Bytes
.toBytes(parameter
.toString())));
312 case OPERATOR_LESSER
: {
313 filters
.add(getCellFilter(filterConfig
, CompareOp
.LESS
, Bytes
.toBytes(parameter
.toString())));
316 case OPERATOR_LESSER_EQUAL
: {
317 filters
.add(getCellFilter(filterConfig
, CompareOp
.LESS_OR_EQUAL
, Bytes
.toBytes(parameter
.toString())));
320 case OPERATOR_GREATER
: {
321 filters
.add(getCellFilter(filterConfig
, CompareOp
.GREATER
, Bytes
.toBytes(parameter
.toString())));
324 case OPERATOR_GREATER_EQUAL
: {
325 filters
.add(getCellFilter(filterConfig
, CompareOp
.GREATER_OR_EQUAL
, Bytes
.toBytes(parameter
.toString())));
328 case OPERATOR_NOT_EQUAL
: {
329 filters
.add(getCellFilter(filterConfig
, CompareOp
.NOT_EQUAL
, Bytes
.toBytes(parameter
.toString())));
332 case OPERATOR_IS_EMPTY
:
333 case OPERATOR_IS_NULL
: {
334 final SingleColumnValueExcludeFilter cellFilter
=
335 getCellFilter(filterConfig
, CompareOp
.EQUAL
, Bytes
.toBytes(""));
336 cellFilter
.setFilterIfMissing(false);
337 filters
.add(cellFilter
);
340 case OPERATOR_IS_NOT_EMPTY
:
341 case OPERATOR_IS_NOT_NULL
: {
342 final SingleColumnValueExcludeFilter cellFilter
= getCellFilter(filterConfig
, CompareOp
.NOT_EQUAL
, Bytes
.toBytes(
344 cellFilter
.setFilterIfMissing(true);
345 filters
.add(cellFilter
);
348 case OPERATOR_STRING_LIKE
: {
349 MatchMode matchMode
= getMatchMode(queryParameter
);
350 if (matchMode
== null) {
351 matchMode
= MatchMode
.EXACT
;
355 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
, new BinarySuffixComparator(Bytes
.toBytes(parameter
.
359 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
, new BinaryComparator(Bytes
.toBytes(parameter
.
363 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
, new BinaryPrefixComparator(Bytes
.toBytes(parameter
.
368 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
, new SubstringComparator(parameter
.toString())));
373 case OPERATOR_BETWEEN
: {
374 parameter
= getFirstParameter(queryParameter
);
375 Object parameter2
= getSecondParameter(queryParameter
);
376 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
,
377 new RangeComparator(Bytes
.toBytes(parameter
.toString()),
378 Bytes
.toBytes(parameter2
.toString()))));
381 case OPERATOR_IS_IN
: {
382 Collection inCollectin
= QueryParameterCastHelper
.MULTI_OPERAND_PARAM_HELPER
.cast(queryParameter
).getValues();
383 FilterList filterList
= getInFilter(inCollectin
, filterConfig
);
384 filters
.add(filterList
);
387 case OPERATOR_IS_NOT_IN
: {
388 Collection notInCollectin
= QueryParameterCastHelper
.MULTI_OPERAND_PARAM_HELPER
.cast(queryParameter
).getValues();
389 FilterList filterList
= getInFilter(notInCollectin
, filterConfig
);
390 filters
.add(new SkipFilter(filterList
));
397 protected SingleColumnValueExcludeFilter
getCellFilter(FilterConfig filterConfig
, CompareOp op
,
398 WritableByteArrayComparable comparator
) {
399 final SingleColumnValueExcludeFilter valueFilter
;
400 valueFilter
= new SingleColumnValueExcludeFilter(filterConfig
.getColumnFamily(),
401 filterConfig
.getColumnQualifier(),
403 valueFilter
.setFilterIfMissing(filterConfig
.isFilterOnIfMissing());
404 valueFilter
.setLatestVersionOnly(filterConfig
.isFilterOnLatestVersionOnly());
408 protected SingleColumnValueExcludeFilter
getCellFilter(FilterConfig filterConfig
, CompareOp op
, byte[] value
) {
409 return getCellFilter(filterConfig
, op
, new BinaryComparator(value
));
412 protected FilterList
getInFilter(Collection inCollectin
, FilterConfig config
) {
413 FilterList filterList
= new FilterList(Operator
.MUST_PASS_ONE
);
414 for (Object inObj
: inCollectin
) {
415 filterList
.addFilter(getCellFilter(config
, CompareOp
.EQUAL
, new BinaryComparator(Bytes
.toBytes(inObj
.toString()))));
420 protected String
getPropertyName(QueryParameter param
) {
421 final String propertyName
;
422 if (param
instanceof QueryParameterWithPropertyName
) {
423 propertyName
= ((QueryParameterWithPropertyName
) param
).getPropertyName();
431 protected Object
getSecondParameter(QueryParameter queryParamemter
) {
432 if (queryParamemter
instanceof BiOperandQueryParameter
) {
433 return QueryParameterCastHelper
.BI_OPERAND_PARAM_HELPER
.cast(queryParamemter
).getSecondValue();
440 protected Object
getFirstParameter(QueryParameter queryParamemter
) {
441 if (queryParamemter
instanceof BiOperandQueryParameter
) {
442 return QueryParameterCastHelper
.BI_OPERAND_PARAM_HELPER
.cast(queryParamemter
).getFirstValue();
449 protected MatchMode
getMatchMode(QueryParameter queryParamemter
) {
450 return QueryParameterCastHelper
.STRING_PARAM_HELPER
.cast(queryParamemter
).getMatchMode();
453 protected Object
getValue(QueryParameter queryParamemter
) {
455 if (queryParamemter
instanceof QueryParameterWithValue
) {
456 value
= ((QueryParameterWithValue
) queryParamemter
).getValue();
467 protected OperatorType
getOperator(QueryParameter queryParamemter
) {
468 if (QueryParameterCastHelper
.BI_OPERAND_PARAM_HELPER
.isWithOperator(queryParamemter
)) {
469 QueryParameterWithOperator parameterWithOperator
=
470 QueryParameterCastHelper
.BI_OPERAND_PARAM_HELPER
.castToOperatorParam(queryParamemter
);
471 return parameterWithOperator
.getOperatorType();
479 public Template
getSingle(QueryParameter
... query
) {
480 return getSingle(Arrays
.asList(query
));
484 public List
<Template
> getList(QueryParameter
... query
) {
485 return getList(Arrays
.asList(query
));
489 public <OtherTemplate
> OtherTemplate
getOther(QueryParameter
... query
) {
490 return this.<OtherTemplate
>getOther(Arrays
.asList(query
));
494 public <OtherTemplate
> List
<OtherTemplate
> getOtherList(QueryParameter
... query
) {
495 return this.<OtherTemplate
>getOtherList(Arrays
.asList(query
));
502 public void save(Template
... states
) {
503 LinkedHashMap
<String
, List
<Put
>> allPuts
= new LinkedHashMap
<String
, List
<Put
>>();
504 for (Template state
: states
) {
505 LinkedHashMap
<String
, Put
> puts
= getConverter().objectToRows(state
);
506 for (Map
.Entry
<String
, Put
> put
: puts
.entrySet()) {
507 final List
<Put
> putList
;
508 if (allPuts
.containsKey(put
.getKey())) {
509 putList
= allPuts
.get(put
.getKey());
512 putList
= new ArrayList
<Put
>();
513 allPuts
.put(put
.getKey(), putList
);
515 putList
.add(put
.getValue());
518 for (final Map
.Entry
<String
, List
<Put
>> puts
: allPuts
.entrySet()) {
519 executorService
.execute(puts
.getKey(), new Callback
<Void
>() {
522 public Void
call(HTableInterface tableInterface
) throws Exception
{
523 tableInterface
.put(puts
.getValue());
531 public void update(Template
... states
) {
536 public void delete(Template
... states
) {
537 LinkedHashMap
<String
, List
<Delete
>> allDels
= new LinkedHashMap
<String
, List
<Delete
>>();
538 for (Template state
: states
) {
539 LinkedHashMap
<String
, Delete
> dels
= getConverter().objectToDeleteableRows(state
);
540 for (Map
.Entry
<String
, Delete
> del
: dels
.entrySet()) {
541 final List
<Delete
> putList
;
542 if (allDels
.containsKey(del
.getKey())) {
543 putList
= allDels
.get(del
.getKey());
546 putList
= new ArrayList
<Delete
>();
547 allDels
.put(del
.getKey(), putList
);
549 putList
.add(del
.getValue());
552 for (final Map
.Entry
<String
, List
<Delete
>> dels
: allDels
.entrySet()) {
553 executorService
.execute(dels
.getKey(), new Callback
<Void
>() {
556 public Void
call(HTableInterface tableInterface
) throws Exception
{
557 tableInterface
.delete(dels
.getValue());