2 * This is a common dao with basic CRUD operations and is not limited to any
3 * persistent layer implementation
5 * Copyright (C) 2010 Imran M Yousuf (imyousuf@smartitengineering.com)
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 3 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 package com
.smartitengineering
.dao
.impl
.hbase
;
21 import com
.smartitengineering
.dao
.common
.CommonReadDao
;
22 import com
.smartitengineering
.dao
.common
.CommonWriteDao
;
23 import com
.smartitengineering
.dao
.common
.queryparam
.BasicCompoundQueryParameter
;
24 import com
.smartitengineering
.dao
.common
.queryparam
.BiOperandQueryParameter
;
25 import com
.smartitengineering
.dao
.common
.queryparam
.CompositionQueryParameter
;
26 import com
.smartitengineering
.dao
.common
.queryparam
.MatchMode
;
27 import com
.smartitengineering
.dao
.common
.queryparam
.OperatorType
;
28 import com
.smartitengineering
.dao
.common
.queryparam
.ParameterType
;
29 import com
.smartitengineering
.dao
.common
.queryparam
.QueryParameter
;
30 import com
.smartitengineering
.dao
.common
.queryparam
.QueryParameterCastHelper
;
31 import com
.smartitengineering
.dao
.common
.queryparam
.QueryParameterWithOperator
;
32 import com
.smartitengineering
.dao
.common
.queryparam
.QueryParameterWithPropertyName
;
33 import com
.smartitengineering
.dao
.common
.queryparam
.QueryParameterWithValue
;
34 import com
.smartitengineering
.dao
.common
.queryparam
.ValueOnlyQueryParameter
;
35 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.AsyncExecutorService
;
36 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.Callback
;
37 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.FilterConfig
;
38 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.ObjectRowConverter
;
39 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.SchemaInfoProvider
;
40 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.impl
.BinarySuffixComparator
;
41 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.impl
.RangeComparator
;
42 import com
.smartitengineering
.domain
.PersistentDTO
;
43 import java
.io
.ByteArrayOutputStream
;
44 import java
.io
.ObjectOutputStream
;
45 import java
.io
.Serializable
;
46 import java
.util
.ArrayList
;
47 import java
.util
.Arrays
;
48 import java
.util
.Collection
;
49 import java
.util
.Collections
;
50 import java
.util
.LinkedHashMap
;
51 import java
.util
.LinkedHashSet
;
52 import java
.util
.List
;
55 import java
.util
.concurrent
.Future
;
56 import org
.apache
.commons
.io
.IOUtils
;
57 import org
.apache
.commons
.lang
.StringUtils
;
58 import org
.apache
.hadoop
.hbase
.client
.Delete
;
59 import org
.apache
.hadoop
.hbase
.client
.Get
;
60 import org
.apache
.hadoop
.hbase
.client
.HTableInterface
;
61 import org
.apache
.hadoop
.hbase
.client
.Put
;
62 import org
.apache
.hadoop
.hbase
.client
.Result
;
63 import org
.apache
.hadoop
.hbase
.client
.ResultScanner
;
64 import org
.apache
.hadoop
.hbase
.client
.Scan
;
65 import org
.apache
.hadoop
.hbase
.filter
.BinaryComparator
;
66 import org
.apache
.hadoop
.hbase
.filter
.BinaryPrefixComparator
;
67 import org
.apache
.hadoop
.hbase
.filter
.CompareFilter
.CompareOp
;
68 import org
.apache
.hadoop
.hbase
.filter
.Filter
;
69 import org
.apache
.hadoop
.hbase
.filter
.FilterList
;
70 import org
.apache
.hadoop
.hbase
.filter
.FilterList
.Operator
;
71 import org
.apache
.hadoop
.hbase
.filter
.QualifierFilter
;
72 import org
.apache
.hadoop
.hbase
.filter
.RowFilter
;
73 import org
.apache
.hadoop
.hbase
.filter
.SingleColumnValueExcludeFilter
;
74 import org
.apache
.hadoop
.hbase
.filter
.SingleColumnValueFilter
;
75 import org
.apache
.hadoop
.hbase
.filter
.SkipFilter
;
76 import org
.apache
.hadoop
.hbase
.filter
.SubstringComparator
;
77 import org
.apache
.hadoop
.hbase
.filter
.WritableByteArrayComparable
;
78 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
81 * A common DAO implementation for HBase. Please note that every parameter used for reading (i.e. for a Scan) is
82 * assumed to have a toString() method that returns the string representation of the value; its byte[] form is compared.
85 public class CommonDao
<Template
extends PersistentDTO
, IdType
extends Serializable
> implements
86 CommonReadDao
<Template
, IdType
>, CommonWriteDao
<Template
> {
// Row limit applied to scans when maxRows has not been set to a positive value.
88 public static final int DEFAULT_MAX_ROWS
= 1000;
// Converts between Template objects and HBase rows (results, puts and deletes).
89 private ObjectRowConverter
<Template
> converter
;
// Supplies the main table name and the per-property filter configuration.
90 private SchemaInfoProvider infoProvider
;
// Runs the HBase callbacks, synchronously or asynchronously, against a named table.
91 private AsyncExecutorService executorService
;
// Configured scan row limit; a non-positive value (the default) means DEFAULT_MAX_ROWS is used.
92 private int maxRows
= -1;
94 public AsyncExecutorService
getExecutorService() {
95 return executorService
;
98 public void setExecutorService(AsyncExecutorService executorService
) {
99 this.executorService
= executorService
;
102 public int getMaxRows() {
106 public void setMaxRows(int maxRows
) {
107 this.maxRows
= maxRows
;
110 public ObjectRowConverter
<Template
> getConverter() {
114 public void setConverter(ObjectRowConverter
<Template
> converter
) {
115 this.converter
= converter
;
118 public SchemaInfoProvider
getInfoProvider() {
122 public void setInfoProvider(SchemaInfoProvider infoProvider
) {
123 this.infoProvider
= infoProvider
;
126 protected String
getDefaultTableName() {
127 return getInfoProvider().getMainTableName();
130 protected int getMaxScanRows() {
131 return getMaxRows() > 0 ?
getMaxRows() : DEFAULT_MAX_ROWS
;
134 protected int getMaxScanRows(List
<QueryParameter
> params
) {
135 if (params
!= null && !params
.isEmpty()) {
136 for (QueryParameter param
: params
) {
137 if (ParameterType
.PARAMETER_TYPE_MAX_RESULT
.equals(param
.getParameterType())) {
138 ValueOnlyQueryParameter
<Integer
> queryParameter
= QueryParameterCastHelper
.VALUE_PARAM_HELPER
.cast(param
);
139 return queryParameter
.getValue();
143 return getMaxScanRows();
151 * Unsupported read operations
154 public Set
<Template
> getAll() {
155 throw new UnsupportedOperationException("Not supported yet.");
159 public <OtherTemplate
> OtherTemplate
getOther(List
<QueryParameter
> query
) {
160 throw new UnsupportedOperationException("Not supported yet.");
164 public <OtherTemplate
> List
<OtherTemplate
> getOtherList(List
<QueryParameter
> query
) {
165 throw new UnsupportedOperationException("Not supported yet.");
169 * Supported read operations
172 public Set
<Template
> getByIds(List
<IdType
> ids
) {
173 LinkedHashSet
<Future
<Template
>> set
= new LinkedHashSet
<Future
<Template
>>(ids
.size());
174 LinkedHashSet
<Template
> resultSet
= new LinkedHashSet
<Template
>(ids
.size());
175 for (IdType id
: ids
) {
176 set
.add(executorService
.executeAsynchronously(getDefaultTableName(), getByIdCallback(id
)));
178 for (Future
<Template
> future
: set
) {
180 resultSet
.add(future
.get());
182 catch (Exception ex
) {
183 ex
.printStackTrace();
190 public Template
getById(final IdType id
) {
191 return executorService
.execute(getDefaultTableName(), getByIdCallback(id
));
// Builds the callback that looks up one row by id: the id is turned into a row key, fetched with a Get, and the
// Result is handed to the converter.
// NOTE(review): the rowId declaration, the branch for a null id and the branch taken for a null/empty Result are
// not visible in this listing - confirm them against the full source.
194 protected Callback
<Template
> getByIdCallback(final IdType id
) {
195 return new Callback
<Template
>() {
198 public Template
call(HTableInterface tableInterface
) throws Exception
{
// Integer, String, Long and Double ids use the matching Bytes.toBytes overload as the row key.
200 if (id
instanceof Integer
) {
201 rowId
= Bytes
.toBytes((Integer
) id
);
203 else if (id
instanceof String
) {
204 rowId
= Bytes
.toBytes((String
) id
);
206 else if (id
instanceof Long
) {
207 rowId
= Bytes
.toBytes((Long
) id
);
209 else if (id
instanceof Double
) {
210 rowId
= Bytes
.toBytes((Double
) id
);
// Any other non-null id is Java-serialized and the serialized bytes become the row key.
212 else if (id
!= null) {
213 final ByteArrayOutputStream byteArrayOutputStream
= new ByteArrayOutputStream();
214 final ObjectOutputStream objectOutputStream
= new ObjectOutputStream(byteArrayOutputStream
);
215 objectOutputStream
.writeObject(id
);
// Closing the object stream before reading the bytes flushes everything it buffered.
216 IOUtils
.closeQuietly(objectOutputStream
);
217 IOUtils
.closeQuietly(byteArrayOutputStream
);
218 rowId
= byteArrayOutputStream
.toByteArray();
223 Get get
= new Get(rowId
);
224 Result result
= tableInterface
.get(get
);
225 if (result
== null || result
.isEmpty()) {
// Otherwise the row is converted into a Template object.
229 return getConverter().rowsToObject(result
, executorService
);
236 public Template
getSingle(final List
<QueryParameter
> query
) {
237 return executorService
.execute(getDefaultTableName(), new Callback
<Template
>() {
240 public Template
call(HTableInterface tableInterface
) throws Exception
{
241 ResultScanner scanner
= tableInterface
.getScanner(formScan(query
));
243 Result result
= scanner
.next();
244 if (result
== null || result
.isEmpty()) {
248 return getConverter().rowsToObject(result
, executorService
);
252 if (scanner
!= null) {
261 public List
<Template
> getList(final List
<QueryParameter
> query
) {
262 return executorService
.execute(getDefaultTableName(), new Callback
<List
<Template
>>() {
265 public List
<Template
> call(HTableInterface tableInterface
) throws Exception
{
266 ResultScanner scanner
= tableInterface
.getScanner(formScan(query
));
268 Result
[] results
= scanner
.next(getMaxScanRows(query
));
269 if (results
== null) {
270 return Collections
.emptyList();
273 ArrayList
<Template
> templates
= new ArrayList
<Template
>(results
.length
);
274 for (Result result
: results
) {
275 if (result
== null || result
.isEmpty()) {
279 templates
.add(getConverter().rowsToObject(result
, executorService
));
286 if (scanner
!= null) {
294 protected Scan
formScan(List
<QueryParameter
> query
) {
295 Scan scan
= new Scan();
296 final Filter filter
= getFilter(query
, scan
);
297 if (filter
!= null) {
298 scan
.setFilter(filter
);
303 protected Filter
getFilter(Collection
<QueryParameter
> queryParams
, Scan scan
) {
304 return getFilter(queryParams
, scan
, Operator
.MUST_PASS_ALL
);
307 protected Filter
getFilter(Collection
<QueryParameter
> queryParams
, Scan scan
, Operator operator
) {
308 return getFilter("", queryParams
, scan
, operator
);
311 protected Filter
getFilter(String namePrefix
, Collection
<QueryParameter
> queryParams
, Scan scan
, Operator operator
) {
313 if (queryParams
!= null && !queryParams
.isEmpty()) {
314 List
<Filter
> filters
= new ArrayList
<Filter
>(queryParams
.size());
315 for (QueryParameter param
: queryParams
) {
316 switch (param
.getParameterType()) {
317 case PARAMETER_TYPE_CONJUNCTION
: {
318 BasicCompoundQueryParameter queryParameter
=
319 QueryParameterCastHelper
.BASIC_COMPOUND_PARAM_HELPER
.cast(param
);
320 Collection
<QueryParameter
> nestedParameters
= queryParameter
.getNestedParameters();
321 filters
.add(getFilter(nestedParameters
, scan
, Operator
.MUST_PASS_ALL
));
324 case PARAMETER_TYPE_NESTED_PROPERTY
: {
325 CompositionQueryParameter queryParameter
= QueryParameterCastHelper
.COMPOSITION_PARAM_FOR_NESTED_TYPE
.cast(
327 Collection
<QueryParameter
> nestedParameters
= queryParameter
.getNestedParameters();
328 filters
.add(getFilter(getPropertyName(namePrefix
, param
), nestedParameters
, scan
, Operator
.MUST_PASS_ALL
));
331 case PARAMETER_TYPE_DISJUNCTION
: {
332 BasicCompoundQueryParameter queryParameter
=
333 QueryParameterCastHelper
.BASIC_COMPOUND_PARAM_HELPER
.cast(param
);
334 Collection
<QueryParameter
> nestedParameters
= queryParameter
.getNestedParameters();
335 filters
.add(getFilter(nestedParameters
, scan
, Operator
.MUST_PASS_ONE
));
338 case PARAMETER_TYPE_PROPERTY
: {
339 handlePropertyParam(namePrefix
, param
, filters
);
342 case PARAMETER_TYPE_FIRST_RESULT
: {
343 Object value
= getValue(param
);
344 scan
.setStartRow(Bytes
.toBytes(value
.toString()));
347 case PARAMETER_TYPE_UNIT_PROP
: {
348 FilterConfig config
= getInfoProvider().getFilterConfig(getPropertyName(namePrefix
, param
));
349 if (config
!= null) {
350 scan
.addFamily(config
.getColumnFamily());
358 if (!filters
.isEmpty()) {
359 FilterList filterList
= new FilterList(operator
, filters
);
// Translates one property query parameter into an HBase filter and appends it to filters.
// The operator comes from getOperator, the comparison value from getValue, and the filter configuration is looked
// up by the (prefixed) property name. Values are compared as the bytes of their toString().
// NOTE(review): the switch headers, break statements, the match-mode case labels and the tail of some expressions
// are not visible in this listing - confirm them against the full source.
372 protected void handlePropertyParam(String namePrefix
, QueryParameter queryParameter
, List
<Filter
> filters
) {
373 OperatorType operator
= getOperator(queryParameter
);
374 Object parameter
= getValue(queryParameter
);
375 FilterConfig filterConfig
= getInfoProvider().getFilterConfig(getPropertyName(namePrefix
, queryParameter
));
// Plain comparison operators map one-to-one onto an HBase CompareOp.
377 case OPERATOR_EQUAL
: {
378 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
, Bytes
.toBytes(parameter
.toString())));
381 case OPERATOR_LESSER
: {
382 filters
.add(getCellFilter(filterConfig
, CompareOp
.LESS
, Bytes
.toBytes(parameter
.toString())));
385 case OPERATOR_LESSER_EQUAL
: {
386 filters
.add(getCellFilter(filterConfig
, CompareOp
.LESS_OR_EQUAL
, Bytes
.toBytes(parameter
.toString())));
389 case OPERATOR_GREATER
: {
390 filters
.add(getCellFilter(filterConfig
, CompareOp
.GREATER
, Bytes
.toBytes(parameter
.toString())));
393 case OPERATOR_GREATER_EQUAL
: {
394 filters
.add(getCellFilter(filterConfig
, CompareOp
.GREATER_OR_EQUAL
, Bytes
.toBytes(parameter
.toString())));
397 case OPERATOR_NOT_EQUAL
: {
398 filters
.add(getCellFilter(filterConfig
, CompareOp
.NOT_EQUAL
, Bytes
.toBytes(parameter
.toString())));
// Empty/null checks compare against the empty string; filterIfMissing is switched off for a
// SingleColumnValueFilter here.
401 case OPERATOR_IS_EMPTY
:
402 case OPERATOR_IS_NULL
: {
403 final Filter cellFilter
= getCellFilter(filterConfig
, CompareOp
.EQUAL
, Bytes
.toBytes(""));
404 if (cellFilter
instanceof SingleColumnValueFilter
) {
405 ((SingleColumnValueFilter
) cellFilter
).setFilterIfMissing(false);
407 filters
.add(cellFilter
);
// Not-empty/not-null checks use NOT_EQUAL against the empty string; filterIfMissing is switched on for a
// SingleColumnValueFilter here.
410 case OPERATOR_IS_NOT_EMPTY
:
411 case OPERATOR_IS_NOT_NULL
: {
412 final Filter cellFilter
= getCellFilter(filterConfig
, CompareOp
.NOT_EQUAL
, Bytes
.toBytes(""));
413 if (cellFilter
instanceof SingleColumnValueFilter
) {
414 ((SingleColumnValueFilter
) cellFilter
).setFilterIfMissing(true);
416 filters
.add(cellFilter
);
// String matching: the comparator depends on the match mode, which defaults to EXACT when none is given.
419 case OPERATOR_STRING_LIKE
: {
420 MatchMode matchMode
= getMatchMode(queryParameter
);
421 if (matchMode
== null) {
422 matchMode
= MatchMode
.EXACT
;
// Suffix comparison (presumably the "ends with" match mode - case label not visible).
426 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
, new BinarySuffixComparator(Bytes
.toBytes(parameter
.
// Whole-value comparison (presumably the exact match mode - case label not visible).
430 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
, new BinaryComparator(Bytes
.toBytes(parameter
.
// Prefix comparison (presumably the "starts with" match mode - case label not visible).
434 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
, new BinaryPrefixComparator(Bytes
.toBytes(parameter
.
// Substring comparison (presumably the "anywhere" match mode - case label not visible).
439 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
, new SubstringComparator(parameter
.toString())));
// Range check: both operands of the parameter are handed to a RangeComparator.
444 case OPERATOR_BETWEEN
: {
445 parameter
= getFirstParameter(queryParameter
);
446 Object parameter2
= getSecondParameter(queryParameter
);
447 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
,
448 new RangeComparator(Bytes
.toBytes(parameter
.toString()),
449 Bytes
.toBytes(parameter2
.toString()))));
// IN: any of the given values may match (see getInFilter).
452 case OPERATOR_IS_IN
: {
453 Collection inCollectin
= QueryParameterCastHelper
.MULTI_OPERAND_PARAM_HELPER
.cast(queryParameter
).getValues();
454 Filter filterList
= getInFilter(inCollectin
, filterConfig
);
455 filters
.add(filterList
);
// NOT IN: the same "any value matches" filter wrapped in a SkipFilter.
// NOTE(review): verify that SkipFilter around an IN filter really yields NOT IN semantics.
458 case OPERATOR_IS_NOT_IN
: {
459 Collection notInCollectin
= QueryParameterCastHelper
.MULTI_OPERAND_PARAM_HELPER
.cast(queryParameter
).getValues();
460 Filter filterList
= getInFilter(notInCollectin
, filterConfig
);
461 filters
.add(new SkipFilter(filterList
));
468 protected Filter
getCellFilter(FilterConfig filterConfig
, CompareOp op
,
469 WritableByteArrayComparable comparator
) {
470 if (filterConfig
.isFilterOnRowId()) {
471 RowFilter rowFilter
= new RowFilter(op
, comparator
);
474 else if (filterConfig
.isQualifierARangePrefix()) {
475 QualifierFilter filter
= new QualifierFilter(op
, comparator
);
479 final SingleColumnValueExcludeFilter valueFilter
;
480 valueFilter
= new SingleColumnValueExcludeFilter(filterConfig
.getColumnFamily(),
481 filterConfig
.getColumnQualifier(),
483 valueFilter
.setFilterIfMissing(filterConfig
.isFilterOnIfMissing());
484 valueFilter
.setLatestVersionOnly(filterConfig
.isFilterOnLatestVersionOnly());
489 protected Filter
getCellFilter(FilterConfig filterConfig
, CompareOp op
, byte[] value
) {
490 return getCellFilter(filterConfig
, op
, new BinaryComparator(value
));
493 protected Filter
getInFilter(Collection inCollectin
, FilterConfig config
) {
494 FilterList filterList
= new FilterList(Operator
.MUST_PASS_ONE
);
495 for (Object inObj
: inCollectin
) {
496 filterList
.addFilter(getCellFilter(config
, CompareOp
.EQUAL
, new BinaryComparator(Bytes
.toBytes(inObj
.toString()))));
501 protected String
getPropertyName(String prefix
, QueryParameter param
) {
502 final StringBuilder propertyName
= new StringBuilder("");
503 if(StringUtils
.isNotBlank(prefix
)) {
504 propertyName
.append(prefix
).append('.');
506 if (param
instanceof QueryParameterWithPropertyName
) {
507 propertyName
.append(((QueryParameterWithPropertyName
) param
).getPropertyName());
509 return propertyName
.toString();
// Returns the second operand of a two-operand query parameter (used for the upper end of OPERATOR_BETWEEN).
// NOTE(review): what is returned for a parameter that is not a BiOperandQueryParameter is not visible in this
// listing - confirm against the full source.
512 protected Object
getSecondParameter(QueryParameter queryParamemter
) {
513 if (queryParamemter
instanceof BiOperandQueryParameter
) {
514 return QueryParameterCastHelper
.BI_OPERAND_PARAM_HELPER
.cast(queryParamemter
).getSecondValue();
// Returns the first operand of a two-operand query parameter (used for the lower end of OPERATOR_BETWEEN).
// NOTE(review): what is returned for a parameter that is not a BiOperandQueryParameter is not visible in this
// listing - confirm against the full source.
521 protected Object
getFirstParameter(QueryParameter queryParamemter
) {
522 if (queryParamemter
instanceof BiOperandQueryParameter
) {
523 return QueryParameterCastHelper
.BI_OPERAND_PARAM_HELPER
.cast(queryParamemter
).getFirstValue();
530 protected MatchMode
getMatchMode(QueryParameter queryParamemter
) {
531 return QueryParameterCastHelper
.STRING_PARAM_HELPER
.cast(queryParamemter
).getMatchMode();
// Extracts the comparison value of a query parameter; a QueryParameterWithValue supplies it through getValue().
// NOTE(review): the declaration of value and the handling of parameters without a value are not visible in this
// listing - confirm against the full source.
534 protected Object
getValue(QueryParameter queryParamemter
) {
536 if (queryParamemter
instanceof QueryParameterWithValue
) {
537 value
= ((QueryParameterWithValue
) queryParamemter
).getValue();
// Returns the operator type of a query parameter that carries an operator.
// NOTE(review): the result for parameters without an operator is not visible in this listing - confirm against
// the full source.
548 protected OperatorType
getOperator(QueryParameter queryParamemter
) {
// BI_OPERAND_PARAM_HELPER is only used here for its isWithOperator/castToOperatorParam helpers.
549 if (QueryParameterCastHelper
.BI_OPERAND_PARAM_HELPER
.isWithOperator(queryParamemter
)) {
550 QueryParameterWithOperator parameterWithOperator
=
551 QueryParameterCastHelper
.BI_OPERAND_PARAM_HELPER
.castToOperatorParam(queryParamemter
);
552 return parameterWithOperator
.getOperatorType();
560 public Template
getSingle(QueryParameter
... query
) {
561 return getSingle(Arrays
.asList(query
));
565 public List
<Template
> getList(QueryParameter
... query
) {
566 return getList(Arrays
.asList(query
));
570 public <OtherTemplate
> OtherTemplate
getOther(QueryParameter
... query
) {
571 return this.<OtherTemplate
>getOther(Arrays
.asList(query
));
575 public <OtherTemplate
> List
<OtherTemplate
> getOtherList(QueryParameter
... query
) {
576 return this.<OtherTemplate
>getOtherList(Arrays
.asList(query
));
583 public void save(Template
... states
) {
584 LinkedHashMap
<String
, List
<Put
>> allPuts
= new LinkedHashMap
<String
, List
<Put
>>();
585 for (Template state
: states
) {
586 if(!state
.isValid()) {
587 throw new IllegalStateException("Entity not in valid state!");
589 LinkedHashMap
<String
, Put
> puts
= getConverter().objectToRows(state
);
590 for (Map
.Entry
<String
, Put
> put
: puts
.entrySet()) {
591 final List
<Put
> putList
;
592 if (allPuts
.containsKey(put
.getKey())) {
593 putList
= allPuts
.get(put
.getKey());
596 putList
= new ArrayList
<Put
>();
597 allPuts
.put(put
.getKey(), putList
);
599 putList
.add(put
.getValue());
602 for (final Map
.Entry
<String
, List
<Put
>> puts
: allPuts
.entrySet()) {
603 executorService
.execute(puts
.getKey(), new Callback
<Void
>() {
606 public Void
call(HTableInterface tableInterface
) throws Exception
{
607 tableInterface
.put(puts
.getValue());
// Updates the given entities.
// NOTE(review): the method body is not visible in this listing - confirm what update actually does.
615 public void update(Template
... states
) {
620 public void delete(Template
... states
) {
621 LinkedHashMap
<String
, List
<Delete
>> allDels
= new LinkedHashMap
<String
, List
<Delete
>>();
622 for (Template state
: states
) {
623 if(!state
.isValid()) {
624 throw new IllegalStateException("Entity not in valid state!");
626 LinkedHashMap
<String
, Delete
> dels
= getConverter().objectToDeleteableRows(state
);
627 for (Map
.Entry
<String
, Delete
> del
: dels
.entrySet()) {
628 final List
<Delete
> putList
;
629 if (allDels
.containsKey(del
.getKey())) {
630 putList
= allDels
.get(del
.getKey());
633 putList
= new ArrayList
<Delete
>();
634 allDels
.put(del
.getKey(), putList
);
636 putList
.add(del
.getValue());
639 for (final Map
.Entry
<String
, List
<Delete
>> dels
: allDels
.entrySet()) {
640 executorService
.execute(dels
.getKey(), new Callback
<Void
>() {
643 public Void
call(HTableInterface tableInterface
) throws Exception
{
644 tableInterface
.delete(dels
.getValue());