3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
20 package org
.apache
.hadoop
.hbase
.thrift
;
22 import static org
.apache
.hadoop
.hbase
.thrift
.Constants
.COALESCE_INC_KEY
;
23 import static org
.apache
.hadoop
.hbase
.util
.Bytes
.getBytes
;
25 import java
.io
.IOException
;
26 import java
.nio
.ByteBuffer
;
27 import java
.util
.ArrayList
;
28 import java
.util
.Collections
;
29 import java
.util
.HashMap
;
30 import java
.util
.List
;
32 import java
.util
.TreeMap
;
33 import org
.apache
.hadoop
.conf
.Configuration
;
34 import org
.apache
.hadoop
.hbase
.Cell
;
35 import org
.apache
.hadoop
.hbase
.CellBuilder
;
36 import org
.apache
.hadoop
.hbase
.CellBuilderFactory
;
37 import org
.apache
.hadoop
.hbase
.CellBuilderType
;
38 import org
.apache
.hadoop
.hbase
.CellUtil
;
39 import org
.apache
.hadoop
.hbase
.HColumnDescriptor
;
40 import org
.apache
.hadoop
.hbase
.HConstants
;
41 import org
.apache
.hadoop
.hbase
.HRegionLocation
;
42 import org
.apache
.hadoop
.hbase
.HTableDescriptor
;
43 import org
.apache
.hadoop
.hbase
.KeyValue
;
44 import org
.apache
.hadoop
.hbase
.MetaTableAccessor
;
45 import org
.apache
.hadoop
.hbase
.ServerName
;
46 import org
.apache
.hadoop
.hbase
.TableName
;
47 import org
.apache
.hadoop
.hbase
.TableNotFoundException
;
48 import org
.apache
.hadoop
.hbase
.client
.Append
;
49 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptor
;
50 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
51 import org
.apache
.hadoop
.hbase
.client
.Delete
;
52 import org
.apache
.hadoop
.hbase
.client
.Durability
;
53 import org
.apache
.hadoop
.hbase
.client
.Get
;
54 import org
.apache
.hadoop
.hbase
.client
.Increment
;
55 import org
.apache
.hadoop
.hbase
.client
.OperationWithAttributes
;
56 import org
.apache
.hadoop
.hbase
.client
.Put
;
57 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
58 import org
.apache
.hadoop
.hbase
.client
.RegionLocator
;
59 import org
.apache
.hadoop
.hbase
.client
.Result
;
60 import org
.apache
.hadoop
.hbase
.client
.ResultScanner
;
61 import org
.apache
.hadoop
.hbase
.client
.Scan
;
62 import org
.apache
.hadoop
.hbase
.client
.Table
;
63 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
64 import org
.apache
.hadoop
.hbase
.filter
.Filter
;
65 import org
.apache
.hadoop
.hbase
.filter
.ParseFilter
;
66 import org
.apache
.hadoop
.hbase
.filter
.PrefixFilter
;
67 import org
.apache
.hadoop
.hbase
.filter
.WhileMatchFilter
;
68 import org
.apache
.hadoop
.hbase
.security
.UserProvider
;
69 import org
.apache
.hadoop
.hbase
.thrift
.generated
.AlreadyExists
;
70 import org
.apache
.hadoop
.hbase
.thrift
.generated
.BatchMutation
;
71 import org
.apache
.hadoop
.hbase
.thrift
.generated
.ColumnDescriptor
;
72 import org
.apache
.hadoop
.hbase
.thrift
.generated
.Hbase
;
73 import org
.apache
.hadoop
.hbase
.thrift
.generated
.IOError
;
74 import org
.apache
.hadoop
.hbase
.thrift
.generated
.IllegalArgument
;
75 import org
.apache
.hadoop
.hbase
.thrift
.generated
.Mutation
;
76 import org
.apache
.hadoop
.hbase
.thrift
.generated
.TAppend
;
77 import org
.apache
.hadoop
.hbase
.thrift
.generated
.TCell
;
78 import org
.apache
.hadoop
.hbase
.thrift
.generated
.TIncrement
;
79 import org
.apache
.hadoop
.hbase
.thrift
.generated
.TRegionInfo
;
80 import org
.apache
.hadoop
.hbase
.thrift
.generated
.TRowResult
;
81 import org
.apache
.hadoop
.hbase
.thrift
.generated
.TScan
;
82 import org
.apache
.hadoop
.hbase
.thrift
.generated
.TThriftServerType
;
83 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
84 import org
.apache
.thrift
.TException
;
85 import org
.apache
.yetus
.audience
.InterfaceAudience
;
86 import org
.slf4j
.Logger
;
87 import org
.slf4j
.LoggerFactory
;
89 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.base
.Throwables
;
92 * The HBaseServiceHandler is a glue object that connects Thrift RPC calls to the
93 * HBase client API primarily defined in the Admin and Table objects.
95 @InterfaceAudience.Private
96 @SuppressWarnings("deprecation")
97 public class ThriftHBaseServiceHandler
extends HBaseServiceHandler
implements Hbase
.Iface
{
98 private static final Logger LOG
= LoggerFactory
.getLogger(ThriftHBaseServiceHandler
.class);
100 public static final int HREGION_VERSION
= 1;
102 // nextScannerId and scannerMap are used to manage scanner state
103 private int nextScannerId
= 0;
104 private HashMap
<Integer
, ResultScannerWrapper
> scannerMap
;
105 IncrementCoalescer coalescer
;
108 * Returns a list of all the column families for a given Table.
110 byte[][] getAllColumns(Table table
) throws IOException
{
111 ColumnFamilyDescriptor
[] cds
= table
.getDescriptor().getColumnFamilies();
112 byte[][] columns
= new byte[cds
.length
][];
113 for (int i
= 0; i
< cds
.length
; i
++) {
114 columns
[i
] = Bytes
.add(cds
[i
].getName(), KeyValue
.COLUMN_FAMILY_DELIM_ARRAY
);
121 * Assigns a unique ID to the scanner and adds the mapping to an internal
124 * @param scanner the {@link ResultScanner} to add
125 * @return integer scanner id
127 protected synchronized int addScanner(ResultScanner scanner
, boolean sortColumns
) {
128 int id
= nextScannerId
++;
129 ResultScannerWrapper resultScannerWrapper
=
130 new ResultScannerWrapper(scanner
, sortColumns
);
131 scannerMap
.put(id
, resultScannerWrapper
);
136 * Returns the scanner associated with the specified ID.
138 * @param id the ID of the scanner to get
139 * @return a Scanner, or null if ID was invalid.
141 private synchronized ResultScannerWrapper
getScanner(int id
) {
142 return scannerMap
.get(id
);
146 * Removes the scanner associated with the specified ID from the internal
147 * id->scanner hash-map.
149 * @param id the ID of the scanner to remove
150 * @return a Scanner, or null if ID was invalid.
152 private synchronized ResultScannerWrapper
removeScanner(int id
) {
153 return scannerMap
.remove(id
);
156 protected ThriftHBaseServiceHandler(final Configuration c
,
157 final UserProvider userProvider
) throws IOException
{
158 super(c
, userProvider
);
159 scannerMap
= new HashMap
<>();
160 this.coalescer
= new IncrementCoalescer(this);
165 public void enableTable(ByteBuffer tableName
) throws IOError
{
167 getAdmin().enableTable(getTableName(tableName
));
168 } catch (IOException e
) {
169 LOG
.warn(e
.getMessage(), e
);
175 public void disableTable(ByteBuffer tableName
) throws IOError
{
177 getAdmin().disableTable(getTableName(tableName
));
178 } catch (IOException e
) {
179 LOG
.warn(e
.getMessage(), e
);
185 public boolean isTableEnabled(ByteBuffer tableName
) throws IOError
{
187 return this.connectionCache
.getAdmin().isTableEnabled(getTableName(tableName
));
188 } catch (IOException e
) {
189 LOG
.warn(e
.getMessage(), e
);
194 // ThriftServerRunner.compact should be deprecated and replaced with methods specific to
197 public void compact(ByteBuffer tableNameOrRegionName
) throws IOError
{
200 getAdmin().compactRegion(getBytes(tableNameOrRegionName
));
201 } catch (IllegalArgumentException e
) {
202 // Invalid region, try table
203 getAdmin().compact(TableName
.valueOf(getBytes(tableNameOrRegionName
)));
205 } catch (IOException e
) {
206 LOG
.warn(e
.getMessage(), e
);
211 // ThriftServerRunner.majorCompact should be deprecated and replaced with methods specific
212 // to table and region.
214 public void majorCompact(ByteBuffer tableNameOrRegionName
) throws IOError
{
217 getAdmin().compactRegion(getBytes(tableNameOrRegionName
));
218 } catch (IllegalArgumentException e
) {
219 // Invalid region, try table
220 getAdmin().compact(TableName
.valueOf(getBytes(tableNameOrRegionName
)));
222 } catch (IOException e
) {
223 LOG
.warn(e
.getMessage(), e
);
229 public List
<ByteBuffer
> getTableNames() throws IOError
{
231 TableName
[] tableNames
= this.getAdmin().listTableNames();
232 ArrayList
<ByteBuffer
> list
= new ArrayList
<>(tableNames
.length
);
233 for (TableName tableName
: tableNames
) {
234 list
.add(ByteBuffer
.wrap(tableName
.getName()));
237 } catch (IOException e
) {
238 LOG
.warn(e
.getMessage(), e
);
244 * @return the list of regions in the given table, or an empty list if the table does not exist
247 public List
<TRegionInfo
> getTableRegions(ByteBuffer tableName
) throws IOError
{
248 try (RegionLocator locator
= connectionCache
.getRegionLocator(getBytes(tableName
))) {
249 List
<HRegionLocation
> regionLocations
= locator
.getAllRegionLocations();
250 List
<TRegionInfo
> results
= new ArrayList
<>(regionLocations
.size());
251 for (HRegionLocation regionLocation
: regionLocations
) {
252 RegionInfo info
= regionLocation
.getRegion();
253 ServerName serverName
= regionLocation
.getServerName();
254 TRegionInfo region
= new TRegionInfo();
255 region
.serverName
= ByteBuffer
.wrap(
256 Bytes
.toBytes(serverName
.getHostname()));
257 region
.port
= serverName
.getPort();
258 region
.startKey
= ByteBuffer
.wrap(info
.getStartKey());
259 region
.endKey
= ByteBuffer
.wrap(info
.getEndKey());
260 region
.id
= info
.getRegionId();
261 region
.name
= ByteBuffer
.wrap(info
.getRegionName());
262 region
.version
= HREGION_VERSION
; // HRegion now not versioned, PB encoding used
266 } catch (TableNotFoundException e
) {
267 // Return empty list for non-existing table
268 return Collections
.emptyList();
269 } catch (IOException e
){
270 LOG
.warn(e
.getMessage(), e
);
276 public List
<TCell
> get(
277 ByteBuffer tableName
, ByteBuffer row
, ByteBuffer column
,
278 Map
<ByteBuffer
, ByteBuffer
> attributes
)
280 byte [][] famAndQf
= CellUtil
.parseColumn(getBytes(column
));
281 if (famAndQf
.length
== 1) {
282 return get(tableName
, row
, famAndQf
[0], null, attributes
);
284 if (famAndQf
.length
== 2) {
285 return get(tableName
, row
, famAndQf
[0], famAndQf
[1], attributes
);
287 throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
291 * Note: this internal interface is slightly different from public APIs in regard to handling
292 * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
293 * we respect qual == null as a request for the entire column family. The caller (
294 * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that the
295 * column is parse like normal.
297 protected List
<TCell
> get(ByteBuffer tableName
,
301 Map
<ByteBuffer
, ByteBuffer
> attributes
) throws IOError
{
304 table
= getTable(tableName
);
305 Get get
= new Get(getBytes(row
));
306 addAttributes(get
, attributes
);
307 if (qualifier
== null) {
308 get
.addFamily(family
);
310 get
.addColumn(family
, qualifier
);
312 Result result
= table
.get(get
);
313 return ThriftUtilities
.cellFromHBase(result
.rawCells());
314 } catch (IOException e
) {
315 LOG
.warn(e
.getMessage(), e
);
323 public List
<TCell
> getVer(ByteBuffer tableName
, ByteBuffer row
, ByteBuffer column
,
324 int numVersions
, Map
<ByteBuffer
, ByteBuffer
> attributes
) throws IOError
{
325 byte [][] famAndQf
= CellUtil
.parseColumn(getBytes(column
));
326 if(famAndQf
.length
== 1) {
327 return getVer(tableName
, row
, famAndQf
[0], null, numVersions
, attributes
);
329 if (famAndQf
.length
== 2) {
330 return getVer(tableName
, row
, famAndQf
[0], famAndQf
[1], numVersions
, attributes
);
332 throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
337 * Note: this public interface is slightly different from public Java APIs in regard to
338 * handling of the qualifier. Here we differ from the public Java API in that null != byte[0].
339 * Rather, we respect qual == null as a request for the entire column family. If you want to
340 * access the entire column family, use
341 * {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a {@code column} value
342 * that lacks a {@code ':'}.
344 public List
<TCell
> getVer(ByteBuffer tableName
, ByteBuffer row
, byte[] family
,
345 byte[] qualifier
, int numVersions
, Map
<ByteBuffer
, ByteBuffer
> attributes
) throws IOError
{
349 table
= getTable(tableName
);
350 Get get
= new Get(getBytes(row
));
351 addAttributes(get
, attributes
);
352 if (null == qualifier
) {
353 get
.addFamily(family
);
355 get
.addColumn(family
, qualifier
);
357 get
.readVersions(numVersions
);
358 Result result
= table
.get(get
);
359 return ThriftUtilities
.cellFromHBase(result
.rawCells());
360 } catch (IOException e
) {
361 LOG
.warn(e
.getMessage(), e
);
369 public List
<TCell
> getVerTs(ByteBuffer tableName
, ByteBuffer row
, ByteBuffer column
,
370 long timestamp
, int numVersions
, Map
<ByteBuffer
, ByteBuffer
> attributes
) throws IOError
{
371 byte [][] famAndQf
= CellUtil
.parseColumn(getBytes(column
));
372 if (famAndQf
.length
== 1) {
373 return getVerTs(tableName
, row
, famAndQf
[0], null, timestamp
, numVersions
, attributes
);
375 if (famAndQf
.length
== 2) {
376 return getVerTs(tableName
, row
, famAndQf
[0], famAndQf
[1], timestamp
, numVersions
,
379 throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
383 * Note: this internal interface is slightly different from public APIs in regard to handling
384 * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
385 * we respect qual == null as a request for the entire column family. The caller (
386 * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS
387 * consistent in that the column is parse like normal.
389 protected List
<TCell
> getVerTs(ByteBuffer tableName
, ByteBuffer row
, byte[] family
,
390 byte[] qualifier
, long timestamp
, int numVersions
, Map
<ByteBuffer
, ByteBuffer
> attributes
)
395 table
= getTable(tableName
);
396 Get get
= new Get(getBytes(row
));
397 addAttributes(get
, attributes
);
398 if (null == qualifier
) {
399 get
.addFamily(family
);
401 get
.addColumn(family
, qualifier
);
403 get
.setTimeRange(0, timestamp
);
404 get
.readVersions(numVersions
);
405 Result result
= table
.get(get
);
406 return ThriftUtilities
.cellFromHBase(result
.rawCells());
407 } catch (IOException e
) {
408 LOG
.warn(e
.getMessage(), e
);
416 public List
<TRowResult
> getRow(ByteBuffer tableName
, ByteBuffer row
,
417 Map
<ByteBuffer
, ByteBuffer
> attributes
) throws IOError
{
418 return getRowWithColumnsTs(tableName
, row
, null,
419 HConstants
.LATEST_TIMESTAMP
,
424 public List
<TRowResult
> getRowWithColumns(ByteBuffer tableName
,
426 List
<ByteBuffer
> columns
,
427 Map
<ByteBuffer
, ByteBuffer
> attributes
) throws IOError
{
428 return getRowWithColumnsTs(tableName
, row
, columns
,
429 HConstants
.LATEST_TIMESTAMP
,
434 public List
<TRowResult
> getRowTs(ByteBuffer tableName
, ByteBuffer row
,
435 long timestamp
, Map
<ByteBuffer
, ByteBuffer
> attributes
) throws IOError
{
436 return getRowWithColumnsTs(tableName
, row
, null,
437 timestamp
, attributes
);
441 public List
<TRowResult
> getRowWithColumnsTs(
442 ByteBuffer tableName
, ByteBuffer row
, List
<ByteBuffer
> columns
,
443 long timestamp
, Map
<ByteBuffer
, ByteBuffer
> attributes
) throws IOError
{
447 table
= getTable(tableName
);
448 if (columns
== null) {
449 Get get
= new Get(getBytes(row
));
450 addAttributes(get
, attributes
);
451 get
.setTimeRange(0, timestamp
);
452 Result result
= table
.get(get
);
453 return ThriftUtilities
.rowResultFromHBase(result
);
455 Get get
= new Get(getBytes(row
));
456 addAttributes(get
, attributes
);
457 for(ByteBuffer column
: columns
) {
458 byte [][] famAndQf
= CellUtil
.parseColumn(getBytes(column
));
459 if (famAndQf
.length
== 1) {
460 get
.addFamily(famAndQf
[0]);
462 get
.addColumn(famAndQf
[0], famAndQf
[1]);
465 get
.setTimeRange(0, timestamp
);
466 Result result
= table
.get(get
);
467 return ThriftUtilities
.rowResultFromHBase(result
);
468 } catch (IOException e
) {
469 LOG
.warn(e
.getMessage(), e
);
477 public List
<TRowResult
> getRows(ByteBuffer tableName
,
478 List
<ByteBuffer
> rows
,
479 Map
<ByteBuffer
, ByteBuffer
> attributes
)
481 return getRowsWithColumnsTs(tableName
, rows
, null,
482 HConstants
.LATEST_TIMESTAMP
,
487 public List
<TRowResult
> getRowsWithColumns(ByteBuffer tableName
,
488 List
<ByteBuffer
> rows
,
489 List
<ByteBuffer
> columns
,
490 Map
<ByteBuffer
, ByteBuffer
> attributes
) throws IOError
{
491 return getRowsWithColumnsTs(tableName
, rows
, columns
,
492 HConstants
.LATEST_TIMESTAMP
,
497 public List
<TRowResult
> getRowsTs(ByteBuffer tableName
,
498 List
<ByteBuffer
> rows
,
500 Map
<ByteBuffer
, ByteBuffer
> attributes
) throws IOError
{
501 return getRowsWithColumnsTs(tableName
, rows
, null,
502 timestamp
, attributes
);
506 public List
<TRowResult
> getRowsWithColumnsTs(ByteBuffer tableName
,
507 List
<ByteBuffer
> rows
,
508 List
<ByteBuffer
> columns
, long timestamp
,
509 Map
<ByteBuffer
, ByteBuffer
> attributes
) throws IOError
{
513 List
<Get
> gets
= new ArrayList
<>(rows
.size());
514 table
= getTable(tableName
);
515 if (metrics
!= null) {
516 metrics
.incNumRowKeysInBatchGet(rows
.size());
518 for (ByteBuffer row
: rows
) {
519 Get get
= new Get(getBytes(row
));
520 addAttributes(get
, attributes
);
521 if (columns
!= null) {
523 for(ByteBuffer column
: columns
) {
524 byte [][] famAndQf
= CellUtil
.parseColumn(getBytes(column
));
525 if (famAndQf
.length
== 1) {
526 get
.addFamily(famAndQf
[0]);
528 get
.addColumn(famAndQf
[0], famAndQf
[1]);
532 get
.setTimeRange(0, timestamp
);
535 Result
[] result
= table
.get(gets
);
536 return ThriftUtilities
.rowResultFromHBase(result
);
537 } catch (IOException e
) {
538 LOG
.warn(e
.getMessage(), e
);
546 public void deleteAll(
547 ByteBuffer tableName
, ByteBuffer row
, ByteBuffer column
,
548 Map
<ByteBuffer
, ByteBuffer
> attributes
)
550 deleteAllTs(tableName
, row
, column
, HConstants
.LATEST_TIMESTAMP
,
555 public void deleteAllTs(ByteBuffer tableName
,
558 long timestamp
, Map
<ByteBuffer
, ByteBuffer
> attributes
) throws IOError
{
561 table
= getTable(tableName
);
562 Delete delete
= new Delete(getBytes(row
));
563 addAttributes(delete
, attributes
);
564 byte [][] famAndQf
= CellUtil
.parseColumn(getBytes(column
));
565 if (famAndQf
.length
== 1) {
566 delete
.addFamily(famAndQf
[0], timestamp
);
568 delete
.addColumns(famAndQf
[0], famAndQf
[1], timestamp
);
570 table
.delete(delete
);
572 } catch (IOException e
) {
573 LOG
.warn(e
.getMessage(), e
);
581 public void deleteAllRow(
582 ByteBuffer tableName
, ByteBuffer row
,
583 Map
<ByteBuffer
, ByteBuffer
> attributes
) throws IOError
{
584 deleteAllRowTs(tableName
, row
, HConstants
.LATEST_TIMESTAMP
, attributes
);
588 public void deleteAllRowTs(
589 ByteBuffer tableName
, ByteBuffer row
, long timestamp
,
590 Map
<ByteBuffer
, ByteBuffer
> attributes
) throws IOError
{
593 table
= getTable(tableName
);
594 Delete delete
= new Delete(getBytes(row
), timestamp
);
595 addAttributes(delete
, attributes
);
596 table
.delete(delete
);
597 } catch (IOException e
) {
598 LOG
.warn(e
.getMessage(), e
);
606 public void createTable(ByteBuffer in_tableName
,
607 List
<ColumnDescriptor
> columnFamilies
) throws IOError
, IllegalArgument
, AlreadyExists
{
608 TableName tableName
= getTableName(in_tableName
);
610 if (getAdmin().tableExists(tableName
)) {
611 throw new AlreadyExists("table name already in use");
613 TableDescriptorBuilder
.ModifyableTableDescriptor tableDescriptor
=
614 new TableDescriptorBuilder
.ModifyableTableDescriptor(tableName
);
615 for (ColumnDescriptor col
: columnFamilies
) {
616 ColumnFamilyDescriptorBuilder
.ModifyableColumnFamilyDescriptor familyDescriptor
=
617 ThriftUtilities
.colDescFromThrift(col
);
618 tableDescriptor
.setColumnFamily(familyDescriptor
);
620 getAdmin().createTable(tableDescriptor
);
621 } catch (IOException e
) {
622 LOG
.warn(e
.getMessage(), e
);
624 } catch (IllegalArgumentException e
) {
625 LOG
.warn(e
.getMessage(), e
);
626 throw new IllegalArgument(Throwables
.getStackTraceAsString(e
));
630 private static TableName
getTableName(ByteBuffer buffer
) {
631 return TableName
.valueOf(getBytes(buffer
));
635 public void deleteTable(ByteBuffer in_tableName
) throws IOError
{
636 TableName tableName
= getTableName(in_tableName
);
637 if (LOG
.isDebugEnabled()) {
638 LOG
.debug("deleteTable: table={}", tableName
);
641 if (!getAdmin().tableExists(tableName
)) {
642 throw new IOException("table does not exist");
644 getAdmin().deleteTable(tableName
);
645 } catch (IOException e
) {
646 LOG
.warn(e
.getMessage(), e
);
652 public void mutateRow(ByteBuffer tableName
, ByteBuffer row
,
653 List
<Mutation
> mutations
, Map
<ByteBuffer
, ByteBuffer
> attributes
)
654 throws IOError
, IllegalArgument
{
655 mutateRowTs(tableName
, row
, mutations
, HConstants
.LATEST_TIMESTAMP
, attributes
);
659 public void mutateRowTs(ByteBuffer tableName
, ByteBuffer row
,
660 List
<Mutation
> mutations
, long timestamp
,
661 Map
<ByteBuffer
, ByteBuffer
> attributes
)
662 throws IOError
, IllegalArgument
{
665 table
= getTable(tableName
);
666 Put put
= new Put(getBytes(row
), timestamp
);
667 addAttributes(put
, attributes
);
669 Delete delete
= new Delete(getBytes(row
));
670 addAttributes(delete
, attributes
);
671 if (metrics
!= null) {
672 metrics
.incNumRowKeysInBatchMutate(mutations
.size());
675 // I apologize for all this mess :)
676 CellBuilder builder
= CellBuilderFactory
.create(CellBuilderType
.SHALLOW_COPY
);
677 for (Mutation m
: mutations
) {
678 byte[][] famAndQf
= CellUtil
.parseColumn(getBytes(m
.column
));
680 if (famAndQf
.length
== 1) {
681 delete
.addFamily(famAndQf
[0], timestamp
);
683 delete
.addColumns(famAndQf
[0], famAndQf
[1], timestamp
);
685 delete
.setDurability(m
.writeToWAL ? Durability
.SYNC_WAL
: Durability
.SKIP_WAL
);
687 if(famAndQf
.length
== 1) {
688 LOG
.warn("No column qualifier specified. Delete is the only mutation supported "
689 + "over the whole column family.");
691 put
.add(builder
.clear()
692 .setRow(put
.getRow())
693 .setFamily(famAndQf
[0])
694 .setQualifier(famAndQf
[1])
695 .setTimestamp(put
.getTimestamp())
696 .setType(Cell
.Type
.Put
)
697 .setValue(m
.value
!= null ?
getBytes(m
.value
)
698 : HConstants
.EMPTY_BYTE_ARRAY
)
701 put
.setDurability(m
.writeToWAL ? Durability
.SYNC_WAL
: Durability
.SKIP_WAL
);
704 if (!delete
.isEmpty()) {
705 table
.delete(delete
);
707 if (!put
.isEmpty()) {
710 } catch (IOException e
) {
711 LOG
.warn(e
.getMessage(), e
);
713 } catch (IllegalArgumentException e
) {
714 LOG
.warn(e
.getMessage(), e
);
715 throw new IllegalArgument(Throwables
.getStackTraceAsString(e
));
722 public void mutateRows(ByteBuffer tableName
, List
<BatchMutation
> rowBatches
,
723 Map
<ByteBuffer
, ByteBuffer
> attributes
)
724 throws IOError
, IllegalArgument
, TException
{
725 mutateRowsTs(tableName
, rowBatches
, HConstants
.LATEST_TIMESTAMP
, attributes
);
729 public void mutateRowsTs(
730 ByteBuffer tableName
, List
<BatchMutation
> rowBatches
, long timestamp
,
731 Map
<ByteBuffer
, ByteBuffer
> attributes
)
732 throws IOError
, IllegalArgument
, TException
{
733 List
<Put
> puts
= new ArrayList
<>();
734 List
<Delete
> deletes
= new ArrayList
<>();
735 CellBuilder builder
= CellBuilderFactory
.create(CellBuilderType
.SHALLOW_COPY
);
736 for (BatchMutation batch
: rowBatches
) {
737 byte[] row
= getBytes(batch
.row
);
738 List
<Mutation
> mutations
= batch
.mutations
;
739 Delete delete
= new Delete(row
);
740 addAttributes(delete
, attributes
);
741 Put put
= new Put(row
, timestamp
);
742 addAttributes(put
, attributes
);
743 for (Mutation m
: mutations
) {
744 byte[][] famAndQf
= CellUtil
.parseColumn(getBytes(m
.column
));
746 // no qualifier, family only.
747 if (famAndQf
.length
== 1) {
748 delete
.addFamily(famAndQf
[0], timestamp
);
750 delete
.addColumns(famAndQf
[0], famAndQf
[1], timestamp
);
752 delete
.setDurability(m
.writeToWAL ? Durability
.SYNC_WAL
753 : Durability
.SKIP_WAL
);
755 if (famAndQf
.length
== 1) {
756 LOG
.warn("No column qualifier specified. Delete is the only mutation supported "
757 + "over the whole column family.");
759 if (famAndQf
.length
== 2) {
761 put
.add(builder
.clear()
762 .setRow(put
.getRow())
763 .setFamily(famAndQf
[0])
764 .setQualifier(famAndQf
[1])
765 .setTimestamp(put
.getTimestamp())
766 .setType(Cell
.Type
.Put
)
767 .setValue(m
.value
!= null ?
getBytes(m
.value
)
768 : HConstants
.EMPTY_BYTE_ARRAY
)
770 } catch (IOException e
) {
771 throw new IllegalArgumentException(e
);
774 throw new IllegalArgumentException("Invalid famAndQf provided.");
776 put
.setDurability(m
.writeToWAL ? Durability
.SYNC_WAL
: Durability
.SKIP_WAL
);
779 if (!delete
.isEmpty()) {
782 if (!put
.isEmpty()) {
789 table
= getTable(tableName
);
790 if (!puts
.isEmpty()) {
793 if (!deletes
.isEmpty()) {
794 table
.delete(deletes
);
796 } catch (IOException e
) {
797 LOG
.warn(e
.getMessage(), e
);
799 } catch (IllegalArgumentException e
) {
800 LOG
.warn(e
.getMessage(), e
);
801 throw new IllegalArgument(Throwables
.getStackTraceAsString(e
));
808 public long atomicIncrement(
809 ByteBuffer tableName
, ByteBuffer row
, ByteBuffer column
, long amount
)
810 throws IOError
, IllegalArgument
, TException
{
811 byte [][] famAndQf
= CellUtil
.parseColumn(getBytes(column
));
812 if(famAndQf
.length
== 1) {
813 return atomicIncrement(tableName
, row
, famAndQf
[0], HConstants
.EMPTY_BYTE_ARRAY
, amount
);
815 return atomicIncrement(tableName
, row
, famAndQf
[0], famAndQf
[1], amount
);
818 protected long atomicIncrement(ByteBuffer tableName
, ByteBuffer row
,
819 byte [] family
, byte [] qualifier
, long amount
)
820 throws IOError
, IllegalArgument
, TException
{
823 table
= getTable(tableName
);
824 return table
.incrementColumnValue(
825 getBytes(row
), family
, qualifier
, amount
);
826 } catch (IOException e
) {
827 LOG
.warn(e
.getMessage(), e
);
835 public void scannerClose(int id
) throws IOError
, IllegalArgument
{
836 LOG
.debug("scannerClose: id={}", id
);
837 ResultScannerWrapper resultScannerWrapper
= getScanner(id
);
838 if (resultScannerWrapper
== null) {
839 LOG
.warn("scanner ID is invalid");
840 throw new IllegalArgument("scanner ID is invalid");
842 resultScannerWrapper
.getScanner().close();
847 public List
<TRowResult
> scannerGetList(int id
,int nbRows
)
848 throws IllegalArgument
, IOError
{
849 LOG
.debug("scannerGetList: id={}", id
);
850 ResultScannerWrapper resultScannerWrapper
= getScanner(id
);
851 if (null == resultScannerWrapper
) {
852 String message
= "scanner ID is invalid";
854 throw new IllegalArgument("scanner ID is invalid");
859 results
= resultScannerWrapper
.getScanner().next(nbRows
);
860 if (null == results
) {
861 return new ArrayList
<>();
863 } catch (IOException e
) {
864 LOG
.warn(e
.getMessage(), e
);
867 return ThriftUtilities
.rowResultFromHBase(results
, resultScannerWrapper
.isColumnSorted());
871 public List
<TRowResult
> scannerGet(int id
) throws IllegalArgument
, IOError
{
872 return scannerGetList(id
,1);
876 public int scannerOpenWithScan(ByteBuffer tableName
, TScan tScan
,
877 Map
<ByteBuffer
, ByteBuffer
> attributes
)
882 table
= getTable(tableName
);
883 Scan scan
= new Scan();
884 addAttributes(scan
, attributes
);
885 if (tScan
.isSetStartRow()) {
886 scan
.withStartRow(tScan
.getStartRow());
888 if (tScan
.isSetStopRow()) {
889 scan
.setStopRow(tScan
.getStopRow());
891 if (tScan
.isSetTimestamp()) {
892 scan
.setTimeRange(0, tScan
.getTimestamp());
894 if (tScan
.isSetCaching()) {
895 scan
.setCaching(tScan
.getCaching());
897 if (tScan
.isSetBatchSize()) {
898 scan
.setBatch(tScan
.getBatchSize());
900 if (tScan
.isSetColumns() && !tScan
.getColumns().isEmpty()) {
901 for(ByteBuffer column
: tScan
.getColumns()) {
902 byte [][] famQf
= CellUtil
.parseColumn(getBytes(column
));
903 if(famQf
.length
== 1) {
904 scan
.addFamily(famQf
[0]);
906 scan
.addColumn(famQf
[0], famQf
[1]);
910 if (tScan
.isSetFilterString()) {
911 ParseFilter parseFilter
= new ParseFilter();
913 parseFilter
.parseFilterString(tScan
.getFilterString()));
915 if (tScan
.isSetReversed()) {
916 scan
.setReversed(tScan
.isReversed());
918 if (tScan
.isSetCacheBlocks()) {
919 scan
.setCacheBlocks(tScan
.isCacheBlocks());
921 return addScanner(table
.getScanner(scan
), tScan
.sortColumns
);
922 } catch (IOException e
) {
923 LOG
.warn(e
.getMessage(), e
);
931 public int scannerOpen(ByteBuffer tableName
, ByteBuffer startRow
,
932 List
<ByteBuffer
> columns
,
933 Map
<ByteBuffer
, ByteBuffer
> attributes
) throws IOError
{
937 table
= getTable(tableName
);
938 Scan scan
= new Scan().withStartRow(getBytes(startRow
));
939 addAttributes(scan
, attributes
);
940 if(columns
!= null && !columns
.isEmpty()) {
941 for(ByteBuffer column
: columns
) {
942 byte [][] famQf
= CellUtil
.parseColumn(getBytes(column
));
943 if(famQf
.length
== 1) {
944 scan
.addFamily(famQf
[0]);
946 scan
.addColumn(famQf
[0], famQf
[1]);
950 return addScanner(table
.getScanner(scan
), false);
951 } catch (IOException e
) {
952 LOG
.warn(e
.getMessage(), e
);
960 public int scannerOpenWithStop(ByteBuffer tableName
, ByteBuffer startRow
,
961 ByteBuffer stopRow
, List
<ByteBuffer
> columns
,
962 Map
<ByteBuffer
, ByteBuffer
> attributes
)
963 throws IOError
, TException
{
967 table
= getTable(tableName
);
968 Scan scan
= new Scan().withStartRow(getBytes(startRow
)).withStopRow(getBytes(stopRow
));
969 addAttributes(scan
, attributes
);
970 if(columns
!= null && !columns
.isEmpty()) {
971 for(ByteBuffer column
: columns
) {
972 byte [][] famQf
= CellUtil
.parseColumn(getBytes(column
));
973 if(famQf
.length
== 1) {
974 scan
.addFamily(famQf
[0]);
976 scan
.addColumn(famQf
[0], famQf
[1]);
980 return addScanner(table
.getScanner(scan
), false);
981 } catch (IOException e
) {
982 LOG
.warn(e
.getMessage(), e
);
990 public int scannerOpenWithPrefix(ByteBuffer tableName
,
991 ByteBuffer startAndPrefix
,
992 List
<ByteBuffer
> columns
,
993 Map
<ByteBuffer
, ByteBuffer
> attributes
)
994 throws IOError
, TException
{
998 table
= getTable(tableName
);
999 Scan scan
= new Scan().withStartRow(getBytes(startAndPrefix
));
1000 addAttributes(scan
, attributes
);
1001 Filter f
= new WhileMatchFilter(
1002 new PrefixFilter(getBytes(startAndPrefix
)));
1004 if (columns
!= null && !columns
.isEmpty()) {
1005 for(ByteBuffer column
: columns
) {
1006 byte [][] famQf
= CellUtil
.parseColumn(getBytes(column
));
1007 if(famQf
.length
== 1) {
1008 scan
.addFamily(famQf
[0]);
1010 scan
.addColumn(famQf
[0], famQf
[1]);
1014 return addScanner(table
.getScanner(scan
), false);
1015 } catch (IOException e
) {
1016 LOG
.warn(e
.getMessage(), e
);
1017 throw getIOError(e
);
1024 public int scannerOpenTs(ByteBuffer tableName
, ByteBuffer startRow
,
1025 List
<ByteBuffer
> columns
, long timestamp
,
1026 Map
<ByteBuffer
, ByteBuffer
> attributes
) throws IOError
, TException
{
1030 table
= getTable(tableName
);
1031 Scan scan
= new Scan().withStartRow(getBytes(startRow
));
1032 addAttributes(scan
, attributes
);
1033 scan
.setTimeRange(0, timestamp
);
1034 if (columns
!= null && !columns
.isEmpty()) {
1035 for (ByteBuffer column
: columns
) {
1036 byte [][] famQf
= CellUtil
.parseColumn(getBytes(column
));
1037 if(famQf
.length
== 1) {
1038 scan
.addFamily(famQf
[0]);
1040 scan
.addColumn(famQf
[0], famQf
[1]);
1044 return addScanner(table
.getScanner(scan
), false);
1045 } catch (IOException e
) {
1046 LOG
.warn(e
.getMessage(), e
);
1047 throw getIOError(e
);
1054 public int scannerOpenWithStopTs(ByteBuffer tableName
, ByteBuffer startRow
,
1055 ByteBuffer stopRow
, List
<ByteBuffer
> columns
, long timestamp
,
1056 Map
<ByteBuffer
, ByteBuffer
> attributes
)
1057 throws IOError
, TException
{
1061 table
= getTable(tableName
);
1062 Scan scan
= new Scan().withStartRow(getBytes(startRow
)).withStopRow(getBytes(stopRow
));
1063 addAttributes(scan
, attributes
);
1064 scan
.setTimeRange(0, timestamp
);
1065 if (columns
!= null && !columns
.isEmpty()) {
1066 for (ByteBuffer column
: columns
) {
1067 byte [][] famQf
= CellUtil
.parseColumn(getBytes(column
));
1068 if(famQf
.length
== 1) {
1069 scan
.addFamily(famQf
[0]);
1071 scan
.addColumn(famQf
[0], famQf
[1]);
1075 scan
.setTimeRange(0, timestamp
);
1076 return addScanner(table
.getScanner(scan
), false);
1077 } catch (IOException e
) {
1078 LOG
.warn(e
.getMessage(), e
);
1079 throw getIOError(e
);
1086 public Map
<ByteBuffer
, ColumnDescriptor
> getColumnDescriptors(
1087 ByteBuffer tableName
) throws IOError
, TException
{
1091 TreeMap
<ByteBuffer
, ColumnDescriptor
> columns
= new TreeMap
<>();
1093 table
= getTable(tableName
);
1094 HTableDescriptor desc
= new HTableDescriptor(table
.getDescriptor());
1096 for (HColumnDescriptor e
: desc
.getFamilies()) {
1097 ColumnDescriptor col
= ThriftUtilities
.colDescFromHbase(e
);
1098 columns
.put(col
.name
, col
);
1101 } catch (IOException e
) {
1102 LOG
.warn(e
.getMessage(), e
);
1103 throw getIOError(e
);
1109 private void closeTable(Table table
) throws IOError
{
1114 } catch (IOException e
){
1115 LOG
.error(e
.getMessage(), e
);
1116 throw getIOError(e
);
1121 public TRegionInfo
getRegionInfo(ByteBuffer searchRow
) throws IOError
{
1123 byte[] row
= getBytes(searchRow
);
1124 Result startRowResult
= getReverseScanResult(TableName
.META_TABLE_NAME
.getName(), row
,
1125 HConstants
.CATALOG_FAMILY
);
1127 if (startRowResult
== null) {
1128 throw new IOException("Cannot find row in "+ TableName
.META_TABLE_NAME
+", row="
1129 + Bytes
.toStringBinary(row
));
1132 // find region start and end keys
1133 RegionInfo regionInfo
= MetaTableAccessor
.getRegionInfo(startRowResult
);
1134 if (regionInfo
== null) {
1135 throw new IOException("RegionInfo REGIONINFO was null or " +
1136 " empty in Meta for row="
1137 + Bytes
.toStringBinary(row
));
1139 TRegionInfo region
= new TRegionInfo();
1140 region
.setStartKey(regionInfo
.getStartKey());
1141 region
.setEndKey(regionInfo
.getEndKey());
1142 region
.id
= regionInfo
.getRegionId();
1143 region
.setName(regionInfo
.getRegionName());
1144 region
.version
= HREGION_VERSION
; // version not used anymore, PB encoding used.
1146 // find region assignment to server
1147 ServerName serverName
= MetaTableAccessor
.getServerName(startRowResult
, 0);
1148 if (serverName
!= null) {
1149 region
.setServerName(Bytes
.toBytes(serverName
.getHostname()));
1150 region
.port
= serverName
.getPort();
1153 } catch (IOException e
) {
1154 LOG
.warn(e
.getMessage(), e
);
1155 throw getIOError(e
);
1159 private Result
getReverseScanResult(byte[] tableName
, byte[] row
, byte[] family
)
1160 throws IOException
{
1161 Scan scan
= new Scan().withStartRow(row
);
1162 scan
.setReversed(true);
1163 scan
.addFamily(family
);
1164 scan
.withStartRow(row
);
1165 try (Table table
= getTable(tableName
);
1166 ResultScanner scanner
= table
.getScanner(scan
)) {
1167 return scanner
.next();
1172 public void increment(TIncrement tincrement
) throws IOError
, TException
{
1174 if (tincrement
.getRow().length
== 0 || tincrement
.getTable().length
== 0) {
1175 throw new TException("Must supply a table and a row key; can't increment");
1178 if (conf
.getBoolean(COALESCE_INC_KEY
, false)) {
1179 this.coalescer
.queueIncrement(tincrement
);
1185 table
= getTable(tincrement
.getTable());
1186 Increment inc
= ThriftUtilities
.incrementFromThrift(tincrement
);
1187 table
.increment(inc
);
1188 } catch (IOException e
) {
1189 LOG
.warn(e
.getMessage(), e
);
1190 throw getIOError(e
);
1197 public void incrementRows(List
<TIncrement
> tincrements
) throws IOError
, TException
{
1198 if (conf
.getBoolean(COALESCE_INC_KEY
, false)) {
1199 this.coalescer
.queueIncrements(tincrements
);
1202 for (TIncrement tinc
: tincrements
) {
1208 public List
<TCell
> append(TAppend tappend
) throws IOError
, TException
{
1209 if (tappend
.getRow().length
== 0 || tappend
.getTable().length
== 0) {
1210 throw new TException("Must supply a table and a row key; can't append");
1215 table
= getTable(tappend
.getTable());
1216 Append append
= ThriftUtilities
.appendFromThrift(tappend
);
1217 Result result
= table
.append(append
);
1218 return ThriftUtilities
.cellFromHBase(result
.rawCells());
1219 } catch (IOException e
) {
1220 LOG
.warn(e
.getMessage(), e
);
1221 throw getIOError(e
);
1228 public boolean checkAndPut(ByteBuffer tableName
, ByteBuffer row
, ByteBuffer column
,
1229 ByteBuffer value
, Mutation mput
, Map
<ByteBuffer
, ByteBuffer
> attributes
) throws IOError
,
1230 IllegalArgument
, TException
{
1233 put
= new Put(getBytes(row
), HConstants
.LATEST_TIMESTAMP
);
1234 addAttributes(put
, attributes
);
1236 byte[][] famAndQf
= CellUtil
.parseColumn(getBytes(mput
.column
));
1237 put
.add(CellBuilderFactory
.create(CellBuilderType
.SHALLOW_COPY
)
1238 .setRow(put
.getRow())
1239 .setFamily(famAndQf
[0])
1240 .setQualifier(famAndQf
[1])
1241 .setTimestamp(put
.getTimestamp())
1242 .setType(Cell
.Type
.Put
)
1243 .setValue(mput
.value
!= null ?
getBytes(mput
.value
)
1244 : HConstants
.EMPTY_BYTE_ARRAY
)
1246 put
.setDurability(mput
.writeToWAL ? Durability
.SYNC_WAL
: Durability
.SKIP_WAL
);
1247 } catch (IOException
| IllegalArgumentException e
) {
1248 LOG
.warn(e
.getMessage(), e
);
1249 throw new IllegalArgument(Throwables
.getStackTraceAsString(e
));
1254 table
= getTable(tableName
);
1255 byte[][] famAndQf
= CellUtil
.parseColumn(getBytes(column
));
1256 Table
.CheckAndMutateBuilder mutateBuilder
=
1257 table
.checkAndMutate(getBytes(row
), famAndQf
[0]).qualifier(famAndQf
[1]);
1258 if (value
!= null) {
1259 return mutateBuilder
.ifEquals(getBytes(value
)).thenPut(put
);
1261 return mutateBuilder
.ifNotExists().thenPut(put
);
1263 } catch (IOException e
) {
1264 LOG
.warn(e
.getMessage(), e
);
1265 throw getIOError(e
);
1266 } catch (IllegalArgumentException e
) {
1267 LOG
.warn(e
.getMessage(), e
);
1268 throw new IllegalArgument(Throwables
.getStackTraceAsString(e
));
1275 public TThriftServerType
getThriftServerType() {
1276 return TThriftServerType
.ONE
;
1280 public String
getClusterId() throws TException
{
1281 return connectionCache
.getClusterId();
1284 private static IOError
getIOError(Throwable throwable
) {
1285 IOError error
= new IOErrorWithCause(throwable
);
1286 error
.setMessage(Throwables
.getStackTraceAsString(throwable
));
1291 * Adds all the attributes into the Operation object
1293 private static void addAttributes(OperationWithAttributes op
,
1294 Map
<ByteBuffer
, ByteBuffer
> attributes
) {
1295 if (attributes
== null || attributes
.isEmpty()) {
1298 for (Map
.Entry
<ByteBuffer
, ByteBuffer
> entry
: attributes
.entrySet()) {
1299 String name
= Bytes
.toStringBinary(getBytes(entry
.getKey()));
1300 byte[] value
= getBytes(entry
.getValue());
1301 op
.setAttribute(name
, value
);
1305 protected static class ResultScannerWrapper
{
1307 private final ResultScanner scanner
;
1308 private final boolean sortColumns
;
1309 public ResultScannerWrapper(ResultScanner resultScanner
,
1310 boolean sortResultColumns
) {
1311 scanner
= resultScanner
;
1312 sortColumns
= sortResultColumns
;
1315 public ResultScanner
getScanner() {
1319 public boolean isColumnSorted() {
1324 public static class IOErrorWithCause
extends IOError
{
1325 private final Throwable cause
;
1326 public IOErrorWithCause(Throwable cause
) {
1331 public synchronized Throwable
getCause() {
1336 public boolean equals(Object other
) {
1337 if (super.equals(other
) &&
1338 other
instanceof IOErrorWithCause
) {
1339 Throwable otherCause
= ((IOErrorWithCause
) other
).getCause();
1340 if (this.getCause() != null) {
1341 return otherCause
!= null && this.getCause().equals(otherCause
);
1343 return otherCause
== null;
1350 public int hashCode() {
1351 int result
= super.hashCode();
1352 result
= 31 * result
+ (cause
!= null ? cause
.hashCode() : 0);