Free executor service implementation of schema info provider
[smart-dao.git] / smart-hbase-dao / src / main / java / com / smartitengineering / dao / impl / hbase / CommonDao.java
blob8dd5820b07667df9cb603334808ddbd26dbd282e
1 /*
2 * This is a common dao with basic CRUD operations and is not limited to any
3 * persistent layer implementation
5 * Copyright (C) 2010 Imran M Yousuf (imyousuf@smartitengineering.com)
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 3 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 package com.smartitengineering.dao.impl.hbase;
21 import com.smartitengineering.dao.common.CommonReadDao;
22 import com.smartitengineering.dao.common.CommonWriteDao;
23 import com.smartitengineering.dao.common.queryparam.BasicCompoundQueryParameter;
24 import com.smartitengineering.dao.common.queryparam.BiOperandQueryParameter;
25 import com.smartitengineering.dao.common.queryparam.MatchMode;
26 import com.smartitengineering.dao.common.queryparam.OperatorType;
27 import com.smartitengineering.dao.common.queryparam.ParameterType;
28 import com.smartitengineering.dao.common.queryparam.QueryParameter;
29 import com.smartitengineering.dao.common.queryparam.QueryParameterCastHelper;
30 import com.smartitengineering.dao.common.queryparam.QueryParameterWithOperator;
31 import com.smartitengineering.dao.common.queryparam.QueryParameterWithPropertyName;
32 import com.smartitengineering.dao.common.queryparam.QueryParameterWithValue;
33 import com.smartitengineering.dao.common.queryparam.ValueOnlyQueryParameter;
34 import com.smartitengineering.dao.impl.hbase.spi.AsyncExecutorService;
35 import com.smartitengineering.dao.impl.hbase.spi.Callback;
36 import com.smartitengineering.dao.impl.hbase.spi.FilterConfig;
37 import com.smartitengineering.dao.impl.hbase.spi.ObjectRowConverter;
38 import com.smartitengineering.dao.impl.hbase.spi.SchemaInfoProvider;
39 import com.smartitengineering.dao.impl.hbase.spi.impl.BinarySuffixComparator;
40 import com.smartitengineering.dao.impl.hbase.spi.impl.RangeComparator;
41 import com.smartitengineering.domain.PersistentDTO;
42 import java.io.ByteArrayOutputStream;
43 import java.io.ObjectOutputStream;
44 import java.io.Serializable;
45 import java.util.ArrayList;
46 import java.util.Arrays;
47 import java.util.Collection;
48 import java.util.Collections;
49 import java.util.LinkedHashMap;
50 import java.util.LinkedHashSet;
51 import java.util.List;
52 import java.util.Map;
53 import java.util.Set;
54 import java.util.concurrent.Future;
55 import org.apache.hadoop.hbase.client.Delete;
56 import org.apache.hadoop.hbase.client.Get;
57 import org.apache.hadoop.hbase.client.HTableInterface;
58 import org.apache.hadoop.hbase.client.Put;
59 import org.apache.hadoop.hbase.client.Result;
60 import org.apache.hadoop.hbase.client.ResultScanner;
61 import org.apache.hadoop.hbase.client.Scan;
62 import org.apache.hadoop.hbase.filter.BinaryComparator;
63 import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
64 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
65 import org.apache.hadoop.hbase.filter.Filter;
66 import org.apache.hadoop.hbase.filter.FilterList;
67 import org.apache.hadoop.hbase.filter.FilterList.Operator;
68 import org.apache.hadoop.hbase.filter.QualifierFilter;
69 import org.apache.hadoop.hbase.filter.RowFilter;
70 import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
71 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
72 import org.apache.hadoop.hbase.filter.SkipFilter;
73 import org.apache.hadoop.hbase.filter.SubstringComparator;
74 import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
75 import org.apache.hadoop.hbase.util.Bytes;
/**
 * A common DAO implementation for HBase. Please note that all parameters for reading (i.e. Scan) assume that the
 * toString() method returns the string representation of the value to be compared in byte[] form.
 * @author imyousuf
 */
