/*
 * This is a common dao with basic CRUD operations and is not limited to any
 * persistent layer implementation
 *
 * Copyright (C) 2010 Imran M Yousuf (imyousuf@smartitengineering.com)
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 3 of the License, or (at your option) any later version.
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */
19 package com
.smartitengineering
.dao
.impl
.hbase
;
21 import com
.smartitengineering
.dao
.common
.CommonReadDao
;
22 import com
.smartitengineering
.dao
.common
.CommonWriteDao
;
23 import com
.smartitengineering
.dao
.common
.queryparam
.BasicCompoundQueryParameter
;
24 import com
.smartitengineering
.dao
.common
.queryparam
.BiOperandQueryParameter
;
25 import com
.smartitengineering
.dao
.common
.queryparam
.MatchMode
;
26 import com
.smartitengineering
.dao
.common
.queryparam
.OperatorType
;
27 import com
.smartitengineering
.dao
.common
.queryparam
.ParameterType
;
28 import com
.smartitengineering
.dao
.common
.queryparam
.QueryParameter
;
29 import com
.smartitengineering
.dao
.common
.queryparam
.QueryParameterCastHelper
;
30 import com
.smartitengineering
.dao
.common
.queryparam
.QueryParameterWithOperator
;
31 import com
.smartitengineering
.dao
.common
.queryparam
.QueryParameterWithPropertyName
;
32 import com
.smartitengineering
.dao
.common
.queryparam
.QueryParameterWithValue
;
33 import com
.smartitengineering
.dao
.common
.queryparam
.ValueOnlyQueryParameter
;
34 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.AsyncExecutorService
;
35 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.Callback
;
36 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.FilterConfig
;
37 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.ObjectRowConverter
;
38 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.SchemaInfoProvider
;
39 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.impl
.BinarySuffixComparator
;
40 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.impl
.RangeComparator
;
41 import com
.smartitengineering
.domain
.PersistentDTO
;
42 import java
.io
.ByteArrayOutputStream
;
43 import java
.io
.ObjectOutputStream
;
44 import java
.io
.Serializable
;
45 import java
.util
.ArrayList
;
46 import java
.util
.Arrays
;
47 import java
.util
.Collection
;
48 import java
.util
.Collections
;
49 import java
.util
.LinkedHashMap
;
50 import java
.util
.LinkedHashSet
;
51 import java
.util
.List
;
54 import java
.util
.concurrent
.Future
;
55 import org
.apache
.hadoop
.hbase
.client
.Delete
;
56 import org
.apache
.hadoop
.hbase
.client
.Get
;
57 import org
.apache
.hadoop
.hbase
.client
.HTableInterface
;
58 import org
.apache
.hadoop
.hbase
.client
.Put
;
59 import org
.apache
.hadoop
.hbase
.client
.Result
;
60 import org
.apache
.hadoop
.hbase
.client
.ResultScanner
;
61 import org
.apache
.hadoop
.hbase
.client
.Scan
;
62 import org
.apache
.hadoop
.hbase
.filter
.BinaryComparator
;
63 import org
.apache
.hadoop
.hbase
.filter
.BinaryPrefixComparator
;
64 import org
.apache
.hadoop
.hbase
.filter
.CompareFilter
.CompareOp
;
65 import org
.apache
.hadoop
.hbase
.filter
.Filter
;
66 import org
.apache
.hadoop
.hbase
.filter
.FilterList
;
67 import org
.apache
.hadoop
.hbase
.filter
.FilterList
.Operator
;
68 import org
.apache
.hadoop
.hbase
.filter
.QualifierFilter
;
69 import org
.apache
.hadoop
.hbase
.filter
.RowFilter
;
70 import org
.apache
.hadoop
.hbase
.filter
.SingleColumnValueExcludeFilter
;
71 import org
.apache
.hadoop
.hbase
.filter
.SingleColumnValueFilter
;
72 import org
.apache
.hadoop
.hbase
.filter
.SkipFilter
;
73 import org
.apache
.hadoop
.hbase
.filter
.SubstringComparator
;
74 import org
.apache
.hadoop
.hbase
.filter
.WritableByteArrayComparable
;
75 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
/**
 * A common DAO implementation for HBase. Please note that all parameters for reading (i.e. Scan) assume that the
 * toString() method returns the string representation of the value to be compared in byte[] form.
 */
