2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.filterCells
;
22 import java
.io
.IOException
;
23 import java
.util
.ArrayDeque
;
24 import java
.util
.ArrayList
;
25 import java
.util
.Deque
;
26 import java
.util
.List
;
28 import org
.apache
.hadoop
.hbase
.Cell
;
29 import org
.apache
.hadoop
.hbase
.CellUtil
;
30 import org
.apache
.yetus
.audience
.InterfaceAudience
;
31 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
34 * A scan result cache for batched scan, i.e,
35 * {@code scan.getBatch() > 0 && !scan.getAllowPartialResults()}.
37 * If user setBatch(5) and rpc returns 3+5+5+5+3 cells, we should return 5+5+5+5+1 to user. setBatch
38 * doesn't mean setAllowPartialResult(true).
41 @InterfaceAudience.Private
42 public class BatchScanResultCache
implements ScanResultCache
{
44 private final int batch
;
46 // used to filter out the cells that already returned to user as we always start from the
47 // beginning of a row when retry.
48 private Cell lastCell
;
50 private boolean lastResultPartial
;
52 private final Deque
<Result
> partialResults
= new ArrayDeque
<>();
54 private int numCellsOfPartialResults
;
56 private int numberOfCompleteRows
;
58 public BatchScanResultCache(int batch
) {
62 private void recordLastResult(Result result
) {
63 lastCell
= result
.rawCells()[result
.rawCells().length
- 1];
64 lastResultPartial
= result
.mayHaveMoreCellsInRow();
67 private Result
createCompletedResult() throws IOException
{
68 numberOfCompleteRows
++;
69 Result result
= Result
.createCompleteResult(partialResults
);
70 partialResults
.clear();
71 numCellsOfPartialResults
= 0;
75 // Add new result to the partial list and return a batched Result if caching size exceed batching
76 // limit. As the RS will also respect the scan.getBatch, we can make sure that we will get only
77 // one Result back at most(or null, which means we do not have enough cells).
78 private Result
regroupResults(Result result
) {
79 partialResults
.addLast(result
);
80 numCellsOfPartialResults
+= result
.size();
81 if (numCellsOfPartialResults
< batch
) {
84 Cell
[] cells
= new Cell
[batch
];
86 boolean stale
= false;
88 Result r
= partialResults
.pollFirst();
89 stale
= stale
|| r
.isStale();
90 int newCellCount
= cellCount
+ r
.size();
91 if (newCellCount
> batch
) {
92 // We have more cells than expected, so split the current result
93 int len
= batch
- cellCount
;
94 System
.arraycopy(r
.rawCells(), 0, cells
, cellCount
, len
);
95 Cell
[] remainingCells
= new Cell
[r
.size() - len
];
96 System
.arraycopy(r
.rawCells(), len
, remainingCells
, 0, r
.size() - len
);
97 partialResults
.addFirst(
98 Result
.create(remainingCells
, r
.getExists(), r
.isStale(), r
.mayHaveMoreCellsInRow()));
101 System
.arraycopy(r
.rawCells(), 0, cells
, cellCount
, r
.size());
102 if (newCellCount
== batch
) {
105 cellCount
= newCellCount
;
107 numCellsOfPartialResults
-= batch
;
108 return Result
.create(cells
, null, stale
,
109 result
.mayHaveMoreCellsInRow() || !partialResults
.isEmpty());
113 public Result
[] addAndGet(Result
[] results
, boolean isHeartbeatMessage
) throws IOException
{
114 if (results
.length
== 0) {
115 if (!isHeartbeatMessage
) {
116 if (!partialResults
.isEmpty()) {
117 return new Result
[] { createCompletedResult() };
119 if (lastResultPartial
) {
120 // An empty non heartbeat result indicate that there must be a row change. So if the
121 // lastResultPartial is true then we need to increase numberOfCompleteRows.
122 numberOfCompleteRows
++;
125 return EMPTY_RESULT_ARRAY
;
127 List
<Result
> regroupedResults
= new ArrayList
<>();
128 for (Result result
: results
) {
129 result
= filterCells(result
, lastCell
);
130 if (result
== null) {
133 if (!partialResults
.isEmpty()) {
134 if (!Bytes
.equals(partialResults
.peek().getRow(), result
.getRow())) {
135 // there is a row change
136 regroupedResults
.add(createCompletedResult());
138 } else if (lastResultPartial
&& !CellUtil
.matchingRows(lastCell
, result
.getRow())) {
139 // As for batched scan we may return partial results to user if we reach the batch limit, so
140 // here we need to use lastCell to determine if there is row change and increase
141 // numberOfCompleteRows.
142 numberOfCompleteRows
++;
144 // check if we have a row change
145 if (!partialResults
.isEmpty() &&
146 !Bytes
.equals(partialResults
.peek().getRow(), result
.getRow())) {
147 regroupedResults
.add(createCompletedResult());
149 Result regroupedResult
= regroupResults(result
);
150 if (regroupedResult
!= null) {
151 if (!regroupedResult
.mayHaveMoreCellsInRow()) {
152 numberOfCompleteRows
++;
154 regroupedResults
.add(regroupedResult
);
155 // only update last cell when we actually return it to user.
156 recordLastResult(regroupedResult
);
158 if (!result
.mayHaveMoreCellsInRow() && !partialResults
.isEmpty()) {
159 // We are done for this row
160 regroupedResults
.add(createCompletedResult());
163 return regroupedResults
.toArray(new Result
[0]);
167 public void clear() {
168 partialResults
.clear();
169 numCellsOfPartialResults
= 0;
173 public int numberOfCompleteRows() {
174 return numberOfCompleteRows
;