Use AsyncExecutorService instead of ExecutorService
[smart-dao.git] / smart-hbase-dao / src / main / java / com / smartitengineering / dao / impl / hbase / CommonDao.java
blob5c5517f9d223980ab10f4bd8e263f3a20574b87e
1 /*
2 * This is a common dao with basic CRUD operations and is not limited to any
3 * persistent layer implementation
5 * Copyright (C) 2010 Imran M Yousuf (imyousuf@smartitengineering.com)
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 3 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 package com.smartitengineering.dao.impl.hbase;
21 import com.smartitengineering.dao.common.CommonReadDao;
22 import com.smartitengineering.dao.common.CommonWriteDao;
23 import com.smartitengineering.dao.common.queryparam.BasicCompoundQueryParameter;
24 import com.smartitengineering.dao.common.queryparam.BiOperandQueryParameter;
25 import com.smartitengineering.dao.common.queryparam.MatchMode;
26 import com.smartitengineering.dao.common.queryparam.OperatorType;
27 import com.smartitengineering.dao.common.queryparam.ParameterType;
28 import com.smartitengineering.dao.common.queryparam.QueryParameter;
29 import com.smartitengineering.dao.common.queryparam.QueryParameterCastHelper;
30 import com.smartitengineering.dao.common.queryparam.QueryParameterWithOperator;
31 import com.smartitengineering.dao.common.queryparam.QueryParameterWithPropertyName;
32 import com.smartitengineering.dao.common.queryparam.QueryParameterWithValue;
33 import com.smartitengineering.dao.common.queryparam.ValueOnlyQueryParameter;
34 import com.smartitengineering.dao.impl.hbase.spi.AsyncExecutorService;
35 import com.smartitengineering.dao.impl.hbase.spi.Callback;
36 import com.smartitengineering.dao.impl.hbase.spi.FilterConfig;
37 import com.smartitengineering.dao.impl.hbase.spi.ObjectRowConverter;
38 import com.smartitengineering.dao.impl.hbase.spi.SchemaInfoProvider;
39 import com.smartitengineering.dao.impl.hbase.spi.impl.BinarySuffixComparator;
40 import com.smartitengineering.dao.impl.hbase.spi.impl.RangeComparator;
41 import com.smartitengineering.domain.PersistentDTO;
42 import java.util.ArrayList;
43 import java.util.Arrays;
44 import java.util.Collection;
45 import java.util.Collections;
46 import java.util.LinkedHashMap;
47 import java.util.LinkedHashSet;
48 import java.util.List;
49 import java.util.Map;
50 import java.util.Set;
51 import java.util.concurrent.Future;
52 import org.apache.hadoop.hbase.client.Delete;
53 import org.apache.hadoop.hbase.client.Get;
54 import org.apache.hadoop.hbase.client.HTableInterface;
55 import org.apache.hadoop.hbase.client.Put;
56 import org.apache.hadoop.hbase.client.Result;
57 import org.apache.hadoop.hbase.client.ResultScanner;
58 import org.apache.hadoop.hbase.client.Scan;
59 import org.apache.hadoop.hbase.filter.BinaryComparator;
60 import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
61 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
62 import org.apache.hadoop.hbase.filter.Filter;
63 import org.apache.hadoop.hbase.filter.FilterList;
64 import org.apache.hadoop.hbase.filter.FilterList.Operator;
65 import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
66 import org.apache.hadoop.hbase.filter.SkipFilter;
67 import org.apache.hadoop.hbase.filter.SubstringComparator;
68 import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
69 import org.apache.hadoop.hbase.util.Bytes;
71 /**
72 * A common DAO implementation for HBase. Please note that all parameter values used for reading (i.e. Scan) are
73 * assumed to have a toString() method returning the string representation of the value, which is compared in byte[] form.
74 * @author imyousuf
76 public class CommonDao<Template extends PersistentDTO> implements CommonReadDao<Template>,
77 CommonWriteDao<Template> {
79 public static final int DEFAULT_MAX_ROWS = 1000;
80 private ObjectRowConverter<Template> converter;
81 private SchemaInfoProvider infoProvider;
82 private AsyncExecutorService executorService;
83 private int maxRows = -1;
85 public AsyncExecutorService getExecutorService() {
86 return executorService;
89 public void setExecutorService(AsyncExecutorService executorService) {
90 this.executorService = executorService;
93 public int getMaxRows() {
94 return maxRows;
97 public void setMaxRows(int maxRows) {
98 this.maxRows = maxRows;
101 public ObjectRowConverter<Template> getConverter() {
102 return converter;
105 public void setConverter(ObjectRowConverter<Template> converter) {
106 this.converter = converter;
109 public SchemaInfoProvider getInfoProvider() {
110 return infoProvider;
113 public void setInfoProvider(SchemaInfoProvider infoProvider) {
114 this.infoProvider = infoProvider;
117 protected int getMaxScanRows() {
118 return getMaxRows() > 0 ? getMaxRows() : DEFAULT_MAX_ROWS;
121 protected int getMaxScanRows(List<QueryParameter> params) {
122 if (params != null && !params.isEmpty()) {
123 for (QueryParameter param : params) {
124 if (ParameterType.PARAMETER_TYPE_MAX_RESULT.equals(param.getParameterType())) {
125 ValueOnlyQueryParameter<Integer> queryParameter = QueryParameterCastHelper.VALUE_PARAM_HELPER.cast(param);
126 return queryParameter.getValue();
130 return getMaxScanRows();
134 * READ OPERATIONS
138 * Unsupported read operations
140 @Override
141 public Set<Template> getAll() {
142 throw new UnsupportedOperationException("Not supported yet.");
145 @Override
146 public <OtherTemplate> OtherTemplate getOther(List<QueryParameter> query) {
147 throw new UnsupportedOperationException("Not supported yet.");
150 @Override
151 public <OtherTemplate> List<OtherTemplate> getOtherList(List<QueryParameter> query) {
152 throw new UnsupportedOperationException("Not supported yet.");
156 * Supported read operations
158 @Override
159 public Set<Template> getByIds(List<Integer> ids) {
160 LinkedHashSet<Future<Template>> set = new LinkedHashSet<Future<Template>>(ids.size());
161 LinkedHashSet<Template> resultSet = new LinkedHashSet<Template>(ids.size());
162 for (Integer id : ids) {
163 set.add(executorService.executeAsynchronously(null, getByIdCallback(id)));
165 for (Future<Template> future : set) {
166 try {
167 resultSet.add(future.get());
169 catch (Exception ex) {
170 ex.printStackTrace();
173 return resultSet;
176 @Override
177 public Template getById(final Integer id) {
178 return executorService.execute("", getByIdCallback(id));
181 protected Callback<Template> getByIdCallback(final Integer id) {
182 return new Callback<Template>() {
184 @Override
185 public Template call(HTableInterface tableInterface) throws Exception {
186 Get get = new Get(Bytes.toBytes(id));
187 Result result = tableInterface.get(get);
188 return getConverter().rowsToObject(result, executorService);
193 @Override
194 public Template getSingle(final List<QueryParameter> query) {
195 return executorService.execute("", new Callback<Template>() {
197 @Override
198 public Template call(HTableInterface tableInterface) throws Exception {
199 ResultScanner scanner = tableInterface.getScanner(formScan(query));
200 try {
201 Result result = scanner.next();
202 if (result == null) {
203 return null;
205 else {
206 return getConverter().rowsToObject(result, executorService);
209 finally {
210 if (scanner != null) {
211 scanner.close();
218 @Override
219 public List<Template> getList(final List<QueryParameter> query) {
220 return executorService.execute(null, new Callback<List<Template>>() {
222 @Override
223 public List<Template> call(HTableInterface tableInterface) throws Exception {
224 ResultScanner scanner = tableInterface.getScanner(formScan(query));
225 try {
226 Result[] results = scanner.next(getMaxScanRows(query));
227 if (results == null) {
228 return Collections.emptyList();
230 else {
231 ArrayList<Template> templates = new ArrayList<Template>(results.length);
232 for (Result result : results) {
233 templates.add(getConverter().rowsToObject(result, executorService));
235 return templates;
238 finally {
239 if (scanner != null) {
240 scanner.close();
247 protected Scan formScan(List<QueryParameter> query) {
248 Scan scan = new Scan();
249 final Filter filter = getFilter(query);
250 if (filter != null) {
251 scan.setFilter(filter);
253 return scan;
256 protected Filter getFilter(Collection<QueryParameter> queryParams) {
257 return getFilter(queryParams, Operator.MUST_PASS_ALL);
260 protected Filter getFilter(Collection<QueryParameter> queryParams, Operator operator) {
261 final Filter filter;
262 if (queryParams != null && !queryParams.isEmpty()) {
263 List<Filter> filters = new ArrayList<Filter>(queryParams.size());
264 for (QueryParameter param : queryParams) {
265 switch (param.getParameterType()) {
266 case PARAMETER_TYPE_CONJUNCTION: {
267 BasicCompoundQueryParameter queryParameter =
268 QueryParameterCastHelper.BASIC_COMPOUND_PARAM_HELPER.cast(param);
269 Collection<QueryParameter> nestedParameters = queryParameter.getNestedParameters();
270 filters.add(getFilter(nestedParameters, Operator.MUST_PASS_ALL));
271 break;
273 case PARAMETER_TYPE_DISJUNCTION: {
274 BasicCompoundQueryParameter queryParameter =
275 QueryParameterCastHelper.BASIC_COMPOUND_PARAM_HELPER.cast(param);
276 Collection<QueryParameter> nestedParameters = queryParameter.getNestedParameters();
277 filters.add(getFilter(nestedParameters, Operator.MUST_PASS_ONE));
278 break;
280 case PARAMETER_TYPE_PROPERTY: {
281 handlePropertyParam(param, filters);
282 break;
284 default:
285 //Do nothing
288 if (!filters.isEmpty()) {
289 FilterList filterList = new FilterList(operator, filters);
290 filter = filterList;
292 else {
293 filter = null;
296 else {
297 filter = null;
299 return filter;
302 protected void handlePropertyParam(QueryParameter queryParameter,
303 List<Filter> filters) {
304 OperatorType operator = getOperator(queryParameter);
305 Object parameter = getValue(queryParameter);
306 FilterConfig filterConfig = getInfoProvider().getFilterConfig(getPropertyName(queryParameter));
307 switch (operator) {
308 case OPERATOR_EQUAL: {
309 filters.add(getCellFilter(filterConfig, CompareOp.EQUAL, Bytes.toBytes(parameter.toString())));
310 return;
312 case OPERATOR_LESSER: {
313 filters.add(getCellFilter(filterConfig, CompareOp.LESS, Bytes.toBytes(parameter.toString())));
314 return;
316 case OPERATOR_LESSER_EQUAL: {
317 filters.add(getCellFilter(filterConfig, CompareOp.LESS_OR_EQUAL, Bytes.toBytes(parameter.toString())));
318 return;
320 case OPERATOR_GREATER: {
321 filters.add(getCellFilter(filterConfig, CompareOp.GREATER, Bytes.toBytes(parameter.toString())));
322 return;
324 case OPERATOR_GREATER_EQUAL: {
325 filters.add(getCellFilter(filterConfig, CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(parameter.toString())));
326 return;
328 case OPERATOR_NOT_EQUAL: {
329 filters.add(getCellFilter(filterConfig, CompareOp.NOT_EQUAL, Bytes.toBytes(parameter.toString())));
330 return;
332 case OPERATOR_IS_EMPTY:
333 case OPERATOR_IS_NULL: {
334 final SingleColumnValueExcludeFilter cellFilter =
335 getCellFilter(filterConfig, CompareOp.EQUAL, Bytes.toBytes(""));
336 cellFilter.setFilterIfMissing(false);
337 filters.add(cellFilter);
338 return;
340 case OPERATOR_IS_NOT_EMPTY:
341 case OPERATOR_IS_NOT_NULL: {
342 final SingleColumnValueExcludeFilter cellFilter = getCellFilter(filterConfig, CompareOp.NOT_EQUAL, Bytes.toBytes(
343 ""));
344 cellFilter.setFilterIfMissing(true);
345 filters.add(cellFilter);
346 return;
348 case OPERATOR_STRING_LIKE: {
349 MatchMode matchMode = getMatchMode(queryParameter);
350 if (matchMode == null) {
351 matchMode = MatchMode.EXACT;
353 switch (matchMode) {
354 case END:
355 filters.add(getCellFilter(filterConfig, CompareOp.EQUAL, new BinarySuffixComparator(Bytes.toBytes(parameter.
356 toString()))));
357 break;
358 case EXACT:
359 filters.add(getCellFilter(filterConfig, CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(parameter.
360 toString()))));
361 break;
362 case START:
363 filters.add(getCellFilter(filterConfig, CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.toBytes(parameter.
364 toString()))));
365 break;
366 default:
367 case ANYWHERE:
368 filters.add(getCellFilter(filterConfig, CompareOp.EQUAL, new SubstringComparator(parameter.toString())));
369 break;
371 return;
373 case OPERATOR_BETWEEN: {
374 parameter = getFirstParameter(queryParameter);
375 Object parameter2 = getSecondParameter(queryParameter);
376 filters.add(getCellFilter(filterConfig, CompareOp.EQUAL,
377 new RangeComparator(Bytes.toBytes(parameter.toString()),
378 Bytes.toBytes(parameter2.toString()))));
379 return;
381 case OPERATOR_IS_IN: {
382 Collection inCollectin = QueryParameterCastHelper.MULTI_OPERAND_PARAM_HELPER.cast(queryParameter).getValues();
383 FilterList filterList = getInFilter(inCollectin, filterConfig);
384 filters.add(filterList);
385 return;
387 case OPERATOR_IS_NOT_IN: {
388 Collection notInCollectin = QueryParameterCastHelper.MULTI_OPERAND_PARAM_HELPER.cast(queryParameter).getValues();
389 FilterList filterList = getInFilter(notInCollectin, filterConfig);
390 filters.add(new SkipFilter(filterList));
391 return;
394 return;
397 protected SingleColumnValueExcludeFilter getCellFilter(FilterConfig filterConfig, CompareOp op,
398 WritableByteArrayComparable comparator) {
399 final SingleColumnValueExcludeFilter valueFilter;
400 valueFilter = new SingleColumnValueExcludeFilter(filterConfig.getColumnFamily(),
401 filterConfig.getColumnQualifier(),
402 op, comparator);
403 valueFilter.setFilterIfMissing(filterConfig.isFilterOnIfMissing());
404 valueFilter.setLatestVersionOnly(filterConfig.isFilterOnLatestVersionOnly());
405 return valueFilter;
408 protected SingleColumnValueExcludeFilter getCellFilter(FilterConfig filterConfig, CompareOp op, byte[] value) {
409 return getCellFilter(filterConfig, op, new BinaryComparator(value));
412 protected FilterList getInFilter(Collection inCollectin, FilterConfig config) {
413 FilterList filterList = new FilterList(Operator.MUST_PASS_ONE);
414 for (Object inObj : inCollectin) {
415 filterList.addFilter(getCellFilter(config, CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(inObj.toString()))));
417 return filterList;
420 protected String getPropertyName(QueryParameter param) {
421 final String propertyName;
422 if (param instanceof QueryParameterWithPropertyName) {
423 propertyName = ((QueryParameterWithPropertyName) param).getPropertyName();
425 else {
426 propertyName = "";
428 return propertyName;
431 protected Object getSecondParameter(QueryParameter queryParamemter) {
432 if (queryParamemter instanceof BiOperandQueryParameter) {
433 return QueryParameterCastHelper.BI_OPERAND_PARAM_HELPER.cast(queryParamemter).getSecondValue();
435 else {
436 return "";
440 protected Object getFirstParameter(QueryParameter queryParamemter) {
441 if (queryParamemter instanceof BiOperandQueryParameter) {
442 return QueryParameterCastHelper.BI_OPERAND_PARAM_HELPER.cast(queryParamemter).getFirstValue();
444 else {
445 return "";
449 protected MatchMode getMatchMode(QueryParameter queryParamemter) {
450 return QueryParameterCastHelper.STRING_PARAM_HELPER.cast(queryParamemter).getMatchMode();
453 protected Object getValue(QueryParameter queryParamemter) {
454 Object value;
455 if (queryParamemter instanceof QueryParameterWithValue) {
456 value = ((QueryParameterWithValue) queryParamemter).getValue();
458 else {
459 value = null;
461 if (value == null) {
462 value = "";
464 return value;
467 protected OperatorType getOperator(QueryParameter queryParamemter) {
468 if (QueryParameterCastHelper.BI_OPERAND_PARAM_HELPER.isWithOperator(queryParamemter)) {
469 QueryParameterWithOperator parameterWithOperator =
470 QueryParameterCastHelper.BI_OPERAND_PARAM_HELPER.castToOperatorParam(queryParamemter);
471 return parameterWithOperator.getOperatorType();
473 else {
474 return null;
478 @Override
479 public Template getSingle(QueryParameter... query) {
480 return getSingle(Arrays.asList(query));
483 @Override
484 public List<Template> getList(QueryParameter... query) {
485 return getList(Arrays.asList(query));
488 @Override
489 public <OtherTemplate> OtherTemplate getOther(QueryParameter... query) {
490 return this.<OtherTemplate>getOther(Arrays.asList(query));
493 @Override
494 public <OtherTemplate> List<OtherTemplate> getOtherList(QueryParameter... query) {
495 return this.<OtherTemplate>getOtherList(Arrays.asList(query));
499 * WRITE OPERATIONS
501 @Override
502 public void save(Template... states) {
503 LinkedHashMap<String, List<Put>> allPuts = new LinkedHashMap<String, List<Put>>();
504 for (Template state : states) {
505 LinkedHashMap<String, Put> puts = getConverter().objectToRows(state);
506 for (Map.Entry<String, Put> put : puts.entrySet()) {
507 final List<Put> putList;
508 if (allPuts.containsKey(put.getKey())) {
509 putList = allPuts.get(put.getKey());
511 else {
512 putList = new ArrayList<Put>();
513 allPuts.put(put.getKey(), putList);
515 putList.add(put.getValue());
518 for (final Map.Entry<String, List<Put>> puts : allPuts.entrySet()) {
519 executorService.execute(puts.getKey(), new Callback<Void>() {
521 @Override
522 public Void call(HTableInterface tableInterface) throws Exception {
523 tableInterface.put(puts.getValue());
524 return null;
530 @Override
531 public void update(Template... states) {
532 save(states);
535 @Override
536 public void delete(Template... states) {
537 LinkedHashMap<String, List<Delete>> allDels = new LinkedHashMap<String, List<Delete>>();
538 for (Template state : states) {
539 LinkedHashMap<String, Delete> dels = getConverter().objectToDeleteableRows(state);
540 for (Map.Entry<String, Delete> del : dels.entrySet()) {
541 final List<Delete> putList;
542 if (allDels.containsKey(del.getKey())) {
543 putList = allDels.get(del.getKey());
545 else {
546 putList = new ArrayList<Delete>();
547 allDels.put(del.getKey(), putList);
549 putList.add(del.getValue());
552 for (final Map.Entry<String, List<Delete>> dels : allDels.entrySet()) {
553 executorService.execute(dels.getKey(), new Callback<Void>() {
555 @Override
556 public Void call(HTableInterface tableInterface) throws Exception {
557 tableInterface.delete(dels.getValue());
558 return null;