HBASE-26688 Threads shared EMPTY_RESULT may lead to unexpected client job down. ...
[hbase.git] / hbase-client / src / main / java / org / apache / hadoop / hbase / client / BatchScanResultCache.java
blob3b27298585e962ed2b020a47ba6928a0c74ae866
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org.apache.hadoop.hbase.client;
20 import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells;
22 import java.io.IOException;
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.Deque;
26 import java.util.List;
28 import org.apache.hadoop.hbase.Cell;
29 import org.apache.hadoop.hbase.CellUtil;
30 import org.apache.yetus.audience.InterfaceAudience;
31 import org.apache.hadoop.hbase.util.Bytes;
33 /**
34 * A scan result cache for batched scan, i.e,
35 * {@code scan.getBatch() > 0 && !scan.getAllowPartialResults()}.
36 * <p>
37 * If user setBatch(5) and rpc returns 3+5+5+5+3 cells, we should return 5+5+5+5+1 to user. setBatch
38 * doesn't mean setAllowPartialResult(true).
39 * @since 2.0.0
41 @InterfaceAudience.Private
42 public class BatchScanResultCache implements ScanResultCache {
44 private final int batch;
46 // used to filter out the cells that already returned to user as we always start from the
47 // beginning of a row when retry.
48 private Cell lastCell;
50 private boolean lastResultPartial;
52 private final Deque<Result> partialResults = new ArrayDeque<>();
54 private int numCellsOfPartialResults;
56 private int numberOfCompleteRows;
58 public BatchScanResultCache(int batch) {
59 this.batch = batch;
62 private void recordLastResult(Result result) {
63 lastCell = result.rawCells()[result.rawCells().length - 1];
64 lastResultPartial = result.mayHaveMoreCellsInRow();
67 private Result createCompletedResult() throws IOException {
68 numberOfCompleteRows++;
69 Result result = Result.createCompleteResult(partialResults);
70 partialResults.clear();
71 numCellsOfPartialResults = 0;
72 return result;
75 // Add new result to the partial list and return a batched Result if caching size exceed batching
76 // limit. As the RS will also respect the scan.getBatch, we can make sure that we will get only
77 // one Result back at most(or null, which means we do not have enough cells).
78 private Result regroupResults(Result result) {
79 partialResults.addLast(result);
80 numCellsOfPartialResults += result.size();
81 if (numCellsOfPartialResults < batch) {
82 return null;
84 Cell[] cells = new Cell[batch];
85 int cellCount = 0;
86 boolean stale = false;
87 for (;;) {
88 Result r = partialResults.pollFirst();
89 stale = stale || r.isStale();
90 int newCellCount = cellCount + r.size();
91 if (newCellCount > batch) {
92 // We have more cells than expected, so split the current result
93 int len = batch - cellCount;
94 System.arraycopy(r.rawCells(), 0, cells, cellCount, len);
95 Cell[] remainingCells = new Cell[r.size() - len];
96 System.arraycopy(r.rawCells(), len, remainingCells, 0, r.size() - len);
97 partialResults.addFirst(
98 Result.create(remainingCells, r.getExists(), r.isStale(), r.mayHaveMoreCellsInRow()));
99 break;
101 System.arraycopy(r.rawCells(), 0, cells, cellCount, r.size());
102 if (newCellCount == batch) {
103 break;
105 cellCount = newCellCount;
107 numCellsOfPartialResults -= batch;
108 return Result.create(cells, null, stale,
109 result.mayHaveMoreCellsInRow() || !partialResults.isEmpty());
112 @Override
113 public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
114 if (results.length == 0) {
115 if (!isHeartbeatMessage) {
116 if (!partialResults.isEmpty()) {
117 return new Result[] { createCompletedResult() };
119 if (lastResultPartial) {
120 // An empty non heartbeat result indicate that there must be a row change. So if the
121 // lastResultPartial is true then we need to increase numberOfCompleteRows.
122 numberOfCompleteRows++;
125 return EMPTY_RESULT_ARRAY;
127 List<Result> regroupedResults = new ArrayList<>();
128 for (Result result : results) {
129 result = filterCells(result, lastCell);
130 if (result == null) {
131 continue;
133 if (!partialResults.isEmpty()) {
134 if (!Bytes.equals(partialResults.peek().getRow(), result.getRow())) {
135 // there is a row change
136 regroupedResults.add(createCompletedResult());
138 } else if (lastResultPartial && !CellUtil.matchingRows(lastCell, result.getRow())) {
139 // As for batched scan we may return partial results to user if we reach the batch limit, so
140 // here we need to use lastCell to determine if there is row change and increase
141 // numberOfCompleteRows.
142 numberOfCompleteRows++;
144 // check if we have a row change
145 if (!partialResults.isEmpty() &&
146 !Bytes.equals(partialResults.peek().getRow(), result.getRow())) {
147 regroupedResults.add(createCompletedResult());
149 Result regroupedResult = regroupResults(result);
150 if (regroupedResult != null) {
151 if (!regroupedResult.mayHaveMoreCellsInRow()) {
152 numberOfCompleteRows++;
154 regroupedResults.add(regroupedResult);
155 // only update last cell when we actually return it to user.
156 recordLastResult(regroupedResult);
158 if (!result.mayHaveMoreCellsInRow() && !partialResults.isEmpty()) {
159 // We are done for this row
160 regroupedResults.add(createCompletedResult());
163 return regroupedResults.toArray(new Result[0]);
166 @Override
167 public void clear() {
168 partialResults.clear();
169 numCellsOfPartialResults = 0;
172 @Override
173 public int numberOfCompleteRows() {
174 return numberOfCompleteRows;