/*
 * This is a common dao with basic CRUD operations and is not limited to any
 * persistent layer implementation
 *
 * Copyright (C) 2010 Imran M Yousuf (imyousuf@smartitengineering.com)
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 3 of the License, or (at your option) any later version.
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */
19 package com
.smartitengineering
.dao
.impl
.hbase
;
21 import com
.smartitengineering
.dao
.common
.CommonReadDao
;
22 import com
.smartitengineering
.dao
.common
.CommonWriteDao
;
23 import com
.smartitengineering
.dao
.common
.queryparam
.BasicCompoundQueryParameter
;
24 import com
.smartitengineering
.dao
.common
.queryparam
.BiOperandQueryParameter
;
25 import com
.smartitengineering
.dao
.common
.queryparam
.MatchMode
;
26 import com
.smartitengineering
.dao
.common
.queryparam
.OperatorType
;
27 import com
.smartitengineering
.dao
.common
.queryparam
.ParameterType
;
28 import com
.smartitengineering
.dao
.common
.queryparam
.QueryParameter
;
29 import com
.smartitengineering
.dao
.common
.queryparam
.QueryParameterCastHelper
;
30 import com
.smartitengineering
.dao
.common
.queryparam
.QueryParameterWithOperator
;
31 import com
.smartitengineering
.dao
.common
.queryparam
.QueryParameterWithPropertyName
;
32 import com
.smartitengineering
.dao
.common
.queryparam
.QueryParameterWithValue
;
33 import com
.smartitengineering
.dao
.common
.queryparam
.ValueOnlyQueryParameter
;
34 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.AsyncExecutorService
;
35 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.Callback
;
36 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.FilterConfig
;
37 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.ObjectRowConverter
;
38 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.SchemaInfoProvider
;
39 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.impl
.BinarySuffixComparator
;
40 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.impl
.RangeComparator
;
41 import com
.smartitengineering
.domain
.PersistentDTO
;
42 import java
.io
.ByteArrayOutputStream
;
43 import java
.io
.ObjectOutputStream
;
44 import java
.io
.Serializable
;
45 import java
.util
.ArrayList
;
46 import java
.util
.Arrays
;
47 import java
.util
.Collection
;
48 import java
.util
.Collections
;
49 import java
.util
.LinkedHashMap
;
50 import java
.util
.LinkedHashSet
;
51 import java
.util
.List
;
54 import java
.util
.concurrent
.Future
;
55 import org
.apache
.commons
.io
.IOUtils
;
56 import org
.apache
.hadoop
.hbase
.client
.Delete
;
57 import org
.apache
.hadoop
.hbase
.client
.Get
;
58 import org
.apache
.hadoop
.hbase
.client
.HTableInterface
;
59 import org
.apache
.hadoop
.hbase
.client
.Put
;
60 import org
.apache
.hadoop
.hbase
.client
.Result
;
61 import org
.apache
.hadoop
.hbase
.client
.ResultScanner
;
62 import org
.apache
.hadoop
.hbase
.client
.Scan
;
63 import org
.apache
.hadoop
.hbase
.filter
.BinaryComparator
;
64 import org
.apache
.hadoop
.hbase
.filter
.BinaryPrefixComparator
;
65 import org
.apache
.hadoop
.hbase
.filter
.CompareFilter
.CompareOp
;
66 import org
.apache
.hadoop
.hbase
.filter
.Filter
;
67 import org
.apache
.hadoop
.hbase
.filter
.FilterList
;
68 import org
.apache
.hadoop
.hbase
.filter
.FilterList
.Operator
;
69 import org
.apache
.hadoop
.hbase
.filter
.QualifierFilter
;
70 import org
.apache
.hadoop
.hbase
.filter
.RowFilter
;
71 import org
.apache
.hadoop
.hbase
.filter
.SingleColumnValueExcludeFilter
;
72 import org
.apache
.hadoop
.hbase
.filter
.SingleColumnValueFilter
;
73 import org
.apache
.hadoop
.hbase
.filter
.SkipFilter
;
74 import org
.apache
.hadoop
.hbase
.filter
.SubstringComparator
;
75 import org
.apache
.hadoop
.hbase
.filter
.WritableByteArrayComparable
;
76 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
/**
 * A common DAO implementation for HBase. Please note that all parameters for reading (i.e. Scan) assume that the
 * toString() method returns the string representation of the value to be compared in byte[] form.
 */