82 public class CommonDao
<Template
extends PersistentDTO
, IdType
extends Serializable
> implements
83 CommonReadDao
<Template
, IdType
>, CommonWriteDao
<Template
> {
85 public static final int DEFAULT_MAX_ROWS
= 1000;
86 private ObjectRowConverter
<Template
> converter
;
87 private SchemaInfoProvider infoProvider
;
88 private AsyncExecutorService executorService
;
89 private int maxRows
= -1;
91 public AsyncExecutorService
getExecutorService() {
92 return executorService
;
95 public void setExecutorService(AsyncExecutorService executorService
) {
96 this.executorService
= executorService
;
99 public int getMaxRows() {
103 public void setMaxRows(int maxRows
) {
104 this.maxRows
= maxRows
;
107 public ObjectRowConverter
<Template
> getConverter() {
111 public void setConverter(ObjectRowConverter
<Template
> converter
) {
112 this.converter
= converter
;
115 public SchemaInfoProvider
getInfoProvider() {
119 public void setInfoProvider(SchemaInfoProvider infoProvider
) {
120 this.infoProvider
= infoProvider
;
123 protected String
getDefaultTableName() {
124 return getInfoProvider().getMainTableName();
127 protected int getMaxScanRows() {
128 return getMaxRows() > 0 ?
getMaxRows() : DEFAULT_MAX_ROWS
;
131 protected int getMaxScanRows(List
<QueryParameter
> params
) {
132 if (params
!= null && !params
.isEmpty()) {
133 for (QueryParameter param
: params
) {
134 if (ParameterType
.PARAMETER_TYPE_MAX_RESULT
.equals(param
.getParameterType())) {
135 ValueOnlyQueryParameter
<Integer
> queryParameter
= QueryParameterCastHelper
.VALUE_PARAM_HELPER
.cast(param
);
136 return queryParameter
.getValue();
140 return getMaxScanRows();
/* Unsupported read operations */
151 public Set
<Template
> getAll() {
152 throw new UnsupportedOperationException("Not supported yet.");
156 public <OtherTemplate
> OtherTemplate
getOther(List
<QueryParameter
> query
) {
157 throw new UnsupportedOperationException("Not supported yet.");
161 public <OtherTemplate
> List
<OtherTemplate
> getOtherList(List
<QueryParameter
> query
) {
162 throw new UnsupportedOperationException("Not supported yet.");
/* Supported read operations */
169 public Set
<Template
> getByIds(List
<IdType
> ids
) {
170 LinkedHashSet
<Future
<Template
>> set
= new LinkedHashSet
<Future
<Template
>>(ids
.size());
171 LinkedHashSet
<Template
> resultSet
= new LinkedHashSet
<Template
>(ids
.size());
172 for (IdType id
: ids
) {
173 set
.add(executorService
.executeAsynchronously(getDefaultTableName(), getByIdCallback(id
)));
175 for (Future
<Template
> future
: set
) {
177 resultSet
.add(future
.get());
179 catch (Exception ex
) {
180 ex
.printStackTrace();
187 public Template
getById(final IdType id
) {
188 return executorService
.execute(getDefaultTableName(), getByIdCallback(id
));
/**
 * Builds the callback that fetches one row by id and converts it to an object.
 *
 * The row key is derived from the id: Integer, String, Long and Double ids go through
 * Bytes.toBytes, any other non-null id is written with an ObjectOutputStream and the
 * resulting bytes are used. The row is then read with a Get and handed to the converter.
 *
 * NOTE(review): this listing is damaged - the declaration of rowId and the branch taken when
 * id is null (original lines 196 and 213-216) are not visible, and neither are the closing
 * braces. Restore them from version control before editing this method.
 */
191 protected Callback
<Template
> getByIdCallback(final IdType id
) {
192 return new Callback
<Template
>() {
195 public Template
call(HTableInterface tableInterface
) throws Exception
{
197 if (id
instanceof Integer
) {
198 rowId
= Bytes
.toBytes((Integer
) id
);
200 else if (id
instanceof String
) {
201 rowId
= Bytes
.toBytes((String
) id
);
203 else if (id
instanceof Long
) {
204 rowId
= Bytes
.toBytes((Long
) id
);
206 else if (id
instanceof Double
) {
207 rowId
= Bytes
.toBytes((Double
) id
);
209 else if (id
!= null) {
210 final ByteArrayOutputStream byteArrayOutputStream
= new ByteArrayOutputStream();
211 new ObjectOutputStream(byteArrayOutputStream
).writeObject(id
);
212 rowId
= byteArrayOutputStream
.toByteArray();
217 Get get
= new Get(rowId
);
218 Result result
= tableInterface
.get(get
);
219 return getConverter().rowsToObject(result
, executorService
);
225 public Template
getSingle(final List
<QueryParameter
> query
) {
226 return executorService
.execute(getDefaultTableName(), new Callback
<Template
>() {
229 public Template
call(HTableInterface tableInterface
) throws Exception
{
230 ResultScanner scanner
= tableInterface
.getScanner(formScan(query
));
232 Result result
= scanner
.next();
233 if (result
== null) {
237 return getConverter().rowsToObject(result
, executorService
);
241 if (scanner
!= null) {
250 public List
<Template
> getList(final List
<QueryParameter
> query
) {
251 return executorService
.execute(getDefaultTableName(), new Callback
<List
<Template
>>() {
254 public List
<Template
> call(HTableInterface tableInterface
) throws Exception
{
255 ResultScanner scanner
= tableInterface
.getScanner(formScan(query
));
257 Result
[] results
= scanner
.next(getMaxScanRows(query
));
258 if (results
== null) {
259 return Collections
.emptyList();
262 ArrayList
<Template
> templates
= new ArrayList
<Template
>(results
.length
);
263 for (Result result
: results
) {
264 templates
.add(getConverter().rowsToObject(result
, executorService
));
270 if (scanner
!= null) {
278 protected Scan
formScan(List
<QueryParameter
> query
) {
279 Scan scan
= new Scan();
280 final Filter filter
= getFilter(query
, scan
);
281 if (filter
!= null) {
282 scan
.setFilter(filter
);
287 protected Filter
getFilter(Collection
<QueryParameter
> queryParams
, Scan scan
) {
288 return getFilter(queryParams
, scan
, Operator
.MUST_PASS_ALL
);
/**
 * Translates query parameters into HBase filters combined with the given operator.
 *
 * Per parameter type, as far as this listing shows:
 * - conjunction / disjunction: recurse over the nested parameters with MUST_PASS_ALL /
 *   MUST_PASS_ONE and add the result to the filter list;
 * - property: delegate to handlePropertyParam;
 * - first result: use the parameter value's toString() bytes as the scan's start row;
 * - unit prop: add the column family from the property's filter config to the scan, when a
 *   config exists.
 * When at least one filter was collected they are wrapped in a FilterList.
 *
 * NOTE(review): this listing is damaged - case terminators, the return statements and the
 * closing braces are not visible, so what is returned when no filter was collected cannot be
 * read from here. It also cannot be confirmed whether the result of a nested call is checked
 * before being added to the list. Restore from version control before editing.
 */
291 protected Filter
getFilter(Collection
<QueryParameter
> queryParams
, Scan scan
, Operator operator
) {
293 if (queryParams
!= null && !queryParams
.isEmpty()) {
294 List
<Filter
> filters
= new ArrayList
<Filter
>(queryParams
.size());
295 for (QueryParameter param
: queryParams
) {
296 switch (param
.getParameterType()) {
297 case PARAMETER_TYPE_CONJUNCTION
: {
298 BasicCompoundQueryParameter queryParameter
=
299 QueryParameterCastHelper
.BASIC_COMPOUND_PARAM_HELPER
.cast(param
);
300 Collection
<QueryParameter
> nestedParameters
= queryParameter
.getNestedParameters();
301 filters
.add(getFilter(nestedParameters
, scan
, Operator
.MUST_PASS_ALL
));
304 case PARAMETER_TYPE_DISJUNCTION
: {
305 BasicCompoundQueryParameter queryParameter
=
306 QueryParameterCastHelper
.BASIC_COMPOUND_PARAM_HELPER
.cast(param
);
307 Collection
<QueryParameter
> nestedParameters
= queryParameter
.getNestedParameters();
308 filters
.add(getFilter(nestedParameters
, scan
, Operator
.MUST_PASS_ONE
));
311 case PARAMETER_TYPE_PROPERTY
: {
312 handlePropertyParam(param
, filters
);
315 case PARAMETER_TYPE_FIRST_RESULT
: {
316 Object value
= getValue(param
);
317 scan
.setStartRow(Bytes
.toBytes(value
.toString()));
320 case PARAMETER_TYPE_UNIT_PROP
: {
321 FilterConfig config
= getInfoProvider().getFilterConfig(getPropertyName(param
));
322 if (config
!= null) {
323 scan
.addFamily(config
.getColumnFamily());
331 if (!filters
.isEmpty()) {
332 FilterList filterList
= new FilterList(operator
, filters
);
/**
 * Appends to filters the cell filter for one property query parameter.
 *
 * The operator, the value and the filter configuration are resolved through getOperator,
 * getValue and getInfoProvider().getFilterConfig(getPropertyName(...)). As far as this listing
 * shows, the operators map as follows:
 * - EQUAL, LESSER, LESSER_EQUAL, GREATER, GREATER_EQUAL, NOT_EQUAL: the matching CompareOp
 *   against the bytes of parameter.toString();
 * - IS_EMPTY / IS_NULL: EQUAL against the bytes of "" with filterIfMissing set to false;
 * - IS_NOT_EMPTY / IS_NOT_NULL: NOT_EQUAL against the bytes of "" with filterIfMissing set to
 *   true;
 * - STRING_LIKE: EQUAL with a suffix, binary, prefix or substring comparator; a null match
 *   mode is replaced by MatchMode.EXACT (presumably the match mode selects the comparator -
 *   the selecting case labels are not visible);
 * - BETWEEN: EQUAL with a RangeComparator built from the first and second operand;
 * - IS_IN: the filter produced by getInFilter;
 * - IS_NOT_IN: the getInFilter result wrapped in a SkipFilter.
 *
 * NOTE(review): this listing is damaged - both switch headers, the match-mode case labels, the
 * tails of three statements, every break and the closing braces are not visible. Restore from
 * version control before editing.
 */
345 protected void handlePropertyParam(QueryParameter queryParameter
,
346 List
<Filter
> filters
) {
347 OperatorType operator
= getOperator(queryParameter
);
348 Object parameter
= getValue(queryParameter
);
349 FilterConfig filterConfig
= getInfoProvider().getFilterConfig(getPropertyName(queryParameter
));
351 case OPERATOR_EQUAL
: {
352 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
, Bytes
.toBytes(parameter
.toString())));
355 case OPERATOR_LESSER
: {
356 filters
.add(getCellFilter(filterConfig
, CompareOp
.LESS
, Bytes
.toBytes(parameter
.toString())));
359 case OPERATOR_LESSER_EQUAL
: {
360 filters
.add(getCellFilter(filterConfig
, CompareOp
.LESS_OR_EQUAL
, Bytes
.toBytes(parameter
.toString())));
363 case OPERATOR_GREATER
: {
364 filters
.add(getCellFilter(filterConfig
, CompareOp
.GREATER
, Bytes
.toBytes(parameter
.toString())));
367 case OPERATOR_GREATER_EQUAL
: {
368 filters
.add(getCellFilter(filterConfig
, CompareOp
.GREATER_OR_EQUAL
, Bytes
.toBytes(parameter
.toString())));
371 case OPERATOR_NOT_EQUAL
: {
372 filters
.add(getCellFilter(filterConfig
, CompareOp
.NOT_EQUAL
, Bytes
.toBytes(parameter
.toString())));
375 case OPERATOR_IS_EMPTY
:
376 case OPERATOR_IS_NULL
: {
377 final Filter cellFilter
= getCellFilter(filterConfig
, CompareOp
.EQUAL
, Bytes
.toBytes(""));
378 if (cellFilter
instanceof SingleColumnValueFilter
) {
379 ((SingleColumnValueFilter
) cellFilter
).setFilterIfMissing(false);
381 filters
.add(cellFilter
);
384 case OPERATOR_IS_NOT_EMPTY
:
385 case OPERATOR_IS_NOT_NULL
: {
386 final Filter cellFilter
= getCellFilter(filterConfig
, CompareOp
.NOT_EQUAL
, Bytes
.toBytes(""));
387 if (cellFilter
instanceof SingleColumnValueFilter
) {
388 ((SingleColumnValueFilter
) cellFilter
).setFilterIfMissing(true);
390 filters
.add(cellFilter
);
393 case OPERATOR_STRING_LIKE
: {
394 MatchMode matchMode
= getMatchMode(queryParameter
);
395 if (matchMode
== null) {
396 matchMode
= MatchMode
.EXACT
;
400 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
, new BinarySuffixComparator(Bytes
.toBytes(parameter
.
404 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
, new BinaryComparator(Bytes
.toBytes(parameter
.
408 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
, new BinaryPrefixComparator(Bytes
.toBytes(parameter
.
413 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
, new SubstringComparator(parameter
.toString())));
418 case OPERATOR_BETWEEN
: {
419 parameter
= getFirstParameter(queryParameter
);
420 Object parameter2
= getSecondParameter(queryParameter
);
421 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
,
422 new RangeComparator(Bytes
.toBytes(parameter
.toString()),
423 Bytes
.toBytes(parameter2
.toString()))));
426 case OPERATOR_IS_IN
: {
427 Collection inCollectin
= QueryParameterCastHelper
.MULTI_OPERAND_PARAM_HELPER
.cast(queryParameter
).getValues();
428 Filter filterList
= getInFilter(inCollectin
, filterConfig
);
429 filters
.add(filterList
);
432 case OPERATOR_IS_NOT_IN
: {
433 Collection notInCollectin
= QueryParameterCastHelper
.MULTI_OPERAND_PARAM_HELPER
.cast(queryParameter
).getValues();
434 Filter filterList
= getInFilter(notInCollectin
, filterConfig
);
435 filters
.add(new SkipFilter(filterList
));
442 protected Filter
getCellFilter(FilterConfig filterConfig
, CompareOp op
,
443 WritableByteArrayComparable comparator
) {
444 if (filterConfig
.isFilterOnRowId()) {
445 RowFilter rowFilter
= new RowFilter(op
, comparator
);
448 else if (filterConfig
.isQualifierARangePrefix()) {
449 QualifierFilter filter
= new QualifierFilter(op
, comparator
);
453 final SingleColumnValueExcludeFilter valueFilter
;
454 valueFilter
= new SingleColumnValueExcludeFilter(filterConfig
.getColumnFamily(),
455 filterConfig
.getColumnQualifier(),
457 valueFilter
.setFilterIfMissing(filterConfig
.isFilterOnIfMissing());
458 valueFilter
.setLatestVersionOnly(filterConfig
.isFilterOnLatestVersionOnly());
463 protected Filter
getCellFilter(FilterConfig filterConfig
, CompareOp op
, byte[] value
) {
464 return getCellFilter(filterConfig
, op
, new BinaryComparator(value
));
467 protected Filter
getInFilter(Collection inCollectin
, FilterConfig config
) {
468 FilterList filterList
= new FilterList(Operator
.MUST_PASS_ONE
);
469 for (Object inObj
: inCollectin
) {
470 filterList
.addFilter(getCellFilter(config
, CompareOp
.EQUAL
, new BinaryComparator(Bytes
.toBytes(inObj
.toString()))));
/**
 * Returns the property name carried by a query parameter that implements
 * QueryParameterWithPropertyName.
 *
 * NOTE(review): this listing is damaged - the value used for parameters of any other type,
 * the return statement and the closing braces are not visible.
 */
475 protected String
getPropertyName(QueryParameter param
) {
476 final String propertyName
;
477 if (param
instanceof QueryParameterWithPropertyName
) {
478 propertyName
= ((QueryParameterWithPropertyName
) param
).getPropertyName();
/**
 * Returns the second value of a BiOperandQueryParameter.
 *
 * NOTE(review): this listing is damaged - what is returned for parameters of any other type
 * is not visible, and neither are the closing braces.
 */
486 protected Object
getSecondParameter(QueryParameter queryParamemter
) {
487 if (queryParamemter
instanceof BiOperandQueryParameter
) {
488 return QueryParameterCastHelper
.BI_OPERAND_PARAM_HELPER
.cast(queryParamemter
).getSecondValue();
/**
 * Returns the first value of a BiOperandQueryParameter.
 *
 * NOTE(review): this listing is damaged - what is returned for parameters of any other type
 * is not visible, and neither are the closing braces.
 */
495 protected Object
getFirstParameter(QueryParameter queryParamemter
) {
496 if (queryParamemter
instanceof BiOperandQueryParameter
) {
497 return QueryParameterCastHelper
.BI_OPERAND_PARAM_HELPER
.cast(queryParamemter
).getFirstValue();
504 protected MatchMode
getMatchMode(QueryParameter queryParamemter
) {
505 return QueryParameterCastHelper
.STRING_PARAM_HELPER
.cast(queryParamemter
).getMatchMode();
/**
 * Returns the value carried by a query parameter that implements QueryParameterWithValue.
 *
 * NOTE(review): this listing is damaged - the declaration of value, the handling of
 * parameters of any other type (original lines 512-519), the return statement and the closing
 * braces are not visible.
 */
508 protected Object
getValue(QueryParameter queryParamemter
) {
510 if (queryParamemter
instanceof QueryParameterWithValue
) {
511 value
= ((QueryParameterWithValue
) queryParamemter
).getValue();
/**
 * Returns the operator type of a query parameter that the cast helper recognises as carrying
 * an operator.
 *
 * NOTE(review): this listing is damaged - what is returned for parameters without an operator
 * is not visible, and neither are the closing braces.
 */
522 protected OperatorType
getOperator(QueryParameter queryParamemter
) {
523 if (QueryParameterCastHelper
.BI_OPERAND_PARAM_HELPER
.isWithOperator(queryParamemter
)) {
524 QueryParameterWithOperator parameterWithOperator
=
525 QueryParameterCastHelper
.BI_OPERAND_PARAM_HELPER
.castToOperatorParam(queryParamemter
);
526 return parameterWithOperator
.getOperatorType();
534 public Template
getSingle(QueryParameter
... query
) {
535 return getSingle(Arrays
.asList(query
));
539 public List
<Template
> getList(QueryParameter
... query
) {
540 return getList(Arrays
.asList(query
));
544 public <OtherTemplate
> OtherTemplate
getOther(QueryParameter
... query
) {
545 return this.<OtherTemplate
>getOther(Arrays
.asList(query
));
549 public <OtherTemplate
> List
<OtherTemplate
> getOtherList(QueryParameter
... query
) {
550 return this.<OtherTemplate
>getOtherList(Arrays
.asList(query
));
557 public void save(Template
... states
) {
558 LinkedHashMap
<String
, List
<Put
>> allPuts
= new LinkedHashMap
<String
, List
<Put
>>();
559 for (Template state
: states
) {
560 LinkedHashMap
<String
, Put
> puts
= getConverter().objectToRows(state
);
561 for (Map
.Entry
<String
, Put
> put
: puts
.entrySet()) {
562 final List
<Put
> putList
;
563 if (allPuts
.containsKey(put
.getKey())) {
564 putList
= allPuts
.get(put
.getKey());
567 putList
= new ArrayList
<Put
>();
568 allPuts
.put(put
.getKey(), putList
);
570 putList
.add(put
.getValue());
573 for (final Map
.Entry
<String
, List
<Put
>> puts
: allPuts
.entrySet()) {
574 executorService
.execute(puts
.getKey(), new Callback
<Void
>() {
577 public Void
call(HTableInterface tableInterface
) throws Exception
{
578 tableInterface
.put(puts
.getValue());
/**
 * Updates the given objects.
 *
 * NOTE(review): this listing is damaged - the body of this method (original line 587) and its
 * closing brace are not visible, so what the update does cannot be read from here.
 */
586 public void update(Template
... states
) {
591 public void delete(Template
... states
) {
592 LinkedHashMap
<String
, List
<Delete
>> allDels
= new LinkedHashMap
<String
, List
<Delete
>>();
593 for (Template state
: states
) {
594 LinkedHashMap
<String
, Delete
> dels
= getConverter().objectToDeleteableRows(state
);
595 for (Map
.Entry
<String
, Delete
> del
: dels
.entrySet()) {
596 final List
<Delete
> putList
;
597 if (allDels
.containsKey(del
.getKey())) {
598 putList
= allDels
.get(del
.getKey());
601 putList
= new ArrayList
<Delete
>();
602 allDels
.put(del
.getKey(), putList
);
604 putList
.add(del
.getValue());
607 for (final Map
.Entry
<String
, List
<Delete
>> dels
: allDels
.entrySet()) {
608 executorService
.execute(dels
.getKey(), new Callback
<Void
>() {
611 public Void
call(HTableInterface tableInterface
) throws Exception
{
612 tableInterface
.delete(dels
.getValue());