Add support for nested type of query parameter
[smart-dao.git] / smart-hbase-dao / src / main / java / com / smartitengineering / dao / impl / hbase / CommonDao.java
blob83ed8558e5b1b2c7c22e41492efe409d9d1d8dd8
/*
 * This is a common dao with basic CRUD operations and is not limited to any
 * persistent layer implementation
 *
 * Copyright (C) 2010 Imran M Yousuf (imyousuf@smartitengineering.com)
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 3 of the License, or (at your option) any later version.
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */
19 package com.smartitengineering.dao.impl.hbase;
21 import com.smartitengineering.dao.common.CommonReadDao;
22 import com.smartitengineering.dao.common.CommonWriteDao;
23 import com.smartitengineering.dao.common.queryparam.BasicCompoundQueryParameter;
24 import com.smartitengineering.dao.common.queryparam.BiOperandQueryParameter;
25 import com.smartitengineering.dao.common.queryparam.CompositionQueryParameter;
26 import com.smartitengineering.dao.common.queryparam.MatchMode;
27 import com.smartitengineering.dao.common.queryparam.OperatorType;
28 import com.smartitengineering.dao.common.queryparam.ParameterType;
29 import com.smartitengineering.dao.common.queryparam.QueryParameter;
30 import com.smartitengineering.dao.common.queryparam.QueryParameterCastHelper;
31 import com.smartitengineering.dao.common.queryparam.QueryParameterWithOperator;
32 import com.smartitengineering.dao.common.queryparam.QueryParameterWithPropertyName;
33 import com.smartitengineering.dao.common.queryparam.QueryParameterWithValue;
34 import com.smartitengineering.dao.common.queryparam.ValueOnlyQueryParameter;
35 import com.smartitengineering.dao.impl.hbase.spi.AsyncExecutorService;
36 import com.smartitengineering.dao.impl.hbase.spi.Callback;
37 import com.smartitengineering.dao.impl.hbase.spi.FilterConfig;
38 import com.smartitengineering.dao.impl.hbase.spi.ObjectRowConverter;
39 import com.smartitengineering.dao.impl.hbase.spi.SchemaInfoProvider;
40 import com.smartitengineering.dao.impl.hbase.spi.impl.BinarySuffixComparator;
41 import com.smartitengineering.dao.impl.hbase.spi.impl.RangeComparator;
42 import com.smartitengineering.domain.PersistentDTO;
43 import java.io.ByteArrayOutputStream;
44 import java.io.ObjectOutputStream;
45 import java.io.Serializable;
46 import java.util.ArrayList;
47 import java.util.Arrays;
48 import java.util.Collection;
49 import java.util.Collections;
50 import java.util.LinkedHashMap;
51 import java.util.LinkedHashSet;
52 import java.util.List;
53 import java.util.Map;
54 import java.util.Set;
55 import java.util.concurrent.Future;
56 import org.apache.commons.io.IOUtils;
57 import org.apache.commons.lang.StringUtils;
58 import org.apache.hadoop.hbase.client.Delete;
59 import org.apache.hadoop.hbase.client.Get;
60 import org.apache.hadoop.hbase.client.HTableInterface;
61 import org.apache.hadoop.hbase.client.Put;
62 import org.apache.hadoop.hbase.client.Result;
63 import org.apache.hadoop.hbase.client.ResultScanner;
64 import org.apache.hadoop.hbase.client.Scan;
65 import org.apache.hadoop.hbase.filter.BinaryComparator;
66 import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
67 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
68 import org.apache.hadoop.hbase.filter.Filter;
69 import org.apache.hadoop.hbase.filter.FilterList;
70 import org.apache.hadoop.hbase.filter.FilterList.Operator;
71 import org.apache.hadoop.hbase.filter.QualifierFilter;
72 import org.apache.hadoop.hbase.filter.RowFilter;
73 import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
74 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
75 import org.apache.hadoop.hbase.filter.SkipFilter;
76 import org.apache.hadoop.hbase.filter.SubstringComparator;
77 import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
78 import org.apache.hadoop.hbase.util.Bytes;
/**
 * A common DAO implementation for HBase. Please note that all parameters for reading (i.e. Scan) assume that the
 * toString() method returns the string representation of the value to be compared in byte[] form.
 * @author imyousuf
 */