83 public class CommonDao
<Template
extends PersistentDTO
, IdType
extends Serializable
> implements
84 CommonReadDao
<Template
, IdType
>, CommonWriteDao
<Template
> {
86 public static final int DEFAULT_MAX_ROWS
= 1000;
87 private ObjectRowConverter
<Template
> converter
;
88 private SchemaInfoProvider infoProvider
;
89 private AsyncExecutorService executorService
;
90 private int maxRows
= -1;
92 public AsyncExecutorService
getExecutorService() {
93 return executorService
;
96 public void setExecutorService(AsyncExecutorService executorService
) {
97 this.executorService
= executorService
;
100 public int getMaxRows() {
104 public void setMaxRows(int maxRows
) {
105 this.maxRows
= maxRows
;
108 public ObjectRowConverter
<Template
> getConverter() {
112 public void setConverter(ObjectRowConverter
<Template
> converter
) {
113 this.converter
= converter
;
116 public SchemaInfoProvider
getInfoProvider() {
120 public void setInfoProvider(SchemaInfoProvider infoProvider
) {
121 this.infoProvider
= infoProvider
;
124 protected String
getDefaultTableName() {
125 return getInfoProvider().getMainTableName();
128 protected int getMaxScanRows() {
129 return getMaxRows() > 0 ?
getMaxRows() : DEFAULT_MAX_ROWS
;
132 protected int getMaxScanRows(List
<QueryParameter
> params
) {
133 if (params
!= null && !params
.isEmpty()) {
134 for (QueryParameter param
: params
) {
135 if (ParameterType
.PARAMETER_TYPE_MAX_RESULT
.equals(param
.getParameterType())) {
136 ValueOnlyQueryParameter
<Integer
> queryParameter
= QueryParameterCastHelper
.VALUE_PARAM_HELPER
.cast(param
);
137 return queryParameter
.getValue();
141 return getMaxScanRows();
/*
 * Unsupported read operations
 */
152 public Set
<Template
> getAll() {
153 throw new UnsupportedOperationException("Not supported yet.");
157 public <OtherTemplate
> OtherTemplate
getOther(List
<QueryParameter
> query
) {
158 throw new UnsupportedOperationException("Not supported yet.");
162 public <OtherTemplate
> List
<OtherTemplate
> getOtherList(List
<QueryParameter
> query
) {
163 throw new UnsupportedOperationException("Not supported yet.");
/*
 * Supported read operations
 */
170 public Set
<Template
> getByIds(List
<IdType
> ids
) {
171 LinkedHashSet
<Future
<Template
>> set
= new LinkedHashSet
<Future
<Template
>>(ids
.size());
172 LinkedHashSet
<Template
> resultSet
= new LinkedHashSet
<Template
>(ids
.size());
173 for (IdType id
: ids
) {
174 set
.add(executorService
.executeAsynchronously(getDefaultTableName(), getByIdCallback(id
)));
176 for (Future
<Template
> future
: set
) {
178 resultSet
.add(future
.get());
180 catch (Exception ex
) {
181 ex
.printStackTrace();
188 public Template
getById(final IdType id
) {
189 return executorService
.execute(getDefaultTableName(), getByIdCallback(id
));
192 protected Callback
<Template
> getByIdCallback(final IdType id
) {
193 return new Callback
<Template
>() {
196 public Template
call(HTableInterface tableInterface
) throws Exception
{
198 if (id
instanceof Integer
) {
199 rowId
= Bytes
.toBytes((Integer
) id
);
201 else if (id
instanceof String
) {
202 rowId
= Bytes
.toBytes((String
) id
);
204 else if (id
instanceof Long
) {
205 rowId
= Bytes
.toBytes((Long
) id
);
207 else if (id
instanceof Double
) {
208 rowId
= Bytes
.toBytes((Double
) id
);
210 else if (id
!= null) {
211 final ByteArrayOutputStream byteArrayOutputStream
= new ByteArrayOutputStream();
212 final ObjectOutputStream objectOutputStream
= new ObjectOutputStream(byteArrayOutputStream
);
213 objectOutputStream
.writeObject(id
);
214 IOUtils
.closeQuietly(objectOutputStream
);
215 IOUtils
.closeQuietly(byteArrayOutputStream
);
216 rowId
= byteArrayOutputStream
.toByteArray();
221 Get get
= new Get(rowId
);
222 Result result
= tableInterface
.get(get
);
223 return getConverter().rowsToObject(result
, executorService
);
229 public Template
getSingle(final List
<QueryParameter
> query
) {
230 return executorService
.execute(getDefaultTableName(), new Callback
<Template
>() {
233 public Template
call(HTableInterface tableInterface
) throws Exception
{
234 ResultScanner scanner
= tableInterface
.getScanner(formScan(query
));
236 Result result
= scanner
.next();
237 if (result
== null) {
241 return getConverter().rowsToObject(result
, executorService
);
245 if (scanner
!= null) {
254 public List
<Template
> getList(final List
<QueryParameter
> query
) {
255 return executorService
.execute(getDefaultTableName(), new Callback
<List
<Template
>>() {
258 public List
<Template
> call(HTableInterface tableInterface
) throws Exception
{
259 ResultScanner scanner
= tableInterface
.getScanner(formScan(query
));
261 Result
[] results
= scanner
.next(getMaxScanRows(query
));
262 if (results
== null) {
263 return Collections
.emptyList();
266 ArrayList
<Template
> templates
= new ArrayList
<Template
>(results
.length
);
267 for (Result result
: results
) {
268 templates
.add(getConverter().rowsToObject(result
, executorService
));
274 if (scanner
!= null) {
282 protected Scan
formScan(List
<QueryParameter
> query
) {
283 Scan scan
= new Scan();
284 final Filter filter
= getFilter(query
, scan
);
285 if (filter
!= null) {
286 scan
.setFilter(filter
);
291 protected Filter
getFilter(Collection
<QueryParameter
> queryParams
, Scan scan
) {
292 return getFilter(queryParams
, scan
, Operator
.MUST_PASS_ALL
);
295 protected Filter
getFilter(Collection
<QueryParameter
> queryParams
, Scan scan
, Operator operator
) {
297 if (queryParams
!= null && !queryParams
.isEmpty()) {
298 List
<Filter
> filters
= new ArrayList
<Filter
>(queryParams
.size());
299 for (QueryParameter param
: queryParams
) {
300 switch (param
.getParameterType()) {
301 case PARAMETER_TYPE_CONJUNCTION
: {
302 BasicCompoundQueryParameter queryParameter
=
303 QueryParameterCastHelper
.BASIC_COMPOUND_PARAM_HELPER
.cast(param
);
304 Collection
<QueryParameter
> nestedParameters
= queryParameter
.getNestedParameters();
305 filters
.add(getFilter(nestedParameters
, scan
, Operator
.MUST_PASS_ALL
));
308 case PARAMETER_TYPE_DISJUNCTION
: {
309 BasicCompoundQueryParameter queryParameter
=
310 QueryParameterCastHelper
.BASIC_COMPOUND_PARAM_HELPER
.cast(param
);
311 Collection
<QueryParameter
> nestedParameters
= queryParameter
.getNestedParameters();
312 filters
.add(getFilter(nestedParameters
, scan
, Operator
.MUST_PASS_ONE
));
315 case PARAMETER_TYPE_PROPERTY
: {
316 handlePropertyParam(param
, filters
);
319 case PARAMETER_TYPE_FIRST_RESULT
: {
320 Object value
= getValue(param
);
321 scan
.setStartRow(Bytes
.toBytes(value
.toString()));
324 case PARAMETER_TYPE_UNIT_PROP
: {
325 FilterConfig config
= getInfoProvider().getFilterConfig(getPropertyName(param
));
326 if (config
!= null) {
327 scan
.addFamily(config
.getColumnFamily());
335 if (!filters
.isEmpty()) {
336 FilterList filterList
= new FilterList(operator
, filters
);
349 protected void handlePropertyParam(QueryParameter queryParameter
,
350 List
<Filter
> filters
) {
351 OperatorType operator
= getOperator(queryParameter
);
352 Object parameter
= getValue(queryParameter
);
353 FilterConfig filterConfig
= getInfoProvider().getFilterConfig(getPropertyName(queryParameter
));
355 case OPERATOR_EQUAL
: {
356 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
, Bytes
.toBytes(parameter
.toString())));
359 case OPERATOR_LESSER
: {
360 filters
.add(getCellFilter(filterConfig
, CompareOp
.LESS
, Bytes
.toBytes(parameter
.toString())));
363 case OPERATOR_LESSER_EQUAL
: {
364 filters
.add(getCellFilter(filterConfig
, CompareOp
.LESS_OR_EQUAL
, Bytes
.toBytes(parameter
.toString())));
367 case OPERATOR_GREATER
: {
368 filters
.add(getCellFilter(filterConfig
, CompareOp
.GREATER
, Bytes
.toBytes(parameter
.toString())));
371 case OPERATOR_GREATER_EQUAL
: {
372 filters
.add(getCellFilter(filterConfig
, CompareOp
.GREATER_OR_EQUAL
, Bytes
.toBytes(parameter
.toString())));
375 case OPERATOR_NOT_EQUAL
: {
376 filters
.add(getCellFilter(filterConfig
, CompareOp
.NOT_EQUAL
, Bytes
.toBytes(parameter
.toString())));
379 case OPERATOR_IS_EMPTY
:
380 case OPERATOR_IS_NULL
: {
381 final Filter cellFilter
= getCellFilter(filterConfig
, CompareOp
.EQUAL
, Bytes
.toBytes(""));
382 if (cellFilter
instanceof SingleColumnValueFilter
) {
383 ((SingleColumnValueFilter
) cellFilter
).setFilterIfMissing(false);
385 filters
.add(cellFilter
);
388 case OPERATOR_IS_NOT_EMPTY
:
389 case OPERATOR_IS_NOT_NULL
: {
390 final Filter cellFilter
= getCellFilter(filterConfig
, CompareOp
.NOT_EQUAL
, Bytes
.toBytes(""));
391 if (cellFilter
instanceof SingleColumnValueFilter
) {
392 ((SingleColumnValueFilter
) cellFilter
).setFilterIfMissing(true);
394 filters
.add(cellFilter
);
397 case OPERATOR_STRING_LIKE
: {
398 MatchMode matchMode
= getMatchMode(queryParameter
);
399 if (matchMode
== null) {
400 matchMode
= MatchMode
.EXACT
;
404 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
, new BinarySuffixComparator(Bytes
.toBytes(parameter
.
408 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
, new BinaryComparator(Bytes
.toBytes(parameter
.
412 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
, new BinaryPrefixComparator(Bytes
.toBytes(parameter
.
417 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
, new SubstringComparator(parameter
.toString())));
422 case OPERATOR_BETWEEN
: {
423 parameter
= getFirstParameter(queryParameter
);
424 Object parameter2
= getSecondParameter(queryParameter
);
425 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
,
426 new RangeComparator(Bytes
.toBytes(parameter
.toString()),
427 Bytes
.toBytes(parameter2
.toString()))));
430 case OPERATOR_IS_IN
: {
431 Collection inCollectin
= QueryParameterCastHelper
.MULTI_OPERAND_PARAM_HELPER
.cast(queryParameter
).getValues();
432 Filter filterList
= getInFilter(inCollectin
, filterConfig
);
433 filters
.add(filterList
);
436 case OPERATOR_IS_NOT_IN
: {
437 Collection notInCollectin
= QueryParameterCastHelper
.MULTI_OPERAND_PARAM_HELPER
.cast(queryParameter
).getValues();
438 Filter filterList
= getInFilter(notInCollectin
, filterConfig
);
439 filters
.add(new SkipFilter(filterList
));
446 protected Filter
getCellFilter(FilterConfig filterConfig
, CompareOp op
,
447 WritableByteArrayComparable comparator
) {
448 if (filterConfig
.isFilterOnRowId()) {
449 RowFilter rowFilter
= new RowFilter(op
, comparator
);
452 else if (filterConfig
.isQualifierARangePrefix()) {
453 QualifierFilter filter
= new QualifierFilter(op
, comparator
);
457 final SingleColumnValueExcludeFilter valueFilter
;
458 valueFilter
= new SingleColumnValueExcludeFilter(filterConfig
.getColumnFamily(),
459 filterConfig
.getColumnQualifier(),
461 valueFilter
.setFilterIfMissing(filterConfig
.isFilterOnIfMissing());
462 valueFilter
.setLatestVersionOnly(filterConfig
.isFilterOnLatestVersionOnly());
467 protected Filter
getCellFilter(FilterConfig filterConfig
, CompareOp op
, byte[] value
) {
468 return getCellFilter(filterConfig
, op
, new BinaryComparator(value
));
471 protected Filter
getInFilter(Collection inCollectin
, FilterConfig config
) {
472 FilterList filterList
= new FilterList(Operator
.MUST_PASS_ONE
);
473 for (Object inObj
: inCollectin
) {
474 filterList
.addFilter(getCellFilter(config
, CompareOp
.EQUAL
, new BinaryComparator(Bytes
.toBytes(inObj
.toString()))));
479 protected String
getPropertyName(QueryParameter param
) {
480 final String propertyName
;
481 if (param
instanceof QueryParameterWithPropertyName
) {
482 propertyName
= ((QueryParameterWithPropertyName
) param
).getPropertyName();
490 protected Object
getSecondParameter(QueryParameter queryParamemter
) {
491 if (queryParamemter
instanceof BiOperandQueryParameter
) {
492 return QueryParameterCastHelper
.BI_OPERAND_PARAM_HELPER
.cast(queryParamemter
).getSecondValue();
499 protected Object
getFirstParameter(QueryParameter queryParamemter
) {
500 if (queryParamemter
instanceof BiOperandQueryParameter
) {
501 return QueryParameterCastHelper
.BI_OPERAND_PARAM_HELPER
.cast(queryParamemter
).getFirstValue();
508 protected MatchMode
getMatchMode(QueryParameter queryParamemter
) {
509 return QueryParameterCastHelper
.STRING_PARAM_HELPER
.cast(queryParamemter
).getMatchMode();
512 protected Object
getValue(QueryParameter queryParamemter
) {
514 if (queryParamemter
instanceof QueryParameterWithValue
) {
515 value
= ((QueryParameterWithValue
) queryParamemter
).getValue();
526 protected OperatorType
getOperator(QueryParameter queryParamemter
) {
527 if (QueryParameterCastHelper
.BI_OPERAND_PARAM_HELPER
.isWithOperator(queryParamemter
)) {
528 QueryParameterWithOperator parameterWithOperator
=
529 QueryParameterCastHelper
.BI_OPERAND_PARAM_HELPER
.castToOperatorParam(queryParamemter
);
530 return parameterWithOperator
.getOperatorType();
538 public Template
getSingle(QueryParameter
... query
) {
539 return getSingle(Arrays
.asList(query
));
543 public List
<Template
> getList(QueryParameter
... query
) {
544 return getList(Arrays
.asList(query
));
548 public <OtherTemplate
> OtherTemplate
getOther(QueryParameter
... query
) {
549 return this.<OtherTemplate
>getOther(Arrays
.asList(query
));
553 public <OtherTemplate
> List
<OtherTemplate
> getOtherList(QueryParameter
... query
) {
554 return this.<OtherTemplate
>getOtherList(Arrays
.asList(query
));
561 public void save(Template
... states
) {
562 LinkedHashMap
<String
, List
<Put
>> allPuts
= new LinkedHashMap
<String
, List
<Put
>>();
563 for (Template state
: states
) {
564 LinkedHashMap
<String
, Put
> puts
= getConverter().objectToRows(state
);
565 for (Map
.Entry
<String
, Put
> put
: puts
.entrySet()) {
566 final List
<Put
> putList
;
567 if (allPuts
.containsKey(put
.getKey())) {
568 putList
= allPuts
.get(put
.getKey());
571 putList
= new ArrayList
<Put
>();
572 allPuts
.put(put
.getKey(), putList
);
574 putList
.add(put
.getValue());
577 for (final Map
.Entry
<String
, List
<Put
>> puts
: allPuts
.entrySet()) {
578 executorService
.execute(puts
.getKey(), new Callback
<Void
>() {
581 public Void
call(HTableInterface tableInterface
) throws Exception
{
582 tableInterface
.put(puts
.getValue());
590 public void update(Template
... states
) {
595 public void delete(Template
... states
) {
596 LinkedHashMap
<String
, List
<Delete
>> allDels
= new LinkedHashMap
<String
, List
<Delete
>>();
597 for (Template state
: states
) {
598 LinkedHashMap
<String
, Delete
> dels
= getConverter().objectToDeleteableRows(state
);
599 for (Map
.Entry
<String
, Delete
> del
: dels
.entrySet()) {
600 final List
<Delete
> putList
;
601 if (allDels
.containsKey(del
.getKey())) {
602 putList
= allDels
.get(del
.getKey());
605 putList
= new ArrayList
<Delete
>();
606 allDels
.put(del
.getKey(), putList
);
608 putList
.add(del
.getValue());
611 for (final Map
.Entry
<String
, List
<Delete
>> dels
: allDels
.entrySet()) {
612 executorService
.execute(dels
.getKey(), new Callback
<Void
>() {
615 public Void
call(HTableInterface tableInterface
) throws Exception
{
616 tableInterface
.delete(dels
.getValue());