82 public class CommonDao<Template extends PersistentDTO, IdType extends Serializable> implements
83 CommonReadDao<Template, IdType>, CommonWriteDao<Template> {
85 public static final int DEFAULT_MAX_ROWS = 1000;
86 private ObjectRowConverter<Template> converter;
87 private SchemaInfoProvider infoProvider;
88 private AsyncExecutorService executorService;
89 private int maxRows = -1;
91 public AsyncExecutorService getExecutorService() {
92 return executorService;
95 public void setExecutorService(AsyncExecutorService executorService) {
96 this.executorService = executorService;
99 public int getMaxRows() {
100 return maxRows;
103 public void setMaxRows(int maxRows) {
104 this.maxRows = maxRows;
107 public ObjectRowConverter<Template> getConverter() {
108 return converter;
111 public void setConverter(ObjectRowConverter<Template> converter) {
112 this.converter = converter;
115 public SchemaInfoProvider getInfoProvider() {
116 return infoProvider;
119 public void setInfoProvider(SchemaInfoProvider infoProvider) {
120 this.infoProvider = infoProvider;
123 protected String getDefaultTableName() {
124 return getInfoProvider().getMainTableName();
127 protected int getMaxScanRows() {
128 return getMaxRows() > 0 ? getMaxRows() : DEFAULT_MAX_ROWS;
131 protected int getMaxScanRows(List<QueryParameter> params) {
132 if (params != null && !params.isEmpty()) {
133 for (QueryParameter param : params) {
134 if (ParameterType.PARAMETER_TYPE_MAX_RESULT.equals(param.getParameterType())) {
135 ValueOnlyQueryParameter<Integer> queryParameter = QueryParameterCastHelper.VALUE_PARAM_HELPER.cast(param);
136 return queryParameter.getValue();
140 return getMaxScanRows();
/*
 * READ OPERATIONS
 */

/*
 * Unsupported read operations
 */
150 @Override
151 public Set<Template> getAll() {
152 throw new UnsupportedOperationException("Not supported yet.");
155 @Override
156 public <OtherTemplate> OtherTemplate getOther(List<QueryParameter> query) {
157 throw new UnsupportedOperationException("Not supported yet.");
160 @Override
161 public <OtherTemplate> List<OtherTemplate> getOtherList(List<QueryParameter> query) {
162 throw new UnsupportedOperationException("Not supported yet.");
/*
 * Supported read operations
 */
168 @Override
169 public Set<Template> getByIds(List<IdType> ids) {
170 LinkedHashSet<Future<Template>> set = new LinkedHashSet<Future<Template>>(ids.size());
171 LinkedHashSet<Template> resultSet = new LinkedHashSet<Template>(ids.size());
172 for (IdType id : ids) {
173 set.add(executorService.executeAsynchronously(getDefaultTableName(), getByIdCallback(id)));
175 for (Future<Template> future : set) {
176 try {
177 resultSet.add(future.get());
179 catch (Exception ex) {
180 ex.printStackTrace();
183 return resultSet;
186 @Override
187 public Template getById(final IdType id) {
188 return executorService.execute(getDefaultTableName(), getByIdCallback(id));
191 protected Callback<Template> getByIdCallback(final IdType id) {
192 return new Callback<Template>() {
194 @Override
195 public Template call(HTableInterface tableInterface) throws Exception {
196 final byte[] rowId;
197 if (id instanceof Integer) {
198 rowId = Bytes.toBytes((Integer) id);
200 else if (id instanceof String) {
201 rowId = Bytes.toBytes((String) id);
203 else if (id instanceof Long) {
204 rowId = Bytes.toBytes((Long) id);
206 else if (id instanceof Double) {
207 rowId = Bytes.toBytes((Double) id);
209 else if (id != null) {
210 final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
211 new ObjectOutputStream(byteArrayOutputStream).writeObject(id);
212 rowId = byteArrayOutputStream.toByteArray();
214 else {
215 rowId = new byte[0];
217 Get get = new Get(rowId);
218 Result result = tableInterface.get(get);
219 return getConverter().rowsToObject(result, executorService);
224 @Override
225 public Template getSingle(final List<QueryParameter> query) {
226 return executorService.execute(getDefaultTableName(), new Callback<Template>() {
228 @Override
229 public Template call(HTableInterface tableInterface) throws Exception {
230 ResultScanner scanner = tableInterface.getScanner(formScan(query));
231 try {
232 Result result = scanner.next();
233 if (result == null) {
234 return null;
236 else {
237 return getConverter().rowsToObject(result, executorService);
240 finally {
241 if (scanner != null) {
242 scanner.close();
249 @Override
250 public List<Template> getList(final List<QueryParameter> query) {
251 return executorService.execute(getDefaultTableName(), new Callback<List<Template>>() {
253 @Override
254 public List<Template> call(HTableInterface tableInterface) throws Exception {
255 ResultScanner scanner = tableInterface.getScanner(formScan(query));
256 try {
257 Result[] results = scanner.next(getMaxScanRows(query));
258 if (results == null) {
259 return Collections.emptyList();
261 else {
262 ArrayList<Template> templates = new ArrayList<Template>(results.length);
263 for (Result result : results) {
264 templates.add(getConverter().rowsToObject(result, executorService));
266 return templates;
269 finally {
270 if (scanner != null) {
271 scanner.close();
278 protected Scan formScan(List<QueryParameter> query) {
279 Scan scan = new Scan();
280 final Filter filter = getFilter(query, scan);
281 if (filter != null) {
282 scan.setFilter(filter);
284 return scan;
287 protected Filter getFilter(Collection<QueryParameter> queryParams, Scan scan) {
288 return getFilter(queryParams, scan, Operator.MUST_PASS_ALL);
291 protected Filter getFilter(Collection<QueryParameter> queryParams, Scan scan, Operator operator) {
292 final Filter filter;
293 if (queryParams != null && !queryParams.isEmpty()) {
294 List<Filter> filters = new ArrayList<Filter>(queryParams.size());
295 for (QueryParameter param : queryParams) {
296 switch (param.getParameterType()) {
297 case PARAMETER_TYPE_CONJUNCTION: {
298 BasicCompoundQueryParameter queryParameter =
299 QueryParameterCastHelper.BASIC_COMPOUND_PARAM_HELPER.cast(param);
300 Collection<QueryParameter> nestedParameters = queryParameter.getNestedParameters();
301 filters.add(getFilter(nestedParameters, scan, Operator.MUST_PASS_ALL));
302 break;
304 case PARAMETER_TYPE_DISJUNCTION: {
305 BasicCompoundQueryParameter queryParameter =
306 QueryParameterCastHelper.BASIC_COMPOUND_PARAM_HELPER.cast(param);
307 Collection<QueryParameter> nestedParameters = queryParameter.getNestedParameters();
308 filters.add(getFilter(nestedParameters, scan, Operator.MUST_PASS_ONE));
309 break;
311 case PARAMETER_TYPE_PROPERTY: {
312 handlePropertyParam(param, filters);
313 break;
315 case PARAMETER_TYPE_FIRST_RESULT: {
316 Object value = getValue(param);
317 scan.setStartRow(Bytes.toBytes(value.toString()));
318 break;
320 case PARAMETER_TYPE_UNIT_PROP: {
321 FilterConfig config = getInfoProvider().getFilterConfig(getPropertyName(param));
322 if (config != null) {
323 scan.addFamily(config.getColumnFamily());
325 break;
327 default:
328 //Do nothing
331 if (!filters.isEmpty()) {
332 FilterList filterList = new FilterList(operator, filters);
333 filter = filterList;
335 else {
336 filter = null;
339 else {
340 filter = null;
342 return filter;
345 protected void handlePropertyParam(QueryParameter queryParameter,
346 List<Filter> filters) {
347 OperatorType operator = getOperator(queryParameter);
348 Object parameter = getValue(queryParameter);
349 FilterConfig filterConfig = getInfoProvider().getFilterConfig(getPropertyName(queryParameter));
350 switch (operator) {
351 case OPERATOR_EQUAL: {
352 filters.add(getCellFilter(filterConfig, CompareOp.EQUAL, Bytes.toBytes(parameter.toString())));
353 return;
355 case OPERATOR_LESSER: {
356 filters.add(getCellFilter(filterConfig, CompareOp.LESS, Bytes.toBytes(parameter.toString())));
357 return;
359 case OPERATOR_LESSER_EQUAL: {
360 filters.add(getCellFilter(filterConfig, CompareOp.LESS_OR_EQUAL, Bytes.toBytes(parameter.toString())));
361 return;
363 case OPERATOR_GREATER: {
364 filters.add(getCellFilter(filterConfig, CompareOp.GREATER, Bytes.toBytes(parameter.toString())));
365 return;
367 case OPERATOR_GREATER_EQUAL: {
368 filters.add(getCellFilter(filterConfig, CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(parameter.toString())));
369 return;
371 case OPERATOR_NOT_EQUAL: {
372 filters.add(getCellFilter(filterConfig, CompareOp.NOT_EQUAL, Bytes.toBytes(parameter.toString())));
373 return;
375 case OPERATOR_IS_EMPTY:
376 case OPERATOR_IS_NULL: {
377 final Filter cellFilter = getCellFilter(filterConfig, CompareOp.EQUAL, Bytes.toBytes(""));
378 if (cellFilter instanceof SingleColumnValueFilter) {
379 ((SingleColumnValueFilter) cellFilter).setFilterIfMissing(false);
381 filters.add(cellFilter);
382 return;
384 case OPERATOR_IS_NOT_EMPTY:
385 case OPERATOR_IS_NOT_NULL: {
386 final Filter cellFilter = getCellFilter(filterConfig, CompareOp.NOT_EQUAL, Bytes.toBytes(""));
387 if (cellFilter instanceof SingleColumnValueFilter) {
388 ((SingleColumnValueFilter) cellFilter).setFilterIfMissing(true);
390 filters.add(cellFilter);
391 return;
393 case OPERATOR_STRING_LIKE: {
394 MatchMode matchMode = getMatchMode(queryParameter);
395 if (matchMode == null) {
396 matchMode = MatchMode.EXACT;
398 switch (matchMode) {
399 case END:
400 filters.add(getCellFilter(filterConfig, CompareOp.EQUAL, new BinarySuffixComparator(Bytes.toBytes(parameter.
401 toString()))));
402 break;
403 case EXACT:
404 filters.add(getCellFilter(filterConfig, CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(parameter.
405 toString()))));
406 break;
407 case START:
408 filters.add(getCellFilter(filterConfig, CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.toBytes(parameter.
409 toString()))));
410 break;
411 default:
412 case ANYWHERE:
413 filters.add(getCellFilter(filterConfig, CompareOp.EQUAL, new SubstringComparator(parameter.toString())));
414 break;
416 return;
418 case OPERATOR_BETWEEN: {
419 parameter = getFirstParameter(queryParameter);
420 Object parameter2 = getSecondParameter(queryParameter);
421 filters.add(getCellFilter(filterConfig, CompareOp.EQUAL,
422 new RangeComparator(Bytes.toBytes(parameter.toString()),
423 Bytes.toBytes(parameter2.toString()))));
424 return;
426 case OPERATOR_IS_IN: {
427 Collection inCollectin = QueryParameterCastHelper.MULTI_OPERAND_PARAM_HELPER.cast(queryParameter).getValues();
428 Filter filterList = getInFilter(inCollectin, filterConfig);
429 filters.add(filterList);
430 return;
432 case OPERATOR_IS_NOT_IN: {
433 Collection notInCollectin = QueryParameterCastHelper.MULTI_OPERAND_PARAM_HELPER.cast(queryParameter).getValues();
434 Filter filterList = getInFilter(notInCollectin, filterConfig);
435 filters.add(new SkipFilter(filterList));
436 return;
439 return;
442 protected Filter getCellFilter(FilterConfig filterConfig, CompareOp op,
443 WritableByteArrayComparable comparator) {
444 if (filterConfig.isFilterOnRowId()) {
445 RowFilter rowFilter = new RowFilter(op, comparator);
446 return rowFilter;
448 else if (filterConfig.isQualifierARangePrefix()) {
449 QualifierFilter filter = new QualifierFilter(op, comparator);
450 return filter;
452 else {
453 final SingleColumnValueExcludeFilter valueFilter;
454 valueFilter = new SingleColumnValueExcludeFilter(filterConfig.getColumnFamily(),
455 filterConfig.getColumnQualifier(),
456 op, comparator);
457 valueFilter.setFilterIfMissing(filterConfig.isFilterOnIfMissing());
458 valueFilter.setLatestVersionOnly(filterConfig.isFilterOnLatestVersionOnly());
459 return valueFilter;
463 protected Filter getCellFilter(FilterConfig filterConfig, CompareOp op, byte[] value) {
464 return getCellFilter(filterConfig, op, new BinaryComparator(value));
467 protected Filter getInFilter(Collection inCollectin, FilterConfig config) {
468 FilterList filterList = new FilterList(Operator.MUST_PASS_ONE);
469 for (Object inObj : inCollectin) {
470 filterList.addFilter(getCellFilter(config, CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(inObj.toString()))));
472 return filterList;
475 protected String getPropertyName(QueryParameter param) {
476 final String propertyName;
477 if (param instanceof QueryParameterWithPropertyName) {
478 propertyName = ((QueryParameterWithPropertyName) param).getPropertyName();
480 else {
481 propertyName = "";
483 return propertyName;
486 protected Object getSecondParameter(QueryParameter queryParamemter) {
487 if (queryParamemter instanceof BiOperandQueryParameter) {
488 return QueryParameterCastHelper.BI_OPERAND_PARAM_HELPER.cast(queryParamemter).getSecondValue();
490 else {
491 return "";
495 protected Object getFirstParameter(QueryParameter queryParamemter) {
496 if (queryParamemter instanceof BiOperandQueryParameter) {
497 return QueryParameterCastHelper.BI_OPERAND_PARAM_HELPER.cast(queryParamemter).getFirstValue();
499 else {
500 return "";
504 protected MatchMode getMatchMode(QueryParameter queryParamemter) {
505 return QueryParameterCastHelper.STRING_PARAM_HELPER.cast(queryParamemter).getMatchMode();
508 protected Object getValue(QueryParameter queryParamemter) {
509 Object value;
510 if (queryParamemter instanceof QueryParameterWithValue) {
511 value = ((QueryParameterWithValue) queryParamemter).getValue();
513 else {
514 value = null;
516 if (value == null) {
517 value = "";
519 return value;
522 protected OperatorType getOperator(QueryParameter queryParamemter) {
523 if (QueryParameterCastHelper.BI_OPERAND_PARAM_HELPER.isWithOperator(queryParamemter)) {
524 QueryParameterWithOperator parameterWithOperator =
525 QueryParameterCastHelper.BI_OPERAND_PARAM_HELPER.castToOperatorParam(queryParamemter);
526 return parameterWithOperator.getOperatorType();
528 else {
529 return null;
533 @Override
534 public Template getSingle(QueryParameter... query) {
535 return getSingle(Arrays.asList(query));
538 @Override
539 public List<Template> getList(QueryParameter... query) {
540 return getList(Arrays.asList(query));
543 @Override
544 public <OtherTemplate> OtherTemplate getOther(QueryParameter... query) {
545 return this.<OtherTemplate>getOther(Arrays.asList(query));
548 @Override
549 public <OtherTemplate> List<OtherTemplate> getOtherList(QueryParameter... query) {
550 return this.<OtherTemplate>getOtherList(Arrays.asList(query));
/*
 * WRITE OPERATIONS
 */
556 @Override
557 public void save(Template... states) {
558 LinkedHashMap<String, List<Put>> allPuts = new LinkedHashMap<String, List<Put>>();
559 for (Template state : states) {
560 LinkedHashMap<String, Put> puts = getConverter().objectToRows(state);
561 for (Map.Entry<String, Put> put : puts.entrySet()) {
562 final List<Put> putList;
563 if (allPuts.containsKey(put.getKey())) {
564 putList = allPuts.get(put.getKey());
566 else {
567 putList = new ArrayList<Put>();
568 allPuts.put(put.getKey(), putList);
570 putList.add(put.getValue());
573 for (final Map.Entry<String, List<Put>> puts : allPuts.entrySet()) {
574 executorService.execute(puts.getKey(), new Callback<Void>() {
576 @Override
577 public Void call(HTableInterface tableInterface) throws Exception {
578 tableInterface.put(puts.getValue());
579 return null;
585 @Override
586 public void update(Template... states) {
587 save(states);
590 @Override
591 public void delete(Template... states) {
592 LinkedHashMap<String, List<Delete>> allDels = new LinkedHashMap<String, List<Delete>>();
593 for (Template state : states) {
594 LinkedHashMap<String, Delete> dels = getConverter().objectToDeleteableRows(state);
595 for (Map.Entry<String, Delete> del : dels.entrySet()) {
596 final List<Delete> putList;
597 if (allDels.containsKey(del.getKey())) {
598 putList = allDels.get(del.getKey());
600 else {
601 putList = new ArrayList<Delete>();
602 allDels.put(del.getKey(), putList);
604 putList.add(del.getValue());
607 for (final Map.Entry<String, List<Delete>> dels : allDels.entrySet()) {
608 executorService.execute(dels.getKey(), new Callback<Void>() {
610 @Override
611 public Void call(HTableInterface tableInterface) throws Exception {
612 tableInterface.delete(dels.getValue());
613 return null;