85 public class CommonDao<Template extends PersistentDTO, IdType extends Serializable> implements
86 CommonReadDao<Template, IdType>, CommonWriteDao<Template> {
88 public static final int DEFAULT_MAX_ROWS = 1000;
89 private ObjectRowConverter<Template> converter;
90 private SchemaInfoProvider infoProvider;
91 private AsyncExecutorService executorService;
92 private int maxRows = -1;
94 public AsyncExecutorService getExecutorService() {
95 return executorService;
98 public void setExecutorService(AsyncExecutorService executorService) {
99 this.executorService = executorService;
102 public int getMaxRows() {
103 return maxRows;
106 public void setMaxRows(int maxRows) {
107 this.maxRows = maxRows;
110 public ObjectRowConverter<Template> getConverter() {
111 return converter;
114 public void setConverter(ObjectRowConverter<Template> converter) {
115 this.converter = converter;
118 public SchemaInfoProvider getInfoProvider() {
119 return infoProvider;
122 public void setInfoProvider(SchemaInfoProvider infoProvider) {
123 this.infoProvider = infoProvider;
126 protected String getDefaultTableName() {
127 return getInfoProvider().getMainTableName();
130 protected int getMaxScanRows() {
131 return getMaxRows() > 0 ? getMaxRows() : DEFAULT_MAX_ROWS;
134 protected int getMaxScanRows(List<QueryParameter> params) {
135 if (params != null && !params.isEmpty()) {
136 for (QueryParameter param : params) {
137 if (ParameterType.PARAMETER_TYPE_MAX_RESULT.equals(param.getParameterType())) {
138 ValueOnlyQueryParameter<Integer> queryParameter = QueryParameterCastHelper.VALUE_PARAM_HELPER.cast(param);
139 return queryParameter.getValue();
143 return getMaxScanRows();
/*
 * READ OPERATIONS
 */

/*
 * Unsupported read operations
 */
153 @Override
154 public Set<Template> getAll() {
155 throw new UnsupportedOperationException("Not supported yet.");
158 @Override
159 public <OtherTemplate> OtherTemplate getOther(List<QueryParameter> query) {
160 throw new UnsupportedOperationException("Not supported yet.");
163 @Override
164 public <OtherTemplate> List<OtherTemplate> getOtherList(List<QueryParameter> query) {
165 throw new UnsupportedOperationException("Not supported yet.");
/*
 * Supported read operations
 */
171 @Override
172 public Set<Template> getByIds(List<IdType> ids) {
173 LinkedHashSet<Future<Template>> set = new LinkedHashSet<Future<Template>>(ids.size());
174 LinkedHashSet<Template> resultSet = new LinkedHashSet<Template>(ids.size());
175 for (IdType id : ids) {
176 set.add(executorService.executeAsynchronously(getDefaultTableName(), getByIdCallback(id)));
178 for (Future<Template> future : set) {
179 try {
180 resultSet.add(future.get());
182 catch (Exception ex) {
183 ex.printStackTrace();
186 return resultSet;
189 @Override
190 public Template getById(final IdType id) {
191 return executorService.execute(getDefaultTableName(), getByIdCallback(id));
194 protected Callback<Template> getByIdCallback(final IdType id) {
195 return new Callback<Template>() {
197 @Override
198 public Template call(HTableInterface tableInterface) throws Exception {
199 final byte[] rowId;
200 if (id instanceof Integer) {
201 rowId = Bytes.toBytes((Integer) id);
203 else if (id instanceof String) {
204 rowId = Bytes.toBytes((String) id);
206 else if (id instanceof Long) {
207 rowId = Bytes.toBytes((Long) id);
209 else if (id instanceof Double) {
210 rowId = Bytes.toBytes((Double) id);
212 else if (id != null) {
213 final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
214 final ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
215 objectOutputStream.writeObject(id);
216 IOUtils.closeQuietly(objectOutputStream);
217 IOUtils.closeQuietly(byteArrayOutputStream);
218 rowId = byteArrayOutputStream.toByteArray();
220 else {
221 rowId = new byte[0];
223 Get get = new Get(rowId);
224 Result result = tableInterface.get(get);
225 if (result == null || result.isEmpty()) {
226 return null;
228 else {
229 return getConverter().rowsToObject(result, executorService);
235 @Override
236 public Template getSingle(final List<QueryParameter> query) {
237 return executorService.execute(getDefaultTableName(), new Callback<Template>() {
239 @Override
240 public Template call(HTableInterface tableInterface) throws Exception {
241 ResultScanner scanner = tableInterface.getScanner(formScan(query));
242 try {
243 Result result = scanner.next();
244 if (result == null || result.isEmpty()) {
245 return null;
247 else {
248 return getConverter().rowsToObject(result, executorService);
251 finally {
252 if (scanner != null) {
253 scanner.close();
260 @Override
261 public List<Template> getList(final List<QueryParameter> query) {
262 return executorService.execute(getDefaultTableName(), new Callback<List<Template>>() {
264 @Override
265 public List<Template> call(HTableInterface tableInterface) throws Exception {
266 ResultScanner scanner = tableInterface.getScanner(formScan(query));
267 try {
268 Result[] results = scanner.next(getMaxScanRows(query));
269 if (results == null) {
270 return Collections.emptyList();
272 else {
273 ArrayList<Template> templates = new ArrayList<Template>(results.length);
274 for (Result result : results) {
275 if (result == null || result.isEmpty()) {
276 continue;
278 else {
279 templates.add(getConverter().rowsToObject(result, executorService));
282 return templates;
285 finally {
286 if (scanner != null) {
287 scanner.close();
294 protected Scan formScan(List<QueryParameter> query) {
295 Scan scan = new Scan();
296 final Filter filter = getFilter(query, scan);
297 if (filter != null) {
298 scan.setFilter(filter);
300 return scan;
303 protected Filter getFilter(Collection<QueryParameter> queryParams, Scan scan) {
304 return getFilter(queryParams, scan, Operator.MUST_PASS_ALL);
307 protected Filter getFilter(Collection<QueryParameter> queryParams, Scan scan, Operator operator) {
308 return getFilter("", queryParams, scan, operator);
311 protected Filter getFilter(String namePrefix, Collection<QueryParameter> queryParams, Scan scan, Operator operator) {
312 final Filter filter;
313 if (queryParams != null && !queryParams.isEmpty()) {
314 List<Filter> filters = new ArrayList<Filter>(queryParams.size());
315 for (QueryParameter param : queryParams) {
316 switch (param.getParameterType()) {
317 case PARAMETER_TYPE_CONJUNCTION: {
318 BasicCompoundQueryParameter queryParameter =
319 QueryParameterCastHelper.BASIC_COMPOUND_PARAM_HELPER.cast(param);
320 Collection<QueryParameter> nestedParameters = queryParameter.getNestedParameters();
321 filters.add(getFilter(nestedParameters, scan, Operator.MUST_PASS_ALL));
322 break;
324 case PARAMETER_TYPE_NESTED_PROPERTY: {
325 CompositionQueryParameter queryParameter = QueryParameterCastHelper.COMPOSITION_PARAM_FOR_NESTED_TYPE.cast(
326 param);
327 Collection<QueryParameter> nestedParameters = queryParameter.getNestedParameters();
328 filters.add(getFilter(getPropertyName(namePrefix, param), nestedParameters, scan, Operator.MUST_PASS_ALL));
329 break;
331 case PARAMETER_TYPE_DISJUNCTION: {
332 BasicCompoundQueryParameter queryParameter =
333 QueryParameterCastHelper.BASIC_COMPOUND_PARAM_HELPER.cast(param);
334 Collection<QueryParameter> nestedParameters = queryParameter.getNestedParameters();
335 filters.add(getFilter(nestedParameters, scan, Operator.MUST_PASS_ONE));
336 break;
338 case PARAMETER_TYPE_PROPERTY: {
339 handlePropertyParam(namePrefix, param, filters);
340 break;
342 case PARAMETER_TYPE_FIRST_RESULT: {
343 Object value = getValue(param);
344 scan.setStartRow(Bytes.toBytes(value.toString()));
345 break;
347 case PARAMETER_TYPE_UNIT_PROP: {
348 FilterConfig config = getInfoProvider().getFilterConfig(getPropertyName(namePrefix, param));
349 if (config != null) {
350 scan.addFamily(config.getColumnFamily());
352 break;
354 default:
355 //Do nothing
358 if (!filters.isEmpty()) {
359 FilterList filterList = new FilterList(operator, filters);
360 filter = filterList;
362 else {
363 filter = null;
366 else {
367 filter = null;
369 return filter;
372 protected void handlePropertyParam(String namePrefix, QueryParameter queryParameter, List<Filter> filters) {
373 OperatorType operator = getOperator(queryParameter);
374 Object parameter = getValue(queryParameter);
375 FilterConfig filterConfig = getInfoProvider().getFilterConfig(getPropertyName(namePrefix, queryParameter));
376 switch (operator) {
377 case OPERATOR_EQUAL: {
378 filters.add(getCellFilter(filterConfig, CompareOp.EQUAL, Bytes.toBytes(parameter.toString())));
379 return;
381 case OPERATOR_LESSER: {
382 filters.add(getCellFilter(filterConfig, CompareOp.LESS, Bytes.toBytes(parameter.toString())));
383 return;
385 case OPERATOR_LESSER_EQUAL: {
386 filters.add(getCellFilter(filterConfig, CompareOp.LESS_OR_EQUAL, Bytes.toBytes(parameter.toString())));
387 return;
389 case OPERATOR_GREATER: {
390 filters.add(getCellFilter(filterConfig, CompareOp.GREATER, Bytes.toBytes(parameter.toString())));
391 return;
393 case OPERATOR_GREATER_EQUAL: {
394 filters.add(getCellFilter(filterConfig, CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(parameter.toString())));
395 return;
397 case OPERATOR_NOT_EQUAL: {
398 filters.add(getCellFilter(filterConfig, CompareOp.NOT_EQUAL, Bytes.toBytes(parameter.toString())));
399 return;
401 case OPERATOR_IS_EMPTY:
402 case OPERATOR_IS_NULL: {
403 final Filter cellFilter = getCellFilter(filterConfig, CompareOp.EQUAL, Bytes.toBytes(""));
404 if (cellFilter instanceof SingleColumnValueFilter) {
405 ((SingleColumnValueFilter) cellFilter).setFilterIfMissing(false);
407 filters.add(cellFilter);
408 return;
410 case OPERATOR_IS_NOT_EMPTY:
411 case OPERATOR_IS_NOT_NULL: {
412 final Filter cellFilter = getCellFilter(filterConfig, CompareOp.NOT_EQUAL, Bytes.toBytes(""));
413 if (cellFilter instanceof SingleColumnValueFilter) {
414 ((SingleColumnValueFilter) cellFilter).setFilterIfMissing(true);
416 filters.add(cellFilter);
417 return;
419 case OPERATOR_STRING_LIKE: {
420 MatchMode matchMode = getMatchMode(queryParameter);
421 if (matchMode == null) {
422 matchMode = MatchMode.EXACT;
424 switch (matchMode) {
425 case END:
426 filters.add(getCellFilter(filterConfig, CompareOp.EQUAL, new BinarySuffixComparator(Bytes.toBytes(parameter.
427 toString()))));
428 break;
429 case EXACT:
430 filters.add(getCellFilter(filterConfig, CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(parameter.
431 toString()))));
432 break;
433 case START:
434 filters.add(getCellFilter(filterConfig, CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.toBytes(parameter.
435 toString()))));
436 break;
437 default:
438 case ANYWHERE:
439 filters.add(getCellFilter(filterConfig, CompareOp.EQUAL, new SubstringComparator(parameter.toString())));
440 break;
442 return;
444 case OPERATOR_BETWEEN: {
445 parameter = getFirstParameter(queryParameter);
446 Object parameter2 = getSecondParameter(queryParameter);
447 filters.add(getCellFilter(filterConfig, CompareOp.EQUAL,
448 new RangeComparator(Bytes.toBytes(parameter.toString()),
449 Bytes.toBytes(parameter2.toString()))));
450 return;
452 case OPERATOR_IS_IN: {
453 Collection inCollectin = QueryParameterCastHelper.MULTI_OPERAND_PARAM_HELPER.cast(queryParameter).getValues();
454 Filter filterList = getInFilter(inCollectin, filterConfig);
455 filters.add(filterList);
456 return;
458 case OPERATOR_IS_NOT_IN: {
459 Collection notInCollectin = QueryParameterCastHelper.MULTI_OPERAND_PARAM_HELPER.cast(queryParameter).getValues();
460 Filter filterList = getInFilter(notInCollectin, filterConfig);
461 filters.add(new SkipFilter(filterList));
462 return;
465 return;
468 protected Filter getCellFilter(FilterConfig filterConfig, CompareOp op,
469 WritableByteArrayComparable comparator) {
470 if (filterConfig.isFilterOnRowId()) {
471 RowFilter rowFilter = new RowFilter(op, comparator);
472 return rowFilter;
474 else if (filterConfig.isQualifierARangePrefix()) {
475 QualifierFilter filter = new QualifierFilter(op, comparator);
476 return filter;
478 else {
479 final SingleColumnValueExcludeFilter valueFilter;
480 valueFilter = new SingleColumnValueExcludeFilter(filterConfig.getColumnFamily(),
481 filterConfig.getColumnQualifier(),
482 op, comparator);
483 valueFilter.setFilterIfMissing(filterConfig.isFilterOnIfMissing());
484 valueFilter.setLatestVersionOnly(filterConfig.isFilterOnLatestVersionOnly());
485 return valueFilter;
489 protected Filter getCellFilter(FilterConfig filterConfig, CompareOp op, byte[] value) {
490 return getCellFilter(filterConfig, op, new BinaryComparator(value));
493 protected Filter getInFilter(Collection inCollectin, FilterConfig config) {
494 FilterList filterList = new FilterList(Operator.MUST_PASS_ONE);
495 for (Object inObj : inCollectin) {
496 filterList.addFilter(getCellFilter(config, CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(inObj.toString()))));
498 return filterList;
501 protected String getPropertyName(String prefix, QueryParameter param) {
502 final StringBuilder propertyName = new StringBuilder("");
503 if(StringUtils.isNotBlank(prefix)) {
504 propertyName.append(prefix).append('.');
506 if (param instanceof QueryParameterWithPropertyName) {
507 propertyName.append(((QueryParameterWithPropertyName) param).getPropertyName());
509 return propertyName.toString();
512 protected Object getSecondParameter(QueryParameter queryParamemter) {
513 if (queryParamemter instanceof BiOperandQueryParameter) {
514 return QueryParameterCastHelper.BI_OPERAND_PARAM_HELPER.cast(queryParamemter).getSecondValue();
516 else {
517 return "";
521 protected Object getFirstParameter(QueryParameter queryParamemter) {
522 if (queryParamemter instanceof BiOperandQueryParameter) {
523 return QueryParameterCastHelper.BI_OPERAND_PARAM_HELPER.cast(queryParamemter).getFirstValue();
525 else {
526 return "";
530 protected MatchMode getMatchMode(QueryParameter queryParamemter) {
531 return QueryParameterCastHelper.STRING_PARAM_HELPER.cast(queryParamemter).getMatchMode();
534 protected Object getValue(QueryParameter queryParamemter) {
535 Object value;
536 if (queryParamemter instanceof QueryParameterWithValue) {
537 value = ((QueryParameterWithValue) queryParamemter).getValue();
539 else {
540 value = null;
542 if (value == null) {
543 value = "";
545 return value;
548 protected OperatorType getOperator(QueryParameter queryParamemter) {
549 if (QueryParameterCastHelper.BI_OPERAND_PARAM_HELPER.isWithOperator(queryParamemter)) {
550 QueryParameterWithOperator parameterWithOperator =
551 QueryParameterCastHelper.BI_OPERAND_PARAM_HELPER.castToOperatorParam(queryParamemter);
552 return parameterWithOperator.getOperatorType();
554 else {
555 return null;
559 @Override
560 public Template getSingle(QueryParameter... query) {
561 return getSingle(Arrays.asList(query));
564 @Override
565 public List<Template> getList(QueryParameter... query) {
566 return getList(Arrays.asList(query));
569 @Override
570 public <OtherTemplate> OtherTemplate getOther(QueryParameter... query) {
571 return this.<OtherTemplate>getOther(Arrays.asList(query));
574 @Override
575 public <OtherTemplate> List<OtherTemplate> getOtherList(QueryParameter... query) {
576 return this.<OtherTemplate>getOtherList(Arrays.asList(query));
/*
 * WRITE OPERATIONS
 */
582 @Override
583 public void save(Template... states) {
584 LinkedHashMap<String, List<Put>> allPuts = new LinkedHashMap<String, List<Put>>();
585 for (Template state : states) {
586 if(!state.isValid()) {
587 throw new IllegalStateException("Entity not in valid state!");
589 LinkedHashMap<String, Put> puts = getConverter().objectToRows(state);
590 for (Map.Entry<String, Put> put : puts.entrySet()) {
591 final List<Put> putList;
592 if (allPuts.containsKey(put.getKey())) {
593 putList = allPuts.get(put.getKey());
595 else {
596 putList = new ArrayList<Put>();
597 allPuts.put(put.getKey(), putList);
599 putList.add(put.getValue());
602 for (final Map.Entry<String, List<Put>> puts : allPuts.entrySet()) {
603 executorService.execute(puts.getKey(), new Callback<Void>() {
605 @Override
606 public Void call(HTableInterface tableInterface) throws Exception {
607 tableInterface.put(puts.getValue());
608 return null;
614 @Override
615 public void update(Template... states) {
616 save(states);
619 @Override
620 public void delete(Template... states) {
621 LinkedHashMap<String, List<Delete>> allDels = new LinkedHashMap<String, List<Delete>>();
622 for (Template state : states) {
623 if(!state.isValid()) {
624 throw new IllegalStateException("Entity not in valid state!");
626 LinkedHashMap<String, Delete> dels = getConverter().objectToDeleteableRows(state);
627 for (Map.Entry<String, Delete> del : dels.entrySet()) {
628 final List<Delete> putList;
629 if (allDels.containsKey(del.getKey())) {
630 putList = allDels.get(del.getKey());
632 else {
633 putList = new ArrayList<Delete>();
634 allDels.put(del.getKey(), putList);
636 putList.add(del.getValue());
639 for (final Map.Entry<String, List<Delete>> dels : allDels.entrySet()) {
640 executorService.execute(dels.getKey(), new Callback<Void>() {
642 @Override
643 public Void call(HTableInterface tableInterface) throws Exception {
644 tableInterface.delete(dels.getValue());
645 return null;