Close the streams when writing serializable id to object stream
[smart-dao.git] / smart-hbase-dao / src / main / java / com / smartitengineering / dao / impl / hbase / CommonDao.java
blobfd3c4d22614fe3513456ba77043049a6800458a3
1 /*
2 * This is a common dao with basic CRUD operations and is not limited to any
3 * persistent layer implementation
5 * Copyright (C) 2010 Imran M Yousuf (imyousuf@smartitengineering.com)
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 3 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 package com.smartitengineering.dao.impl.hbase;
21 import com.smartitengineering.dao.common.CommonReadDao;
22 import com.smartitengineering.dao.common.CommonWriteDao;
23 import com.smartitengineering.dao.common.queryparam.BasicCompoundQueryParameter;
24 import com.smartitengineering.dao.common.queryparam.BiOperandQueryParameter;
25 import com.smartitengineering.dao.common.queryparam.MatchMode;
26 import com.smartitengineering.dao.common.queryparam.OperatorType;
27 import com.smartitengineering.dao.common.queryparam.ParameterType;
28 import com.smartitengineering.dao.common.queryparam.QueryParameter;
29 import com.smartitengineering.dao.common.queryparam.QueryParameterCastHelper;
30 import com.smartitengineering.dao.common.queryparam.QueryParameterWithOperator;
31 import com.smartitengineering.dao.common.queryparam.QueryParameterWithPropertyName;
32 import com.smartitengineering.dao.common.queryparam.QueryParameterWithValue;
33 import com.smartitengineering.dao.common.queryparam.ValueOnlyQueryParameter;
34 import com.smartitengineering.dao.impl.hbase.spi.AsyncExecutorService;
35 import com.smartitengineering.dao.impl.hbase.spi.Callback;
36 import com.smartitengineering.dao.impl.hbase.spi.FilterConfig;
37 import com.smartitengineering.dao.impl.hbase.spi.ObjectRowConverter;
38 import com.smartitengineering.dao.impl.hbase.spi.SchemaInfoProvider;
39 import com.smartitengineering.dao.impl.hbase.spi.impl.BinarySuffixComparator;
40 import com.smartitengineering.dao.impl.hbase.spi.impl.RangeComparator;
41 import com.smartitengineering.domain.PersistentDTO;
42 import java.io.ByteArrayOutputStream;
43 import java.io.ObjectOutputStream;
44 import java.io.Serializable;
45 import java.util.ArrayList;
46 import java.util.Arrays;
47 import java.util.Collection;
48 import java.util.Collections;
49 import java.util.LinkedHashMap;
50 import java.util.LinkedHashSet;
51 import java.util.List;
52 import java.util.Map;
53 import java.util.Set;
54 import java.util.concurrent.Future;
55 import org.apache.commons.io.IOUtils;
56 import org.apache.hadoop.hbase.client.Delete;
57 import org.apache.hadoop.hbase.client.Get;
58 import org.apache.hadoop.hbase.client.HTableInterface;
59 import org.apache.hadoop.hbase.client.Put;
60 import org.apache.hadoop.hbase.client.Result;
61 import org.apache.hadoop.hbase.client.ResultScanner;
62 import org.apache.hadoop.hbase.client.Scan;
63 import org.apache.hadoop.hbase.filter.BinaryComparator;
64 import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
65 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
66 import org.apache.hadoop.hbase.filter.Filter;
67 import org.apache.hadoop.hbase.filter.FilterList;
68 import org.apache.hadoop.hbase.filter.FilterList.Operator;
69 import org.apache.hadoop.hbase.filter.QualifierFilter;
70 import org.apache.hadoop.hbase.filter.RowFilter;
71 import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
72 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
73 import org.apache.hadoop.hbase.filter.SkipFilter;
74 import org.apache.hadoop.hbase.filter.SubstringComparator;
75 import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
76 import org.apache.hadoop.hbase.util.Bytes;
78 /**
79 * A common DAO implementation for HBase. Please note that all parameters for reading (i.e. Scan) assumes that the
80 * toString() method returns the string representation of the value to be compared in byte[] form.
81 * @author imyousuf
83 public class CommonDao<Template extends PersistentDTO, IdType extends Serializable> implements
84 CommonReadDao<Template, IdType>, CommonWriteDao<Template> {
86 public static final int DEFAULT_MAX_ROWS = 1000;
87 private ObjectRowConverter<Template> converter;
88 private SchemaInfoProvider infoProvider;
89 private AsyncExecutorService executorService;
90 private int maxRows = -1;
92 public AsyncExecutorService getExecutorService() {
93 return executorService;
96 public void setExecutorService(AsyncExecutorService executorService) {
97 this.executorService = executorService;
100 public int getMaxRows() {
101 return maxRows;
104 public void setMaxRows(int maxRows) {
105 this.maxRows = maxRows;
108 public ObjectRowConverter<Template> getConverter() {
109 return converter;
112 public void setConverter(ObjectRowConverter<Template> converter) {
113 this.converter = converter;
116 public SchemaInfoProvider getInfoProvider() {
117 return infoProvider;
120 public void setInfoProvider(SchemaInfoProvider infoProvider) {
121 this.infoProvider = infoProvider;
124 protected String getDefaultTableName() {
125 return getInfoProvider().getMainTableName();
128 protected int getMaxScanRows() {
129 return getMaxRows() > 0 ? getMaxRows() : DEFAULT_MAX_ROWS;
132 protected int getMaxScanRows(List<QueryParameter> params) {
133 if (params != null && !params.isEmpty()) {
134 for (QueryParameter param : params) {
135 if (ParameterType.PARAMETER_TYPE_MAX_RESULT.equals(param.getParameterType())) {
136 ValueOnlyQueryParameter<Integer> queryParameter = QueryParameterCastHelper.VALUE_PARAM_HELPER.cast(param);
137 return queryParameter.getValue();
141 return getMaxScanRows();
145 * READ OPERATIONS
149 * Unsupported read operations
151 @Override
152 public Set<Template> getAll() {
153 throw new UnsupportedOperationException("Not supported yet.");
156 @Override
157 public <OtherTemplate> OtherTemplate getOther(List<QueryParameter> query) {
158 throw new UnsupportedOperationException("Not supported yet.");
161 @Override
162 public <OtherTemplate> List<OtherTemplate> getOtherList(List<QueryParameter> query) {
163 throw new UnsupportedOperationException("Not supported yet.");
167 * Supported read operations
169 @Override
170 public Set<Template> getByIds(List<IdType> ids) {
171 LinkedHashSet<Future<Template>> set = new LinkedHashSet<Future<Template>>(ids.size());
172 LinkedHashSet<Template> resultSet = new LinkedHashSet<Template>(ids.size());
173 for (IdType id : ids) {
174 set.add(executorService.executeAsynchronously(getDefaultTableName(), getByIdCallback(id)));
176 for (Future<Template> future : set) {
177 try {
178 resultSet.add(future.get());
180 catch (Exception ex) {
181 ex.printStackTrace();
184 return resultSet;
187 @Override
188 public Template getById(final IdType id) {
189 return executorService.execute(getDefaultTableName(), getByIdCallback(id));
192 protected Callback<Template> getByIdCallback(final IdType id) {
193 return new Callback<Template>() {
195 @Override
196 public Template call(HTableInterface tableInterface) throws Exception {
197 final byte[] rowId;
198 if (id instanceof Integer) {
199 rowId = Bytes.toBytes((Integer) id);
201 else if (id instanceof String) {
202 rowId = Bytes.toBytes((String) id);
204 else if (id instanceof Long) {
205 rowId = Bytes.toBytes((Long) id);
207 else if (id instanceof Double) {
208 rowId = Bytes.toBytes((Double) id);
210 else if (id != null) {
211 final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
212 final ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
213 objectOutputStream.writeObject(id);
214 IOUtils.closeQuietly(objectOutputStream);
215 IOUtils.closeQuietly(byteArrayOutputStream);
216 rowId = byteArrayOutputStream.toByteArray();
218 else {
219 rowId = new byte[0];
221 Get get = new Get(rowId);
222 Result result = tableInterface.get(get);
223 return getConverter().rowsToObject(result, executorService);
228 @Override
229 public Template getSingle(final List<QueryParameter> query) {
230 return executorService.execute(getDefaultTableName(), new Callback<Template>() {
232 @Override
233 public Template call(HTableInterface tableInterface) throws Exception {
234 ResultScanner scanner = tableInterface.getScanner(formScan(query));
235 try {
236 Result result = scanner.next();
237 if (result == null) {
238 return null;
240 else {
241 return getConverter().rowsToObject(result, executorService);
244 finally {
245 if (scanner != null) {
246 scanner.close();
253 @Override
254 public List<Template> getList(final List<QueryParameter> query) {
255 return executorService.execute(getDefaultTableName(), new Callback<List<Template>>() {
257 @Override
258 public List<Template> call(HTableInterface tableInterface) throws Exception {
259 ResultScanner scanner = tableInterface.getScanner(formScan(query));
260 try {
261 Result[] results = scanner.next(getMaxScanRows(query));
262 if (results == null) {
263 return Collections.emptyList();
265 else {
266 ArrayList<Template> templates = new ArrayList<Template>(results.length);
267 for (Result result : results) {
268 templates.add(getConverter().rowsToObject(result, executorService));
270 return templates;
273 finally {
274 if (scanner != null) {
275 scanner.close();
282 protected Scan formScan(List<QueryParameter> query) {
283 Scan scan = new Scan();
284 final Filter filter = getFilter(query, scan);
285 if (filter != null) {
286 scan.setFilter(filter);
288 return scan;
291 protected Filter getFilter(Collection<QueryParameter> queryParams, Scan scan) {
292 return getFilter(queryParams, scan, Operator.MUST_PASS_ALL);
295 protected Filter getFilter(Collection<QueryParameter> queryParams, Scan scan, Operator operator) {
296 final Filter filter;
297 if (queryParams != null && !queryParams.isEmpty()) {
298 List<Filter> filters = new ArrayList<Filter>(queryParams.size());
299 for (QueryParameter param : queryParams) {
300 switch (param.getParameterType()) {
301 case PARAMETER_TYPE_CONJUNCTION: {
302 BasicCompoundQueryParameter queryParameter =
303 QueryParameterCastHelper.BASIC_COMPOUND_PARAM_HELPER.cast(param);
304 Collection<QueryParameter> nestedParameters = queryParameter.getNestedParameters();
305 filters.add(getFilter(nestedParameters, scan, Operator.MUST_PASS_ALL));
306 break;
308 case PARAMETER_TYPE_DISJUNCTION: {
309 BasicCompoundQueryParameter queryParameter =
310 QueryParameterCastHelper.BASIC_COMPOUND_PARAM_HELPER.cast(param);
311 Collection<QueryParameter> nestedParameters = queryParameter.getNestedParameters();
312 filters.add(getFilter(nestedParameters, scan, Operator.MUST_PASS_ONE));
313 break;
315 case PARAMETER_TYPE_PROPERTY: {
316 handlePropertyParam(param, filters);
317 break;
319 case PARAMETER_TYPE_FIRST_RESULT: {
320 Object value = getValue(param);
321 scan.setStartRow(Bytes.toBytes(value.toString()));
322 break;
324 case PARAMETER_TYPE_UNIT_PROP: {
325 FilterConfig config = getInfoProvider().getFilterConfig(getPropertyName(param));
326 if (config != null) {
327 scan.addFamily(config.getColumnFamily());
329 break;
331 default:
332 //Do nothing
335 if (!filters.isEmpty()) {
336 FilterList filterList = new FilterList(operator, filters);
337 filter = filterList;
339 else {
340 filter = null;
343 else {
344 filter = null;
346 return filter;
349 protected void handlePropertyParam(QueryParameter queryParameter,
350 List<Filter> filters) {
351 OperatorType operator = getOperator(queryParameter);
352 Object parameter = getValue(queryParameter);
353 FilterConfig filterConfig = getInfoProvider().getFilterConfig(getPropertyName(queryParameter));
354 switch (operator) {
355 case OPERATOR_EQUAL: {
356 filters.add(getCellFilter(filterConfig, CompareOp.EQUAL, Bytes.toBytes(parameter.toString())));
357 return;
359 case OPERATOR_LESSER: {
360 filters.add(getCellFilter(filterConfig, CompareOp.LESS, Bytes.toBytes(parameter.toString())));
361 return;
363 case OPERATOR_LESSER_EQUAL: {
364 filters.add(getCellFilter(filterConfig, CompareOp.LESS_OR_EQUAL, Bytes.toBytes(parameter.toString())));
365 return;
367 case OPERATOR_GREATER: {
368 filters.add(getCellFilter(filterConfig, CompareOp.GREATER, Bytes.toBytes(parameter.toString())));
369 return;
371 case OPERATOR_GREATER_EQUAL: {
372 filters.add(getCellFilter(filterConfig, CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(parameter.toString())));
373 return;
375 case OPERATOR_NOT_EQUAL: {
376 filters.add(getCellFilter(filterConfig, CompareOp.NOT_EQUAL, Bytes.toBytes(parameter.toString())));
377 return;
379 case OPERATOR_IS_EMPTY:
380 case OPERATOR_IS_NULL: {
381 final Filter cellFilter = getCellFilter(filterConfig, CompareOp.EQUAL, Bytes.toBytes(""));
382 if (cellFilter instanceof SingleColumnValueFilter) {
383 ((SingleColumnValueFilter) cellFilter).setFilterIfMissing(false);
385 filters.add(cellFilter);
386 return;
388 case OPERATOR_IS_NOT_EMPTY:
389 case OPERATOR_IS_NOT_NULL: {
390 final Filter cellFilter = getCellFilter(filterConfig, CompareOp.NOT_EQUAL, Bytes.toBytes(""));
391 if (cellFilter instanceof SingleColumnValueFilter) {
392 ((SingleColumnValueFilter) cellFilter).setFilterIfMissing(true);
394 filters.add(cellFilter);
395 return;
397 case OPERATOR_STRING_LIKE: {
398 MatchMode matchMode = getMatchMode(queryParameter);
399 if (matchMode == null) {
400 matchMode = MatchMode.EXACT;
402 switch (matchMode) {
403 case END:
404 filters.add(getCellFilter(filterConfig, CompareOp.EQUAL, new BinarySuffixComparator(Bytes.toBytes(parameter.
405 toString()))));
406 break;
407 case EXACT:
408 filters.add(getCellFilter(filterConfig, CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(parameter.
409 toString()))));
410 break;
411 case START:
412 filters.add(getCellFilter(filterConfig, CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.toBytes(parameter.
413 toString()))));
414 break;
415 default:
416 case ANYWHERE:
417 filters.add(getCellFilter(filterConfig, CompareOp.EQUAL, new SubstringComparator(parameter.toString())));
418 break;
420 return;
422 case OPERATOR_BETWEEN: {
423 parameter = getFirstParameter(queryParameter);
424 Object parameter2 = getSecondParameter(queryParameter);
425 filters.add(getCellFilter(filterConfig, CompareOp.EQUAL,
426 new RangeComparator(Bytes.toBytes(parameter.toString()),
427 Bytes.toBytes(parameter2.toString()))));
428 return;
430 case OPERATOR_IS_IN: {
431 Collection inCollectin = QueryParameterCastHelper.MULTI_OPERAND_PARAM_HELPER.cast(queryParameter).getValues();
432 Filter filterList = getInFilter(inCollectin, filterConfig);
433 filters.add(filterList);
434 return;
436 case OPERATOR_IS_NOT_IN: {
437 Collection notInCollectin = QueryParameterCastHelper.MULTI_OPERAND_PARAM_HELPER.cast(queryParameter).getValues();
438 Filter filterList = getInFilter(notInCollectin, filterConfig);
439 filters.add(new SkipFilter(filterList));
440 return;
443 return;
446 protected Filter getCellFilter(FilterConfig filterConfig, CompareOp op,
447 WritableByteArrayComparable comparator) {
448 if (filterConfig.isFilterOnRowId()) {
449 RowFilter rowFilter = new RowFilter(op, comparator);
450 return rowFilter;
452 else if (filterConfig.isQualifierARangePrefix()) {
453 QualifierFilter filter = new QualifierFilter(op, comparator);
454 return filter;
456 else {
457 final SingleColumnValueExcludeFilter valueFilter;
458 valueFilter = new SingleColumnValueExcludeFilter(filterConfig.getColumnFamily(),
459 filterConfig.getColumnQualifier(),
460 op, comparator);
461 valueFilter.setFilterIfMissing(filterConfig.isFilterOnIfMissing());
462 valueFilter.setLatestVersionOnly(filterConfig.isFilterOnLatestVersionOnly());
463 return valueFilter;
467 protected Filter getCellFilter(FilterConfig filterConfig, CompareOp op, byte[] value) {
468 return getCellFilter(filterConfig, op, new BinaryComparator(value));
471 protected Filter getInFilter(Collection inCollectin, FilterConfig config) {
472 FilterList filterList = new FilterList(Operator.MUST_PASS_ONE);
473 for (Object inObj : inCollectin) {
474 filterList.addFilter(getCellFilter(config, CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(inObj.toString()))));
476 return filterList;
479 protected String getPropertyName(QueryParameter param) {
480 final String propertyName;
481 if (param instanceof QueryParameterWithPropertyName) {
482 propertyName = ((QueryParameterWithPropertyName) param).getPropertyName();
484 else {
485 propertyName = "";
487 return propertyName;
490 protected Object getSecondParameter(QueryParameter queryParamemter) {
491 if (queryParamemter instanceof BiOperandQueryParameter) {
492 return QueryParameterCastHelper.BI_OPERAND_PARAM_HELPER.cast(queryParamemter).getSecondValue();
494 else {
495 return "";
499 protected Object getFirstParameter(QueryParameter queryParamemter) {
500 if (queryParamemter instanceof BiOperandQueryParameter) {
501 return QueryParameterCastHelper.BI_OPERAND_PARAM_HELPER.cast(queryParamemter).getFirstValue();
503 else {
504 return "";
508 protected MatchMode getMatchMode(QueryParameter queryParamemter) {
509 return QueryParameterCastHelper.STRING_PARAM_HELPER.cast(queryParamemter).getMatchMode();
512 protected Object getValue(QueryParameter queryParamemter) {
513 Object value;
514 if (queryParamemter instanceof QueryParameterWithValue) {
515 value = ((QueryParameterWithValue) queryParamemter).getValue();
517 else {
518 value = null;
520 if (value == null) {
521 value = "";
523 return value;
526 protected OperatorType getOperator(QueryParameter queryParamemter) {
527 if (QueryParameterCastHelper.BI_OPERAND_PARAM_HELPER.isWithOperator(queryParamemter)) {
528 QueryParameterWithOperator parameterWithOperator =
529 QueryParameterCastHelper.BI_OPERAND_PARAM_HELPER.castToOperatorParam(queryParamemter);
530 return parameterWithOperator.getOperatorType();
532 else {
533 return null;
537 @Override
538 public Template getSingle(QueryParameter... query) {
539 return getSingle(Arrays.asList(query));
542 @Override
543 public List<Template> getList(QueryParameter... query) {
544 return getList(Arrays.asList(query));
547 @Override
548 public <OtherTemplate> OtherTemplate getOther(QueryParameter... query) {
549 return this.<OtherTemplate>getOther(Arrays.asList(query));
552 @Override
553 public <OtherTemplate> List<OtherTemplate> getOtherList(QueryParameter... query) {
554 return this.<OtherTemplate>getOtherList(Arrays.asList(query));
558 * WRITE OPERATIONS
560 @Override
561 public void save(Template... states) {
562 LinkedHashMap<String, List<Put>> allPuts = new LinkedHashMap<String, List<Put>>();
563 for (Template state : states) {
564 LinkedHashMap<String, Put> puts = getConverter().objectToRows(state);
565 for (Map.Entry<String, Put> put : puts.entrySet()) {
566 final List<Put> putList;
567 if (allPuts.containsKey(put.getKey())) {
568 putList = allPuts.get(put.getKey());
570 else {
571 putList = new ArrayList<Put>();
572 allPuts.put(put.getKey(), putList);
574 putList.add(put.getValue());
577 for (final Map.Entry<String, List<Put>> puts : allPuts.entrySet()) {
578 executorService.execute(puts.getKey(), new Callback<Void>() {
580 @Override
581 public Void call(HTableInterface tableInterface) throws Exception {
582 tableInterface.put(puts.getValue());
583 return null;
589 @Override
590 public void update(Template... states) {
591 save(states);
594 @Override
595 public void delete(Template... states) {
596 LinkedHashMap<String, List<Delete>> allDels = new LinkedHashMap<String, List<Delete>>();
597 for (Template state : states) {
598 LinkedHashMap<String, Delete> dels = getConverter().objectToDeleteableRows(state);
599 for (Map.Entry<String, Delete> del : dels.entrySet()) {
600 final List<Delete> putList;
601 if (allDels.containsKey(del.getKey())) {
602 putList = allDels.get(del.getKey());
604 else {
605 putList = new ArrayList<Delete>();
606 allDels.put(del.getKey(), putList);
608 putList.add(del.getValue());
611 for (final Map.Entry<String, List<Delete>> dels : allDels.entrySet()) {
612 executorService.execute(dels.getKey(), new Callback<Void>() {
614 @Override
615 public Void call(HTableInterface tableInterface) throws Exception {
616 tableInterface.delete(dels.getValue());
617 return null;