2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.regionserver
;
20 import java
.util
.List
;
22 import org
.apache
.hadoop
.hbase
.Cell
;
23 import org
.apache
.hadoop
.hbase
.HBaseInterfaceAudience
;
24 import org
.apache
.yetus
.audience
.InterfaceAudience
;
25 import org
.apache
.yetus
.audience
.InterfaceStability
;
26 import org
.apache
.hadoop
.hbase
.client
.metrics
.ServerSideScanMetrics
;
29 * ScannerContext instances encapsulate limit tracking AND progress towards those limits during
30 * invocations of {@link InternalScanner#next(java.util.List)} and
31 * {@link RegionScanner#next(java.util.List)}.
33 * A ScannerContext instance should be updated periodically throughout execution whenever progress
34 * towards a limit has been made. Each limit can be checked via the appropriate checkLimit method.
36 * Once a limit has been reached, the scan will stop. The invoker of
37 * {@link InternalScanner#next(java.util.List)} or {@link RegionScanner#next(java.util.List)} can
38 * use the appropriate check*Limit methods to see exactly which limits have been reached.
39 * Alternatively, {@link #checkAnyLimitReached(LimitScope)} is provided to see if ANY limit was reached.
42 * {@link NoLimitScannerContext#NO_LIMIT} is an immutable static definition that can be used
43 * whenever a {@link ScannerContext} is needed but limits do not need to be enforced.
45 * NOTE: It is important that this class only ever expose setter methods that can be safely skipped
46 * when limits should NOT be enforced. This is because of the necessary immutability of the class
47 * {@link NoLimitScannerContext}. If a setter cannot be safely skipped, the immutable nature of
48 * {@link NoLimitScannerContext} will lead to incorrect behavior.
50 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience
.COPROC
)
51 @InterfaceStability.Evolving
52 public class ScannerContext
{
56 * A different set of progress fields. Only includes batch, dataSize and heapSize. Compared to
57 * LimitFields, ProgressFields doesn't contain a time field. As we save a deadline in LimitFields,
58 * {@link System#currentTimeMillis()} is used directly when checking the time limit.
60 ProgressFields progress
;
63 * The state of the scanner after the invocation of {@link InternalScanner#next(java.util.List)}
64 * or {@link RegionScanner#next(java.util.List)}.
66 NextState scannerState
;
67 private static final NextState DEFAULT_STATE
= NextState
.MORE_VALUES
;
70 * Used as an indication to invocations of {@link InternalScanner#next(java.util.List)} and
71 * {@link RegionScanner#next(java.util.List)} that, if true, the progress tracked within this
72 * {@link ScannerContext} instance should be considered while evaluating the limits. Useful for
73 * enforcing a set of limits across multiple calls (i.e. the limit may not be reached in a single
74 * invocation, but any progress made should be considered in future invocations)
76 * Defaulting this value to false means that, by default, any tracked progress will be wiped clean
77 * on invocations to {@link InternalScanner#next(java.util.List)} and
78 * {@link RegionScanner#next(java.util.List)} and the call will be treated as though no progress
79 * has been made towards the limits so far.
81 * This is an important mechanism. Users of Internal/Region scanners expect that they can define
82 * some limits and then repeatedly invoke {@link InternalScanner#next(List)} or
83 * {@link RegionScanner#next(List)} where each invocation respects these limits separately.
85 * For example: <pre> {@code
86 * ScannerContext context = ScannerContext.newBuilder().setBatchLimit(5).build();
87 * RegionScanner scanner = ...
88 * List<Cell> results = new ArrayList<Cell>();
89 * while(scanner.next(results, context)) {
90 * // Do something with a batch of 5 cells
92 * }</pre> However, in the case of RPCs, the server wants to be able to define a set of
93 * limits for a particular RPC request and have those limits respected across multiple
94 * invocations. This means that the progress made towards the limits in earlier calls will be
95 * saved and considered in future invocations
98 private static boolean DEFAULT_KEEP_PROGRESS
= false;
100 private Cell lastPeekedCell
= null;
102 // Setting this to true has the same effect as reaching the time limit.
103 // This is used when you want to make the current RSRpcService.scan return immediately. For
104 // example, when we want to switch from pread to stream, we can only do it after the rpc call has returned.
106 private boolean returnImmediately
;
109 * Tracks the relevant server side metrics during scans. null when metrics should not be tracked
111 final ServerSideScanMetrics metrics
;
/**
 * Creates a context with its own copy of the supplied limits, zeroed progress and the default
 * scanner state.
 * @param keepProgress whether progress tracked so far should be kept across invocations
 * @param limitsToCopy limits to copy into this context; when null the limit fields keep their
 *          default values
 * @param trackMetrics when true a new {@link ServerSideScanMetrics} is created, otherwise the
 *          metrics reference stays null
 */
ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean trackMetrics) {
  // Always work on a private LimitFields instance so the caller's object is never shared.
  this.limits = new LimitFields();
  if (null != limitsToCopy) {
    limits.copy(limitsToCopy);
  }

  // Progress always starts from zero for batch, data size and heap size.
  this.progress = new ProgressFields(0, 0, 0);

  this.keepProgress = keepProgress;
  this.scannerState = DEFAULT_STATE;

  if (trackMetrics) {
    this.metrics = new ServerSideScanMetrics();
  } else {
    this.metrics = null;
  }
}
127 public boolean isTrackingMetrics() {
128 return this.metrics
!= null;
132 * Get the metrics instance. Should only be called after a call to {@link #isTrackingMetrics()}
133 * has been made to confirm that metrics are indeed being tracked.
134 * @return {@link ServerSideScanMetrics} instance that is tracking metrics for this scan
136 public ServerSideScanMetrics
getMetrics() {
137 assert isTrackingMetrics();
142 * @return true if the progress tracked so far in this instance will be considered during an
143 * invocation of {@link InternalScanner#next(java.util.List)} or
144 * {@link RegionScanner#next(java.util.List)}. false when the progress tracked so far
145 * should not be considered and should instead be wiped away via {@link #clearProgress()}
147 boolean getKeepProgress() {
151 void setKeepProgress(boolean keepProgress
) {
152 this.keepProgress
= keepProgress
;
156 * Progress towards the batch limit has been made. Increment internal tracking of batch progress
158 void incrementBatchProgress(int batch
) {
159 int currentBatch
= progress
.getBatch();
160 progress
.setBatch(currentBatch
+ batch
);
164 * Progress towards the size limit has been made. Increment internal tracking of size progress
166 void incrementSizeProgress(long dataSize
, long heapSize
) {
167 long curDataSize
= progress
.getDataSize();
168 progress
.setDataSize(curDataSize
+ dataSize
);
169 long curHeapSize
= progress
.getHeapSize();
170 progress
.setHeapSize(curHeapSize
+ heapSize
);
173 int getBatchProgress() {
174 return progress
.getBatch();
177 long getDataSizeProgress() {
178 return progress
.getDataSize();
181 long getHeapSizeProgress() {
182 return progress
.getHeapSize();
185 void setProgress(int batchProgress
, long sizeProgress
, long heapSizeProgress
) {
186 setBatchProgress(batchProgress
);
187 setSizeProgress(sizeProgress
, heapSizeProgress
);
190 void setSizeProgress(long dataSizeProgress
, long heapSizeProgress
) {
191 progress
.setDataSize(dataSizeProgress
);
192 progress
.setHeapSize(heapSizeProgress
);
195 void setBatchProgress(int batchProgress
) {
196 progress
.setBatch(batchProgress
);
200 * Clear away any progress that has been made so far. All progress fields are reset to their initial values.
203 void clearProgress() {
204 progress
.setFields(0, 0, 0);
208 * Note that this is not a typical setter. This setter returns the {@link NextState} that was
209 * passed in so that methods can be invoked against the new state. Furthermore, this pattern
210 * allows the {@link NoLimitScannerContext} to cleanly override this setter and simply return the
211 * new state, thus preserving the immutability of {@link NoLimitScannerContext}
213 * @return The state that was passed in.
215 NextState
setScannerState(NextState state
) {
216 if (!NextState
.isValidState(state
)) {
217 throw new IllegalArgumentException("Cannot set to invalid state: " + state
);
220 this.scannerState
= state
;
225 * @return true when we have more cells for the current row. This is usually because we have
226 * reached a limit in the middle of a row
228 boolean mayHaveMoreCellsInRow() {
229 return scannerState
== NextState
.SIZE_LIMIT_REACHED_MID_ROW
||
230 scannerState
== NextState
.TIME_LIMIT_REACHED_MID_ROW
||
231 scannerState
== NextState
.BATCH_LIMIT_REACHED
;
235 * @param checkerScope
236 * @return true if the batch limit can be enforced in the checker's scope
238 boolean hasBatchLimit(LimitScope checkerScope
) {
239 return limits
.canEnforceBatchLimitFromScope(checkerScope
) && limits
.getBatch() > 0;
243 * @param checkerScope
244 * @return true if the size limit can be enforced in the checker's scope
246 boolean hasSizeLimit(LimitScope checkerScope
) {
247 return limits
.canEnforceSizeLimitFromScope(checkerScope
)
248 && (limits
.getDataSize() > 0 || limits
.getHeapSize() > 0);
252 * @param checkerScope
253 * @return true if the time limit can be enforced in the checker's scope
255 boolean hasTimeLimit(LimitScope checkerScope
) {
256 return limits
.canEnforceTimeLimitFromScope(checkerScope
) &&
257 (limits
.getTime() > 0 || returnImmediately
);
261 * @param checkerScope
262 * @return true if any limit can be enforced within the checker's scope
264 boolean hasAnyLimit(LimitScope checkerScope
) {
265 return hasBatchLimit(checkerScope
) || hasSizeLimit(checkerScope
) || hasTimeLimit(checkerScope
);
269 * @param scope The scope in which the size limit will be enforced
271 void setSizeLimitScope(LimitScope scope
) {
272 limits
.setSizeScope(scope
);
276 * @param scope The scope in which the time limit will be enforced
278 void setTimeLimitScope(LimitScope scope
) {
279 limits
.setTimeScope(scope
);
282 int getBatchLimit() {
283 return limits
.getBatch();
286 long getDataSizeLimit() {
287 return limits
.getDataSize();
290 long getTimeLimit() {
291 return limits
.getTime();
295 * @param checkerScope The scope that the limit is being checked from
296 * @return true when the limit is enforceable from the checker's scope and it has been reached
298 boolean checkBatchLimit(LimitScope checkerScope
) {
299 return hasBatchLimit(checkerScope
) && progress
.getBatch() >= limits
.getBatch();
303 * @param checkerScope The scope that the limit is being checked from
304 * @return true when the limit is enforceable from the checker's scope and it has been reached
306 boolean checkSizeLimit(LimitScope checkerScope
) {
307 return hasSizeLimit(checkerScope
) && (progress
.getDataSize() >= limits
.getDataSize()
308 || progress
.getHeapSize() >= limits
.getHeapSize());
312 * @param checkerScope The scope that the limit is being checked from. The time limit is always
313 * checked against {@link System#currentTimeMillis()}
314 * @return true when the limit is enforceable from the checker's scope and it has been reached
316 boolean checkTimeLimit(LimitScope checkerScope
) {
317 return hasTimeLimit(checkerScope
) &&
318 (returnImmediately
|| System
.currentTimeMillis() >= limits
.getTime());
322 * @param checkerScope The scope that the limits are being checked from
323 * @return true when some limit is enforceable from the checker's scope and it has been reached
325 boolean checkAnyLimitReached(LimitScope checkerScope
) {
326 return checkSizeLimit(checkerScope
) || checkBatchLimit(checkerScope
)
327 || checkTimeLimit(checkerScope
);
330 Cell
getLastPeekedCell() {
331 return lastPeekedCell
;
334 void setLastPeekedCell(Cell lastPeekedCell
) {
335 this.lastPeekedCell
= lastPeekedCell
;
338 void returnImmediately() {
339 this.returnImmediately
= true;
343 public String
toString() {
344 StringBuilder sb
= new StringBuilder();
347 sb
.append("limits:");
350 sb
.append(", progress:");
353 sb
.append(", keepProgress:");
354 sb
.append(keepProgress
);
356 sb
.append(", state:");
357 sb
.append(scannerState
);
360 return sb
.toString();
363 public static Builder
newBuilder() {
364 return new Builder();
367 public static Builder
newBuilder(boolean keepProgress
) {
368 return new Builder(keepProgress
);
371 public static final class Builder
{
372 boolean keepProgress
= DEFAULT_KEEP_PROGRESS
;
373 boolean trackMetrics
= false;
374 LimitFields limits
= new LimitFields();
379 private Builder(boolean keepProgress
) {
380 this.keepProgress
= keepProgress
;
383 public Builder
setKeepProgress(boolean keepProgress
) {
384 this.keepProgress
= keepProgress
;
388 public Builder
setTrackMetrics(boolean trackMetrics
) {
389 this.trackMetrics
= trackMetrics
;
393 public Builder
setSizeLimit(LimitScope sizeScope
, long dataSizeLimit
, long heapSizeLimit
) {
394 limits
.setDataSize(dataSizeLimit
);
395 limits
.setHeapSize(heapSizeLimit
);
396 limits
.setSizeScope(sizeScope
);
400 public Builder
setTimeLimit(LimitScope timeScope
, long timeLimit
) {
401 limits
.setTime(timeLimit
);
402 limits
.setTimeScope(timeScope
);
406 public Builder
setBatchLimit(int batchLimit
) {
407 limits
.setBatch(batchLimit
);
411 public ScannerContext
build() {
412 return new ScannerContext(keepProgress
, limits
, trackMetrics
);
417 * The possible states a scanner may be in following a call to {@link InternalScanner#next(List)}
/**
 * The possible states a scanner may be in following a call to next. Each state records whether
 * more values may follow and whether a limit was reached.
 */
public enum NextState {
  MORE_VALUES(true, false),
  NO_MORE_VALUES(false, false),
  SIZE_LIMIT_REACHED(true, true),

  /**
   * Special case of size limit reached to indicate that the size limit was reached in the middle
   * of a row and thus a partial result was formed
   */
  SIZE_LIMIT_REACHED_MID_ROW(true, true),
  TIME_LIMIT_REACHED(true, true),

  /**
   * Special case of time limit reached to indicate that the time limit was reached in the middle
   * of a row and thus a partial result was formed
   */
  TIME_LIMIT_REACHED_MID_ROW(true, true),
  BATCH_LIMIT_REACHED(true, true);

  private final boolean moreValues;
  private final boolean limitReached;

  // Enum constructors are implicitly private.
  NextState(boolean moreValues, boolean limitReached) {
    this.moreValues = moreValues;
    this.limitReached = limitReached;
  }

  /** @return true when the state indicates that more values may follow */
  public boolean hasMoreValues() {
    return moreValues;
  }

  /** @return true when the state indicates that a limit has been reached and scan should stop */
  public boolean limitReached() {
    return limitReached;
  }

  /**
   * @param state the state to validate
   * @return true for every non-null state
   */
  public static boolean isValidState(NextState state) {
    return null != state;
  }

  /**
   * @param state the state to inspect; may be null
   * @return true when the state is valid and indicates that more values may follow
   */
  public static boolean hasMoreValues(NextState state) {
    if (!isValidState(state)) {
      return false;
    }
    return state.hasMoreValues();
  }
}
471 * The various scopes where a limit can be enforced. Used to differentiate when a limit should be
474 public enum LimitScope
{
476 * Enforcing a limit between rows means that the limit will not be considered until all the
477 * cells for a particular row have been retrieved
482 * Enforcing a limit between cells means that the limit will be considered after each full cell
488 * When enforcing a limit, we must check that the scope is appropriate for enforcement.
490 * To communicate this concept, each scope has a depth. A limit will be enforced if the depth of
491 * the checker's scope is less than or equal to the limit's scope. This means that when checking
492 * limits, the checker must know their own scope (i.e. are they checking the limits between
493 * rows, between cells, etc...)
497 LimitScope(int depth
) {
506 * @param checkerScope The scope in which the limit is being checked
507 * @return true when the checker is in a scope that indicates the limit can be enforced. Limits
508 * can be enforced from "higher or equal" scopes (i.e. the checker's scope is at a
509 * lesser depth than the limit)
511 boolean canEnforceLimitFromScope(LimitScope checkerScope
) {
512 return checkerScope
!= null && checkerScope
.depth() <= depth
;
517 * The different fields that can be used as limits in calls to
518 * {@link InternalScanner#next(java.util.List)} and {@link RegionScanner#next(java.util.List)}
520 private static class LimitFields
{
522 * Default values of the limit fields. Defined such that if a field does NOT change from its
523 * default, it will not be enforced
525 private static int DEFAULT_BATCH
= -1;
526 private static long DEFAULT_SIZE
= -1L;
527 private static long DEFAULT_TIME
= -1L;
530 * Default scope that is assigned to a limit if a scope is not specified.
532 private static final LimitScope DEFAULT_SCOPE
= LimitScope
.BETWEEN_ROWS
;
534 // The batch limit will always be enforced between cells, thus, there isn't a field to hold the
536 int batch
= DEFAULT_BATCH
;
538 LimitScope sizeScope
= DEFAULT_SCOPE
;
539 // The sum of cell data sizes(key + value). The Cell data might be in on heap or off heap area.
540 long dataSize
= DEFAULT_SIZE
;
541 // The sum of heap space occupied by all tracked cells. This includes Cell POJO's overhead as
542 // such AND data cells of Cells which are in on heap area.
543 long heapSize
= DEFAULT_SIZE
;
545 LimitScope timeScope
= DEFAULT_SCOPE
;
546 long time
= DEFAULT_TIME
;
549 * Fields keep their default values.
554 void copy(LimitFields limitsToCopy
) {
555 if (limitsToCopy
!= null) {
556 setFields(limitsToCopy
.getBatch(), limitsToCopy
.getSizeScope(), limitsToCopy
.getDataSize(),
557 limitsToCopy
.getHeapSize(), limitsToCopy
.getTimeScope(), limitsToCopy
.getTime());
562 * Set all fields together.
564 void setFields(int batch
, LimitScope sizeScope
, long dataSize
, long heapSize
,
565 LimitScope timeScope
, long time
) {
567 setSizeScope(sizeScope
);
568 setDataSize(dataSize
);
569 setHeapSize(heapSize
);
570 setTimeScope(timeScope
);
578 void setBatch(int batch
) {
583 * @param checkerScope
584 * @return true when the limit can be enforced from the scope of the checker
586 boolean canEnforceBatchLimitFromScope(LimitScope checkerScope
) {
587 return LimitScope
.BETWEEN_CELLS
.canEnforceLimitFromScope(checkerScope
);
591 return this.dataSize
;
595 return this.heapSize
;
598 void setDataSize(long dataSize
) {
599 this.dataSize
= dataSize
;
602 void setHeapSize(long heapSize
) {
603 this.heapSize
= heapSize
;
607 * @return {@link LimitScope} indicating scope in which the size limit is enforced
609 LimitScope
getSizeScope() {
610 return this.sizeScope
;
614 * Change the scope in which the size limit is enforced
616 void setSizeScope(LimitScope scope
) {
617 this.sizeScope
= scope
;
621 * @param checkerScope
622 * @return true when the limit can be enforced from the scope of the checker
624 boolean canEnforceSizeLimitFromScope(LimitScope checkerScope
) {
625 return this.sizeScope
.canEnforceLimitFromScope(checkerScope
);
632 void setTime(long time
) {
637 * @return {@link LimitScope} indicating scope in which the time limit is enforced
639 LimitScope
getTimeScope() {
640 return this.timeScope
;
644 * Change the scope in which the time limit is enforced
646 void setTimeScope(LimitScope scope
) {
647 this.timeScope
= scope
;
651 * @param checkerScope
652 * @return true when the limit can be enforced from the scope of the checker
654 boolean canEnforceTimeLimitFromScope(LimitScope checkerScope
) {
655 return this.timeScope
.canEnforceLimitFromScope(checkerScope
);
659 public String
toString() {
660 StringBuilder sb
= new StringBuilder();
666 sb
.append(", dataSize:");
669 sb
.append(", heapSize:");
672 sb
.append(", sizeScope:");
673 sb
.append(sizeScope
);
675 sb
.append(", time:");
678 sb
.append(", timeScope:");
679 sb
.append(timeScope
);
682 return sb
.toString();
686 private static class ProgressFields
{
688 private static int DEFAULT_BATCH
= -1;
689 private static long DEFAULT_SIZE
= -1L;
691 // The batch limit will always be enforced between cells, thus, there isn't a field to hold the
693 int batch
= DEFAULT_BATCH
;
695 // The sum of cell data sizes(key + value). The Cell data might be in on heap or off heap area.
696 long dataSize
= DEFAULT_SIZE
;
697 // The sum of heap space occupied by all tracked cells. This includes Cell POJO's overhead as
698 // such AND data cells of Cells which are in on heap area.
699 long heapSize
= DEFAULT_SIZE
;
701 ProgressFields(int batch
, long size
, long heapSize
) {
702 setFields(batch
, size
, heapSize
);
706 * Set all fields together.
708 void setFields(int batch
, long dataSize
, long heapSize
) {
710 setDataSize(dataSize
);
711 setHeapSize(heapSize
);
718 void setBatch(int batch
) {
723 return this.dataSize
;
727 return this.heapSize
;
730 void setDataSize(long dataSize
) {
731 this.dataSize
= dataSize
;
734 void setHeapSize(long heapSize
) {
735 this.heapSize
= heapSize
;
739 public String
toString() {
740 StringBuilder sb
= new StringBuilder();
746 sb
.append(", dataSize:");
749 sb
.append(", heapSize:");
753 return sb
.toString();