2 * This is a common dao with basic CRUD operations and is not limited to any
3 * persistent layer implementation
5 * Copyright (C) 2010 Imran M Yousuf (imyousuf@smartitengineering.com)
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 3 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 package com
.smartitengineering
.dao
.impl
.hbase
;
21 import com
.smartitengineering
.dao
.common
.CommonReadDao
;
22 import com
.smartitengineering
.dao
.common
.CommonWriteDao
;
23 import com
.smartitengineering
.dao
.common
.queryparam
.BasicCompoundQueryParameter
;
24 import com
.smartitengineering
.dao
.common
.queryparam
.BiOperandQueryParameter
;
25 import com
.smartitengineering
.dao
.common
.queryparam
.MatchMode
;
26 import com
.smartitengineering
.dao
.common
.queryparam
.OperatorType
;
27 import com
.smartitengineering
.dao
.common
.queryparam
.ParameterType
;
28 import com
.smartitengineering
.dao
.common
.queryparam
.QueryParameter
;
29 import com
.smartitengineering
.dao
.common
.queryparam
.QueryParameterCastHelper
;
30 import com
.smartitengineering
.dao
.common
.queryparam
.QueryParameterWithOperator
;
31 import com
.smartitengineering
.dao
.common
.queryparam
.QueryParameterWithPropertyName
;
32 import com
.smartitengineering
.dao
.common
.queryparam
.QueryParameterWithValue
;
33 import com
.smartitengineering
.dao
.common
.queryparam
.ValueOnlyQueryParameter
;
34 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.FilterConfig
;
35 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.ObjectRowConverter
;
36 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.SchemaInfoProvider
;
37 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.impl
.BinarySuffixComparator
;
38 import com
.smartitengineering
.dao
.impl
.hbase
.spi
.impl
.RangeComparator
;
39 import com
.smartitengineering
.domain
.PersistentDTO
;
40 import java
.util
.ArrayList
;
41 import java
.util
.Arrays
;
42 import java
.util
.Collection
;
43 import java
.util
.Collections
;
44 import java
.util
.LinkedHashMap
;
45 import java
.util
.LinkedHashSet
;
46 import java
.util
.List
;
49 import org
.apache
.hadoop
.conf
.Configuration
;
50 import org
.apache
.hadoop
.hbase
.HBaseConfiguration
;
51 import org
.apache
.hadoop
.hbase
.client
.Delete
;
52 import org
.apache
.hadoop
.hbase
.client
.Get
;
53 import org
.apache
.hadoop
.hbase
.client
.HTableInterface
;
54 import org
.apache
.hadoop
.hbase
.client
.HTablePool
;
55 import org
.apache
.hadoop
.hbase
.client
.Put
;
56 import org
.apache
.hadoop
.hbase
.client
.Result
;
57 import org
.apache
.hadoop
.hbase
.client
.ResultScanner
;
58 import org
.apache
.hadoop
.hbase
.client
.Scan
;
59 import org
.apache
.hadoop
.hbase
.filter
.BinaryComparator
;
60 import org
.apache
.hadoop
.hbase
.filter
.BinaryPrefixComparator
;
61 import org
.apache
.hadoop
.hbase
.filter
.CompareFilter
.CompareOp
;
62 import org
.apache
.hadoop
.hbase
.filter
.Filter
;
63 import org
.apache
.hadoop
.hbase
.filter
.FilterList
;
64 import org
.apache
.hadoop
.hbase
.filter
.FilterList
.Operator
;
65 import org
.apache
.hadoop
.hbase
.filter
.SingleColumnValueExcludeFilter
;
66 import org
.apache
.hadoop
.hbase
.filter
.SkipFilter
;
67 import org
.apache
.hadoop
.hbase
.filter
.SubstringComparator
;
68 import org
.apache
.hadoop
.hbase
.filter
.WritableByteArrayComparable
;
69 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
72 * A common DAO implementation for HBase. Please note that all parameters for reading (i.e. Scan) assumes that the
73 * toString() method returns the string representation of the value to be compared in byte[] form.
76 public class CommonDao
<Template
extends PersistentDTO
> implements CommonReadDao
<Template
>,
77 CommonWriteDao
<Template
> {
79 public static final int DEFAULT_MAX_HTABLE_POOL_SIZE
= 3000;
80 public static final int DEFAULT_MAX_ROWS
= 1000;
81 private ObjectRowConverter
<Template
> converter
;
82 private SchemaInfoProvider infoProvider
;
83 private Configuration configuration
;
84 private HTablePool tablePool
;
85 private int maxRows
= -1;
87 public int getMaxRows() {
91 public void setMaxRows(int maxRows
) {
92 this.maxRows
= maxRows
;
95 public ObjectRowConverter
<Template
> getConverter() {
99 public void setConverter(ObjectRowConverter
<Template
> converter
) {
100 this.converter
= converter
;
103 public SchemaInfoProvider
getInfoProvider() {
107 public void setInfoProvider(SchemaInfoProvider infoProvider
) {
108 this.infoProvider
= infoProvider
;
111 protected Configuration
getConfiguration() {
112 if (configuration
== null) {
113 configuration
= HBaseConfiguration
.create();
115 return configuration
;
118 protected int getMaxScanRows() {
119 return getMaxRows() > 0 ?
getMaxRows() : DEFAULT_MAX_ROWS
;
122 protected int getMaxScanRows(List
<QueryParameter
> params
) {
123 if (params
!= null && !params
.isEmpty()) {
124 for (QueryParameter param
: params
) {
125 if (ParameterType
.PARAMETER_TYPE_MAX_RESULT
.equals(param
.getParameterType())) {
126 ValueOnlyQueryParameter
<Integer
> queryParameter
= QueryParameterCastHelper
.VALUE_PARAM_HELPER
.cast(param
);
127 return queryParameter
.getValue();
131 return getMaxScanRows();
134 public void setConfiguration(Configuration configuration
) {
135 this.configuration
= configuration
;
138 protected HTablePool
getTablePool() {
139 if (tablePool
== null) {
140 tablePool
= new HTablePool(getConfiguration(), DEFAULT_MAX_HTABLE_POOL_SIZE
);
145 protected HTableInterface
getDefaultTable() {
146 return getTablePool().getTable(infoProvider
.getMainTableName());
154 * Unsupported read operations
157 public Set
<Template
> getAll() {
158 throw new UnsupportedOperationException("Not supported yet.");
162 public <OtherTemplate
> OtherTemplate
getOther(List
<QueryParameter
> query
) {
163 throw new UnsupportedOperationException("Not supported yet.");
167 public <OtherTemplate
> List
<OtherTemplate
> getOtherList(List
<QueryParameter
> query
) {
168 throw new UnsupportedOperationException("Not supported yet.");
172 * Supported read operations
175 public Set
<Template
> getByIds(List
<Integer
> ids
) {
176 LinkedHashSet
<Template
> set
= new LinkedHashSet
<Template
>(ids
.size());
177 for (Integer id
: ids
) {
178 set
.add(getById(id
));
184 public Template
getById(Integer id
) {
185 HTableInterface hTable
= null;
187 Get get
= new Get(Bytes
.toBytes(id
));
188 hTable
= getDefaultTable();
189 Result result
= hTable
.get(get
);
190 return getConverter().rowsToObject(result
, getTablePool());
192 catch (Exception ex
) {
193 throw new RuntimeException(ex
);
197 if (hTable
!= null) {
198 getTablePool().putTable(hTable
);
201 catch (Exception ex
) {
202 ex
.printStackTrace();
208 public Template
getSingle(List
<QueryParameter
> query
) {
209 Scan scan
= formScan(query
);
210 HTableInterface table
= getDefaultTable();
211 ResultScanner scanner
= null;
213 scanner
= table
.getScanner(scan
);
214 Result result
= scanner
.next();
215 if (result
== null) {
219 return getConverter().rowsToObject(result
, getTablePool());
222 catch (Exception ex
) {
223 throw new RuntimeException(ex
);
226 if (scanner
!= null) {
229 getTablePool().putTable(table
);
234 public List
<Template
> getList(List
<QueryParameter
> query
) {
235 Scan scan
= formScan(query
);
236 HTableInterface table
= getDefaultTable();
237 ResultScanner scanner
= null;
239 scanner
= table
.getScanner(scan
);
240 Result
[] results
= scanner
.next(getMaxScanRows(query
));
241 if (results
== null) {
242 return Collections
.emptyList();
245 ArrayList
<Template
> templates
= new ArrayList
<Template
>(results
.length
);
246 for (Result result
: results
) {
247 templates
.add(getConverter().rowsToObject(result
, getTablePool()));
252 catch (Exception ex
) {
253 throw new RuntimeException(ex
);
256 if (scanner
!= null) {
259 getTablePool().putTable(table
);
263 protected Scan
formScan(List
<QueryParameter
> query
) {
264 Scan scan
= new Scan();
265 final Filter filter
= getFilter(query
);
266 if (filter
!= null) {
267 scan
.setFilter(filter
);
272 protected Filter
getFilter(Collection
<QueryParameter
> queryParams
) {
273 return getFilter(queryParams
, Operator
.MUST_PASS_ALL
);
276 protected Filter
getFilter(Collection
<QueryParameter
> queryParams
, Operator operator
) {
278 if (queryParams
!= null && !queryParams
.isEmpty()) {
279 List
<Filter
> filters
= new ArrayList
<Filter
>(queryParams
.size());
280 for (QueryParameter param
: queryParams
) {
281 switch (param
.getParameterType()) {
282 case PARAMETER_TYPE_CONJUNCTION
: {
283 BasicCompoundQueryParameter queryParameter
=
284 QueryParameterCastHelper
.BASIC_COMPOUND_PARAM_HELPER
.cast(param
);
285 Collection
<QueryParameter
> nestedParameters
= queryParameter
.getNestedParameters();
286 filters
.add(getFilter(nestedParameters
, Operator
.MUST_PASS_ALL
));
289 case PARAMETER_TYPE_DISJUNCTION
: {
290 BasicCompoundQueryParameter queryParameter
=
291 QueryParameterCastHelper
.BASIC_COMPOUND_PARAM_HELPER
.cast(param
);
292 Collection
<QueryParameter
> nestedParameters
= queryParameter
.getNestedParameters();
293 filters
.add(getFilter(nestedParameters
, Operator
.MUST_PASS_ONE
));
296 case PARAMETER_TYPE_PROPERTY
: {
297 handlePropertyParam(param
, filters
);
304 if (!filters
.isEmpty()) {
305 FilterList filterList
= new FilterList(operator
, filters
);
318 protected void handlePropertyParam(QueryParameter queryParameter
,
319 List
<Filter
> filters
) {
320 OperatorType operator
= getOperator(queryParameter
);
321 Object parameter
= getValue(queryParameter
);
322 FilterConfig filterConfig
= getInfoProvider().getFilterConfig(getPropertyName(queryParameter
));
324 case OPERATOR_EQUAL
: {
325 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
, Bytes
.toBytes(parameter
.toString())));
328 case OPERATOR_LESSER
: {
329 filters
.add(getCellFilter(filterConfig
, CompareOp
.LESS
, Bytes
.toBytes(parameter
.toString())));
332 case OPERATOR_LESSER_EQUAL
: {
333 filters
.add(getCellFilter(filterConfig
, CompareOp
.LESS_OR_EQUAL
, Bytes
.toBytes(parameter
.toString())));
336 case OPERATOR_GREATER
: {
337 filters
.add(getCellFilter(filterConfig
, CompareOp
.GREATER
, Bytes
.toBytes(parameter
.toString())));
340 case OPERATOR_GREATER_EQUAL
: {
341 filters
.add(getCellFilter(filterConfig
, CompareOp
.GREATER_OR_EQUAL
, Bytes
.toBytes(parameter
.toString())));
344 case OPERATOR_NOT_EQUAL
: {
345 filters
.add(getCellFilter(filterConfig
, CompareOp
.NOT_EQUAL
, Bytes
.toBytes(parameter
.toString())));
348 case OPERATOR_IS_EMPTY
:
349 case OPERATOR_IS_NULL
: {
350 final SingleColumnValueExcludeFilter cellFilter
=
351 getCellFilter(filterConfig
, CompareOp
.EQUAL
, Bytes
.toBytes(""));
352 cellFilter
.setFilterIfMissing(false);
353 filters
.add(cellFilter
);
356 case OPERATOR_IS_NOT_EMPTY
:
357 case OPERATOR_IS_NOT_NULL
: {
358 final SingleColumnValueExcludeFilter cellFilter
= getCellFilter(filterConfig
, CompareOp
.NOT_EQUAL
, Bytes
.toBytes(
360 cellFilter
.setFilterIfMissing(true);
361 filters
.add(cellFilter
);
364 case OPERATOR_STRING_LIKE
: {
365 MatchMode matchMode
= getMatchMode(queryParameter
);
366 if (matchMode
== null) {
367 matchMode
= MatchMode
.EXACT
;
371 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
, new BinarySuffixComparator(Bytes
.toBytes(parameter
.
375 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
, new BinaryComparator(Bytes
.toBytes(parameter
.
379 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
, new BinaryPrefixComparator(Bytes
.toBytes(parameter
.
384 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
, new SubstringComparator(parameter
.toString())));
389 case OPERATOR_BETWEEN
: {
390 parameter
= getFirstParameter(queryParameter
);
391 Object parameter2
= getSecondParameter(queryParameter
);
392 filters
.add(getCellFilter(filterConfig
, CompareOp
.EQUAL
,
393 new RangeComparator(Bytes
.toBytes(parameter
.toString()),
394 Bytes
.toBytes(parameter2
.toString()))));
397 case OPERATOR_IS_IN
: {
398 Collection inCollectin
= QueryParameterCastHelper
.MULTI_OPERAND_PARAM_HELPER
.cast(queryParameter
).getValues();
399 FilterList filterList
= getInFilter(inCollectin
, filterConfig
);
400 filters
.add(filterList
);
403 case OPERATOR_IS_NOT_IN
: {
404 Collection notInCollectin
= QueryParameterCastHelper
.MULTI_OPERAND_PARAM_HELPER
.cast(queryParameter
).getValues();
405 FilterList filterList
= getInFilter(notInCollectin
, filterConfig
);
406 filters
.add(new SkipFilter(filterList
));
413 protected SingleColumnValueExcludeFilter
getCellFilter(FilterConfig filterConfig
, CompareOp op
,
414 WritableByteArrayComparable comparator
) {
415 final SingleColumnValueExcludeFilter valueFilter
;
416 valueFilter
= new SingleColumnValueExcludeFilter(filterConfig
.getColumnFamily(),
417 filterConfig
.getColumnQualifier(),
419 valueFilter
.setFilterIfMissing(filterConfig
.isFilterOnIfMissing());
420 valueFilter
.setLatestVersionOnly(filterConfig
.isFilterOnLatestVersionOnly());
424 protected SingleColumnValueExcludeFilter
getCellFilter(FilterConfig filterConfig
, CompareOp op
, byte[] value
) {
425 return getCellFilter(filterConfig
, op
, new BinaryComparator(value
));
428 protected FilterList
getInFilter(Collection inCollectin
, FilterConfig config
) {
429 FilterList filterList
= new FilterList(Operator
.MUST_PASS_ONE
);
430 for (Object inObj
: inCollectin
) {
431 filterList
.addFilter(getCellFilter(config
, CompareOp
.EQUAL
, new BinaryComparator(Bytes
.toBytes(inObj
.toString()))));
436 protected String
getPropertyName(QueryParameter param
) {
437 final String propertyName
;
438 if (param
instanceof QueryParameterWithPropertyName
) {
439 propertyName
= ((QueryParameterWithPropertyName
) param
).getPropertyName();
447 protected Object
getSecondParameter(QueryParameter queryParamemter
) {
448 if (queryParamemter
instanceof BiOperandQueryParameter
) {
449 return QueryParameterCastHelper
.BI_OPERAND_PARAM_HELPER
.cast(queryParamemter
).getSecondValue();
456 protected Object
getFirstParameter(QueryParameter queryParamemter
) {
457 if (queryParamemter
instanceof BiOperandQueryParameter
) {
458 return QueryParameterCastHelper
.BI_OPERAND_PARAM_HELPER
.cast(queryParamemter
).getFirstValue();
465 protected MatchMode
getMatchMode(QueryParameter queryParamemter
) {
466 return QueryParameterCastHelper
.STRING_PARAM_HELPER
.cast(queryParamemter
).getMatchMode();
469 protected Object
getValue(QueryParameter queryParamemter
) {
471 if (queryParamemter
instanceof QueryParameterWithValue
) {
472 value
= ((QueryParameterWithValue
) queryParamemter
).getValue();
483 protected OperatorType
getOperator(QueryParameter queryParamemter
) {
484 if (QueryParameterCastHelper
.BI_OPERAND_PARAM_HELPER
.isWithOperator(queryParamemter
)) {
485 QueryParameterWithOperator parameterWithOperator
=
486 QueryParameterCastHelper
.BI_OPERAND_PARAM_HELPER
.castToOperatorParam(queryParamemter
);
487 return parameterWithOperator
.getOperatorType();
495 public Template
getSingle(QueryParameter
... query
) {
496 return getSingle(Arrays
.asList(query
));
500 public List
<Template
> getList(QueryParameter
... query
) {
501 return getList(Arrays
.asList(query
));
505 public <OtherTemplate
> OtherTemplate
getOther(QueryParameter
... query
) {
506 return this.<OtherTemplate
>getOther(Arrays
.asList(query
));
510 public <OtherTemplate
> List
<OtherTemplate
> getOtherList(QueryParameter
... query
) {
511 return this.<OtherTemplate
>getOtherList(Arrays
.asList(query
));
518 public void save(Template
... states
) {
519 LinkedHashMap
<String
, List
<Put
>> allPuts
= new LinkedHashMap
<String
, List
<Put
>>();
520 for (Template state
: states
) {
521 LinkedHashMap
<String
, Put
> puts
= getConverter().objectToRows(state
);
522 for (Map
.Entry
<String
, Put
> put
: puts
.entrySet()) {
523 final List
<Put
> putList
;
524 if (allPuts
.containsKey(put
.getKey())) {
525 putList
= allPuts
.get(put
.getKey());
528 putList
= new ArrayList
<Put
>();
529 allPuts
.put(put
.getKey(), putList
);
531 putList
.add(put
.getValue());
534 for (Map
.Entry
<String
, List
<Put
>> puts
: allPuts
.entrySet()) {
535 HTableInterface hTable
= null;
537 hTable
= getTablePool().getTable(puts
.getKey());
538 hTable
.put(puts
.getValue());
540 catch (Exception ex
) {
541 throw new RuntimeException(ex
);
545 if (hTable
!= null) {
546 getTablePool().putTable(hTable
);
549 catch (Exception ex
) {
550 ex
.printStackTrace();
557 public void update(Template
... states
) {
562 public void delete(Template
... states
) {
563 LinkedHashMap
<String
, List
<Delete
>> allDels
= new LinkedHashMap
<String
, List
<Delete
>>();
564 for (Template state
: states
) {
565 LinkedHashMap
<String
, Delete
> dels
= getConverter().objectToDeleteableRows(state
);
566 for (Map
.Entry
<String
, Delete
> del
: dels
.entrySet()) {
567 final List
<Delete
> putList
;
568 if (allDels
.containsKey(del
.getKey())) {
569 putList
= allDels
.get(del
.getKey());
572 putList
= new ArrayList
<Delete
>();
573 allDels
.put(del
.getKey(), putList
);
575 putList
.add(del
.getValue());
578 for (Map
.Entry
<String
, List
<Delete
>> dels
: allDels
.entrySet()) {
579 HTableInterface hTable
= null;
581 hTable
= getTablePool().getTable(dels
.getKey());
582 hTable
.delete(dels
.getValue());
584 catch (Exception ex
) {
585 throw new RuntimeException(ex
);
589 if (hTable
!= null) {
590 getTablePool().putTable(hTable
);
593 catch (Exception ex
) {
594 ex
.printStackTrace();