3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.regionserver
;
21 import java
.io
.IOException
;
22 import java
.io
.InterruptedIOException
;
23 import java
.net
.BindException
;
24 import java
.net
.InetSocketAddress
;
25 import java
.net
.UnknownHostException
;
26 import java
.nio
.ByteBuffer
;
27 import java
.util
.ArrayList
;
28 import java
.util
.Collections
;
29 import java
.util
.HashMap
;
30 import java
.util
.Iterator
;
31 import java
.util
.List
;
33 import java
.util
.Map
.Entry
;
34 import java
.util
.NavigableMap
;
36 import java
.util
.TreeSet
;
37 import java
.util
.concurrent
.ConcurrentHashMap
;
38 import java
.util
.concurrent
.atomic
.AtomicLong
;
40 import org
.apache
.commons
.logging
.Log
;
41 import org
.apache
.commons
.logging
.LogFactory
;
42 import org
.apache
.hadoop
.conf
.Configuration
;
43 import org
.apache
.hadoop
.hbase
.ByteBufferedCell
;
44 import org
.apache
.hadoop
.hbase
.Cell
;
45 import org
.apache
.hadoop
.hbase
.CellScannable
;
46 import org
.apache
.hadoop
.hbase
.CellScanner
;
47 import org
.apache
.hadoop
.hbase
.CellUtil
;
48 import org
.apache
.hadoop
.hbase
.DoNotRetryIOException
;
49 import org
.apache
.hadoop
.hbase
.DroppedSnapshotException
;
50 import org
.apache
.hadoop
.hbase
.HBaseIOException
;
51 import org
.apache
.hadoop
.hbase
.HConstants
;
52 import org
.apache
.hadoop
.hbase
.HRegionInfo
;
53 import org
.apache
.hadoop
.hbase
.HTableDescriptor
;
54 import org
.apache
.hadoop
.hbase
.MultiActionResultTooLarge
;
55 import org
.apache
.hadoop
.hbase
.NotServingRegionException
;
56 import org
.apache
.hadoop
.hbase
.ServerName
;
57 import org
.apache
.hadoop
.hbase
.TableName
;
58 import org
.apache
.hadoop
.hbase
.UnknownScannerException
;
59 import org
.apache
.hadoop
.hbase
.classification
.InterfaceAudience
;
60 import org
.apache
.hadoop
.hbase
.client
.Append
;
61 import org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
;
62 import org
.apache
.hadoop
.hbase
.client
.Delete
;
63 import org
.apache
.hadoop
.hbase
.client
.Durability
;
64 import org
.apache
.hadoop
.hbase
.client
.Get
;
65 import org
.apache
.hadoop
.hbase
.client
.Increment
;
66 import org
.apache
.hadoop
.hbase
.client
.Mutation
;
67 import org
.apache
.hadoop
.hbase
.client
.Put
;
68 import org
.apache
.hadoop
.hbase
.client
.RegionReplicaUtil
;
69 import org
.apache
.hadoop
.hbase
.client
.Result
;
70 import org
.apache
.hadoop
.hbase
.client
.RowMutations
;
71 import org
.apache
.hadoop
.hbase
.client
.Scan
;
72 import org
.apache
.hadoop
.hbase
.client
.VersionInfoUtil
;
73 import org
.apache
.hadoop
.hbase
.conf
.ConfigurationObserver
;
74 import org
.apache
.hadoop
.hbase
.exceptions
.FailedSanityCheckException
;
75 import org
.apache
.hadoop
.hbase
.exceptions
.MergeRegionException
;
76 import org
.apache
.hadoop
.hbase
.exceptions
.OperationConflictException
;
77 import org
.apache
.hadoop
.hbase
.exceptions
.OutOfOrderScannerNextException
;
78 import org
.apache
.hadoop
.hbase
.filter
.ByteArrayComparable
;
79 import org
.apache
.hadoop
.hbase
.filter
.CompareFilter
.CompareOp
;
80 import org
.apache
.hadoop
.hbase
.ipc
.HBaseRPCErrorHandler
;
81 import org
.apache
.hadoop
.hbase
.ipc
.PayloadCarryingRpcController
;
82 import org
.apache
.hadoop
.hbase
.ipc
.PriorityFunction
;
83 import org
.apache
.hadoop
.hbase
.ipc
.QosPriority
;
84 import org
.apache
.hadoop
.hbase
.ipc
.RpcCallContext
;
85 import org
.apache
.hadoop
.hbase
.ipc
.RpcCallback
;
86 import org
.apache
.hadoop
.hbase
.ipc
.RpcServer
;
87 import org
.apache
.hadoop
.hbase
.ipc
.RpcServer
.BlockingServiceAndInterface
;
88 import org
.apache
.hadoop
.hbase
.ipc
.RpcServerInterface
;
89 import org
.apache
.hadoop
.hbase
.ipc
.ServerNotRunningYetException
;
90 import org
.apache
.hadoop
.hbase
.ipc
.ServerRpcController
;
91 import org
.apache
.hadoop
.hbase
.ipc
.TimeLimitedRpcController
;
92 import org
.apache
.hadoop
.hbase
.master
.MasterRpcServices
;
93 import org
.apache
.hadoop
.hbase
.protobuf
.ProtobufUtil
;
94 import org
.apache
.hadoop
.hbase
.protobuf
.RequestConverter
;
95 import org
.apache
.hadoop
.hbase
.protobuf
.ResponseConverter
;
96 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.AdminService
;
97 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.CloseRegionRequest
;
98 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.CloseRegionResponse
;
99 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.CompactRegionRequest
;
100 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.CompactRegionResponse
;
101 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.FlushRegionRequest
;
102 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.FlushRegionResponse
;
103 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.GetOnlineRegionRequest
;
104 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.GetOnlineRegionResponse
;
105 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.GetRegionInfoRequest
;
106 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.GetRegionInfoResponse
;
107 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.GetServerInfoRequest
;
108 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.GetServerInfoResponse
;
109 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.GetStoreFileRequest
;
110 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.GetStoreFileResponse
;
111 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.MergeRegionsRequest
;
112 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.MergeRegionsResponse
;
113 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.OpenRegionRequest
;
114 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.OpenRegionRequest
.RegionOpenInfo
;
115 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.OpenRegionResponse
;
116 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.OpenRegionResponse
.RegionOpeningState
;
117 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.ReplicateWALEntryRequest
;
118 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.ReplicateWALEntryResponse
;
119 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.RollWALWriterRequest
;
120 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.RollWALWriterResponse
;
121 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.SplitRegionRequest
;
122 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.SplitRegionResponse
;
123 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.StopServerRequest
;
124 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.StopServerResponse
;
125 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.UpdateConfigurationRequest
;
126 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.UpdateConfigurationResponse
;
127 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.UpdateFavoredNodesRequest
;
128 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.UpdateFavoredNodesResponse
;
129 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.WALEntry
;
130 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.WarmupRegionRequest
;
131 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.AdminProtos
.WarmupRegionResponse
;
132 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.ClientProtos
;
133 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.ClientProtos
.Action
;
134 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.ClientProtos
.BulkLoadHFileRequest
;
135 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.ClientProtos
.BulkLoadHFileRequest
.FamilyPath
;
136 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.ClientProtos
.BulkLoadHFileResponse
;
137 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.ClientProtos
.ClientService
;
138 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.ClientProtos
.Condition
;
139 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.ClientProtos
.CoprocessorServiceRequest
;
140 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.ClientProtos
.CoprocessorServiceResponse
;
141 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.ClientProtos
.GetRequest
;
142 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.ClientProtos
.GetResponse
;
143 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.ClientProtos
.MultiRegionLoadStats
;
144 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.ClientProtos
.MultiRequest
;
145 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.ClientProtos
.MultiResponse
;
146 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.ClientProtos
.MutateRequest
;
147 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.ClientProtos
.MutateResponse
;
148 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.ClientProtos
.MutationProto
;
149 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.ClientProtos
.MutationProto
.MutationType
;
150 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.ClientProtos
.RegionAction
;
151 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.ClientProtos
.RegionActionResult
;
152 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.ClientProtos
.ResultOrException
;
153 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.ClientProtos
.ScanRequest
;
154 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.ClientProtos
.ScanResponse
;
155 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.HBaseProtos
.NameInt64Pair
;
156 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.HBaseProtos
.RegionInfo
;
157 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.HBaseProtos
.RegionSpecifier
;
158 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.HBaseProtos
.RegionSpecifier
.RegionSpecifierType
;
159 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.MapReduceProtos
.ScanMetrics
;
160 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.RPCProtos
.RequestHeader
;
161 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.WALProtos
.BulkLoadDescriptor
;
162 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.WALProtos
.CompactionDescriptor
;
163 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.WALProtos
.FlushDescriptor
;
164 import org
.apache
.hadoop
.hbase
.protobuf
.generated
.WALProtos
.RegionEventDescriptor
;
165 import org
.apache
.hadoop
.hbase
.quotas
.OperationQuota
;
166 import org
.apache
.hadoop
.hbase
.quotas
.RegionServerQuotaManager
;
167 import org
.apache
.hadoop
.hbase
.regionserver
.HRegion
.RegionScannerImpl
;
168 import org
.apache
.hadoop
.hbase
.regionserver
.Leases
.Lease
;
169 import org
.apache
.hadoop
.hbase
.regionserver
.Leases
.LeaseStillHeldException
;
170 import org
.apache
.hadoop
.hbase
.regionserver
.Region
.Operation
;
171 import org
.apache
.hadoop
.hbase
.regionserver
.ScannerContext
.LimitScope
;
172 import org
.apache
.hadoop
.hbase
.regionserver
.handler
.OpenMetaHandler
;
173 import org
.apache
.hadoop
.hbase
.regionserver
.handler
.OpenRegionHandler
;
174 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.WALEdit
;
175 import org
.apache
.hadoop
.hbase
.security
.User
;
176 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
177 import org
.apache
.hadoop
.hbase
.util
.Counter
;
178 import org
.apache
.hadoop
.hbase
.util
.DNS
;
179 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
180 import org
.apache
.hadoop
.hbase
.util
.Pair
;
181 import org
.apache
.hadoop
.hbase
.util
.ServerRegionReplicaUtil
;
182 import org
.apache
.hadoop
.hbase
.util
.Strings
;
183 import org
.apache
.hadoop
.hbase
.wal
.WAL
;
184 import org
.apache
.hadoop
.hbase
.wal
.WALKey
;
185 import org
.apache
.hadoop
.hbase
.wal
.WALSplitter
;
186 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKSplitLog
;
187 import org
.apache
.zookeeper
.KeeperException
;
189 import com
.google
.common
.annotations
.VisibleForTesting
;
190 import com
.google
.protobuf
.ByteString
;
191 import com
.google
.protobuf
.Message
;
192 import com
.google
.protobuf
.RpcController
;
193 import com
.google
.protobuf
.ServiceException
;
194 import com
.google
.protobuf
.TextFormat
;
197 * Implements the regionserver RPC services.
199 @InterfaceAudience.Private
200 @SuppressWarnings("deprecation")
201 public class RSRpcServices
implements HBaseRPCErrorHandler
,
202 AdminService
.BlockingInterface
, ClientService
.BlockingInterface
, PriorityFunction
,
203 ConfigurationObserver
{
/** Logger for this class; also used by the nested callback and listener classes. */
protected static final Log LOG = LogFactory.getLog(RSRpcServices.class);

/** RPC scheduler to use for the region server. */
public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
  "hbase.region.server.rpc.scheduler.factory.class";

/**
 * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
 * configuration exists to prevent the scenario where a time limit is specified to be so
 * restrictive that the time limit is reached immediately (before any cells are scanned).
 */
private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
    "hbase.region.server.rpc.minimum.scan.time.limit.delta";
/**
 * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
 */
private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

// Request counter. (Includes requests that are not serviced by regions.)
final Counter requestCount = new Counter();

// Request counter for rpc get
final Counter rpcGetRequestCount = new Counter();

// Request counter for rpc scan
final Counter rpcScanRequestCount = new Counter();

// Request counter for rpc multi
final Counter rpcMultiRequestCount = new Counter();

// Request counter for rpc mutate
final Counter rpcMutateRequestCount = new Counter();

// Server to handle client requests.
final RpcServerInterface rpcServer;
// NOTE(review): presumably the socket address the RPC server is bound to -- confirm against
// the constructor, which is not shown next to this declaration.
final InetSocketAddress isa;

// The region server these RPC services belong to; used to reach its regions, leases,
// nonce manager, cache flusher and metrics.
private final HRegionServer regionServer;
// Upper bound that is combined with the available read quota (Math.min) to cap the size of
// a multi-action response.
private final long maxScannerResultSize;

// The reference to the priority extraction function
private final PriorityFunction priority;

// NOTE(review): presumably the source of new scanner ids -- confirm where it is incremented.
private final AtomicLong scannerIdGen = new AtomicLong(0L);
// Open scanners keyed by scanner name; entries are removed when a scanner lease expires.
private final ConcurrentHashMap<String, RegionScannerHolder> scanners =
  new ConcurrentHashMap<String, RegionScannerHolder>();

/**
 * The lease timeout period for client scanners (milliseconds).
 */
private final int scannerLeaseTimeoutPeriod;

/**
 * The RPC timeout period (milliseconds)
 */
private final int rpcTimeout;

/**
 * The minimum allowable delta to use for the scan limit
 */
private final long minimumScanTimeLimitDelta;
267 * An Rpc callback for closing a RegionScanner.
269 static class RegionScannerCloseCallBack
implements RpcCallback
{
271 private final RegionScanner scanner
;
273 public RegionScannerCloseCallBack(RegionScanner scanner
){
274 this.scanner
= scanner
;
278 public void run() throws IOException
{
279 this.scanner
.close();
284 * An Rpc callback for doing shipped() call on a RegionScanner.
286 private class RegionScannerShippedCallBack
implements RpcCallback
{
288 private final String scannerName
;
289 private final RegionScanner scanner
;
290 private final Lease lease
;
292 public RegionScannerShippedCallBack(String scannerName
, RegionScanner scanner
, Lease lease
) {
293 this.scannerName
= scannerName
;
294 this.scanner
= scanner
;
299 public void run() throws IOException
{
300 this.scanner
.shipped();
301 // We're done. On way out re-add the above removed lease. The lease was temp removed for this
302 // Rpc call and we are at end of the call now. Time to add it back.
303 if (scanners
.containsKey(scannerName
)) {
304 if (lease
!= null) regionServer
.leases
.addLease(lease
);
310 * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on
311 * completion of multiGets.
313 static class RegionScannersCloseCallBack
implements RpcCallback
{
314 private final List
<RegionScanner
> scanners
= new ArrayList
<RegionScanner
>();
316 public void addScanner(RegionScanner scanner
) {
317 this.scanners
.add(scanner
);
322 for (RegionScanner scanner
: scanners
) {
325 } catch (IOException e
) {
326 LOG
.error("Exception while closing the scanner " + scanner
, e
);
333 * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
335 private static class RegionScannerHolder
{
336 private AtomicLong nextCallSeq
= new AtomicLong(0);
337 private RegionScanner s
;
339 final RpcCallback closeCallBack
;
340 final RpcCallback shippedCallback
;
342 public RegionScannerHolder(RegionScanner s
, Region r
, RpcCallback closeCallBack
,
343 RpcCallback shippedCallback
) {
346 this.closeCallBack
= closeCallBack
;
347 this.shippedCallback
= shippedCallback
;
350 private long getNextCallSeq() {
351 return nextCallSeq
.get();
354 private void incNextCallSeq() {
355 nextCallSeq
.incrementAndGet();
358 private void rollbackNextCallSeq() {
359 nextCallSeq
.decrementAndGet();
/**
 * Instantiated as a scanner lease. If the lease times out, the scanner is
 * closed
 */
private class ScannerListener implements LeaseListener {
  /** Name under which the scanner is registered in the enclosing class's scanner map. */
  private final String scannerName;

  /**
   * @param n name of the scanner this listener is responsible for
   */
  ScannerListener(final String n) {
    this.scannerName = n;
  }

  /**
   * Removes the scanner from the scanner map and closes it, invoking the region's
   * pre/post scanner-close coprocessor hooks when the region and its coprocessor host are
   * available. IOExceptions are logged rather than propagated.
   */
  @Override
  public void leaseExpired() {
    // Unregister first so the scanner can no longer be looked up by name.
    RegionScannerHolder rsh = scanners.remove(this.scannerName);
    if (rsh != null) {
      RegionScanner s = rsh.s;
      LOG.info("Scanner " + this.scannerName + " lease expired on region "
        + s.getRegionInfo().getRegionNameAsString());
      Region region = null;
      try {
        region = regionServer.getRegion(s.getRegionInfo().getRegionName());
        if (region != null && region.getCoprocessorHost() != null) {
          region.getCoprocessorHost().preScannerClose(s);
        }
      } catch (IOException e) {
        LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
      } finally {
        // The close runs in finally so it is attempted even when the region lookup or the
        // pre-close hook failed above.
        try {
          s.close();
          if (region != null && region.getCoprocessorHost() != null) {
            region.getCoprocessorHost().postScannerClose(s);
          }
        } catch (IOException e) {
          LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
        }
      }
    } else {
      // Nothing was registered under this name, so there is no scanner to close.
      LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" +
        " scanner found, hence no chance to close that related scanner!");
    }
  }
}
406 private static ResultOrException
getResultOrException(final ClientProtos
.Result r
,
408 return getResultOrException(ResponseConverter
.buildActionResult(r
), index
);
411 private static ResultOrException
getResultOrException(final Exception e
, final int index
) {
412 return getResultOrException(ResponseConverter
.buildActionResult(e
), index
);
415 private static ResultOrException
getResultOrException(
416 final ResultOrException
.Builder builder
, final int index
) {
417 return builder
.setIndex(index
).build();
421 * Starts the nonce operation for a mutation, if needed.
422 * @param mutation Mutation.
423 * @param nonceGroup Nonce group from the request.
424 * @returns Nonce used (can be NO_NONCE).
426 private long startNonceOperation(final MutationProto mutation
, long nonceGroup
)
427 throws IOException
, OperationConflictException
{
428 if (regionServer
.nonceManager
== null || !mutation
.hasNonce()) return HConstants
.NO_NONCE
;
429 boolean canProceed
= false;
431 canProceed
= regionServer
.nonceManager
.startOperation(
432 nonceGroup
, mutation
.getNonce(), regionServer
);
433 } catch (InterruptedException ex
) {
434 throw new InterruptedIOException("Nonce start operation interrupted");
437 // TODO: instead, we could convert append/increment to get w/mvcc
438 String message
= "The operation with nonce {" + nonceGroup
+ ", " + mutation
.getNonce()
439 + "} on row [" + Bytes
.toString(mutation
.getRow().toByteArray())
440 + "] may have already completed";
441 throw new OperationConflictException(message
);
443 return mutation
.getNonce();
447 * Ends nonce operation for a mutation, if needed.
448 * @param mutation Mutation.
449 * @param nonceGroup Nonce group from the request. Always 0 in initial implementation.
450 * @param success Whether the operation for this nonce has succeeded.
452 private void endNonceOperation(final MutationProto mutation
,
453 long nonceGroup
, boolean success
) {
454 if (regionServer
.nonceManager
!= null && mutation
.hasNonce()) {
455 regionServer
.nonceManager
.endOperation(nonceGroup
, mutation
.getNonce(), success
);
460 * @return True if current call supports cellblocks
462 private boolean isClientCellBlockSupport() {
463 RpcCallContext context
= RpcServer
.getCurrentCall();
464 return context
!= null && context
.isClientCellBlockSupported();
467 private boolean isClientCellBlockSupport(RpcCallContext context
) {
468 return context
!= null && context
.isClientCellBlockSupported();
471 private void addResult(final MutateResponse
.Builder builder
, final Result result
,
472 final PayloadCarryingRpcController rpcc
) {
473 if (result
== null) return;
474 if (isClientCellBlockSupport()) {
475 builder
.setResult(ProtobufUtil
.toResultNoData(result
));
476 rpcc
.setCellScanner(result
.cellScanner());
478 ClientProtos
.Result pbr
= ProtobufUtil
.toResult(result
);
479 builder
.setResult(pbr
);
483 private void addResults(final ScanResponse
.Builder builder
, final List
<Result
> results
,
484 final RpcController controller
, boolean isDefaultRegion
, boolean clientCellBlockSupported
) {
485 builder
.setStale(!isDefaultRegion
);
486 if (results
== null || results
.isEmpty()) return;
487 if (clientCellBlockSupported
) {
488 for (Result res
: results
) {
489 builder
.addCellsPerResult(res
.size());
490 builder
.addPartialFlagPerResult(res
.isPartial());
492 ((PayloadCarryingRpcController
)controller
).
493 setCellScanner(CellUtil
.createCellScanner(results
));
495 for (Result res
: results
) {
496 ClientProtos
.Result pbr
= ProtobufUtil
.toResult(res
);
497 builder
.addResults(pbr
);
/**
 * Mutate a list of rows atomically.
 *
 * Collects all Put/Delete actions into a single {@link RowMutations} and applies it with
 * {@code region.mutateRow}. One empty {@code ResultOrException} (index only) is added to
 * the builder per action.
 *
 * @param region region to apply the mutations to
 * @param actions the actions to apply; each must be a Put or Delete mutation
 * @param cellScanner if non-null, the mutation data -- the Cell content.
 * @param builder receives one index-only result entry per action
 * @throws IOException a DoNotRetryIOException if an action is a Get or a mutation type other
 *   than PUT/DELETE; otherwise whatever conversion or the region throws
 */
private void mutateRows(final Region region,
    final List<ClientProtos.Action> actions,
    final CellScanner cellScanner, RegionActionResult.Builder builder) throws IOException {
  // Non-meta regions first ask the cache flusher to reclaim memstore memory.
  if (!region.getRegionInfo().isMetaTable()) {
    regionServer.cacheFlusher.reclaimMemStoreMemory();
  }
  RowMutations rm = null;
  int i = 0;
  // One builder is reused (cleared) for every per-action entry.
  ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
      ClientProtos.ResultOrException.newBuilder();
  for (ClientProtos.Action action: actions) {
    if (action.hasGet()) {
      throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
        action.getGet());
    }
    MutationType type = action.getMutation().getMutateType();
    // The RowMutations is created lazily from the row of the first action; rows of later
    // actions are not compared against it here.
    if (rm == null) {
      rm = new RowMutations(action.getMutation().getRow().toByteArray());
    }
    switch (type) {
      case PUT:
        rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
        break;
      case DELETE:
        rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
        break;
      default:
        throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
    }
    // To unify the response format with doNonAtomicRegionMutation and read through client's
    // AsyncProcess we have to add an empty result instance per operation
    resultOrExceptionOrBuilder.clear();
    resultOrExceptionOrBuilder.setIndex(i++);
    builder.addResultOrException(
        resultOrExceptionOrBuilder.build());
  }
  // NOTE(review): rm is still null here when actions is empty -- confirm callers never pass
  // an empty list or that mutateRow tolerates null.
  region.mutateRow(rm);
}
/**
 * Mutate a list of rows atomically, conditional on a check.
 *
 * Collects all Put/Delete actions into a single {@link RowMutations} and hands it, together
 * with the check parameters, to {@code region.checkAndRowMutate}. One empty
 * {@code ResultOrException} (index only) is added to the builder per action.
 *
 * @param region region to apply the mutations to
 * @param actions the actions to apply; each must be a Put or Delete mutation
 * @param cellScanner if non-null, the mutation data -- the Cell content.
 * @param row row passed to the region's check
 * @param family column family passed to the region's check
 * @param qualifier column qualifier passed to the region's check
 * @param compareOp comparison operator passed to the region's check
 * @param comparator comparator passed to the region's check
 * @param builder receives one index-only result entry per action
 * @return the value returned by {@code region.checkAndRowMutate}
 * @throws IOException a DoNotRetryIOException if an action is a Get or a mutation type other
 *   than PUT/DELETE; otherwise whatever conversion or the region throws
 */
private boolean checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions,
    final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
    CompareOp compareOp, ByteArrayComparable comparator,
    RegionActionResult.Builder builder) throws IOException {
  // Non-meta regions first ask the cache flusher to reclaim memstore memory.
  if (!region.getRegionInfo().isMetaTable()) {
    regionServer.cacheFlusher.reclaimMemStoreMemory();
  }
  RowMutations rm = null;
  int i = 0;
  // One builder is reused (cleared) for every per-action entry.
  ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
      ClientProtos.ResultOrException.newBuilder();
  for (ClientProtos.Action action: actions) {
    if (action.hasGet()) {
      throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
        action.getGet());
    }
    MutationType type = action.getMutation().getMutateType();
    // The RowMutations is created lazily from the row of the first action.
    if (rm == null) {
      rm = new RowMutations(action.getMutation().getRow().toByteArray());
    }
    switch (type) {
      case PUT:
        rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
        break;
      case DELETE:
        rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
        break;
      default:
        throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
    }
    // To unify the response format with doNonAtomicRegionMutation and read through client's
    // AsyncProcess we have to add an empty result instance per operation
    resultOrExceptionOrBuilder.clear();
    resultOrExceptionOrBuilder.setIndex(i++);
    builder.addResultOrException(
        resultOrExceptionOrBuilder.build());
  }
  // NOTE(review): rm is still null here when actions is empty -- confirm callers never pass
  // an empty list.
  return region.checkAndRowMutate(row, family, qualifier, compareOp,
      comparator, rm, Boolean.TRUE);
}
/**
 * Execute an append mutation.
 *
 * Converts the protobuf mutation to an {@link Append}, records it against the quota, gives
 * the region's coprocessor host (if any) the chance to supply the result via preAppend, and
 * otherwise runs the append on the region inside a nonce operation.
 *
 * @param region region to append to
 * @param quota quota the mutation is recorded against
 * @param m the protobuf mutation describing the append
 * @param cellScanner cell data for the mutation, passed to the protobuf conversion
 * @param nonceGroup nonce group from the request
 * @return the result returned by the preAppend hook when it is non-null; otherwise the
 *   result of {@code region.append}
 * @throws IOException from conversion, the coprocessor hooks, the nonce handling or the region
 */
private Result append(final Region region, final OperationQuota quota, final MutationProto m,
    final CellScanner cellScanner, long nonceGroup) throws IOException {
  // Start time, used for the append latency metric at the end.
  long before = EnvironmentEdgeManager.currentTime();
  Append append = ProtobufUtil.toAppend(m, cellScanner);
  quota.addMutation(append);
  Result r = null;
  if (region.getCoprocessorHost() != null) {
    r = region.getCoprocessorHost().preAppend(append);
  }
  // A non-null result from preAppend skips the region append and the postAppend hook.
  if (r == null) {
    long nonce = startNonceOperation(m, nonceGroup);
    boolean success = false;
    try {
      r = region.append(append, nonceGroup, nonce);
      success = true;
    } finally {
      // Always end the nonce operation, reporting whether the append completed.
      endNonceOperation(m, nonceGroup, success);
    }
    if (region.getCoprocessorHost() != null) {
      // NOTE(review): the return value of postAppend (if any) is not assigned to r, whereas
      // increment() assigns the result of postIncrement -- confirm this is intended.
      region.getCoprocessorHost().postAppend(append, r);
    }
  }
  if (regionServer.metricsRegionServer != null) {
    regionServer.metricsRegionServer.updateAppend(
      EnvironmentEdgeManager.currentTime() - before);
  }
  return r;
}
/**
 * Execute an increment mutation.
 *
 * Converts the protobuf mutation to an {@link Increment}, records it against the quota,
 * gives the region's coprocessor host (if any) the chance to supply the result via
 * preIncrement, and otherwise runs the increment on the region inside a nonce operation.
 *
 * @param region region to increment on
 * @param quota quota the mutation is recorded against
 * @param mutation the protobuf mutation describing the increment
 * @param cells cell data for the mutation, passed to the protobuf conversion
 * @param nonceGroup nonce group from the request
 * @return the result returned by the preIncrement hook when it is non-null; otherwise the
 *   result of {@code region.increment}, as returned by the postIncrement hook when a
 *   coprocessor host is present
 * @throws IOException from conversion, the coprocessor hooks, the nonce handling or the region
 */
private Result increment(final Region region, final OperationQuota quota,
    final MutationProto mutation, final CellScanner cells, long nonceGroup)
    throws IOException {
  // Start time, used for the increment latency metric at the end.
  long before = EnvironmentEdgeManager.currentTime();
  Increment increment = ProtobufUtil.toIncrement(mutation, cells);
  quota.addMutation(increment);
  Result r = null;
  if (region.getCoprocessorHost() != null) {
    r = region.getCoprocessorHost().preIncrement(increment);
  }
  // A non-null result from preIncrement skips the region increment and the post hook.
  if (r == null) {
    long nonce = startNonceOperation(mutation, nonceGroup);
    boolean success = false;
    try {
      r = region.increment(increment, nonceGroup, nonce);
      success = true;
    } finally {
      // Always end the nonce operation, reporting whether the increment completed.
      endNonceOperation(mutation, nonceGroup, success);
    }
    if (region.getCoprocessorHost() != null) {
      r = region.getCoprocessorHost().postIncrement(increment, r);
    }
  }
  if (regionServer.metricsRegionServer != null) {
    regionServer.metricsRegionServer.updateIncrement(
      EnvironmentEdgeManager.currentTime() - before);
  }
  return r;
}
680 * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
681 * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
686 * @param cellsToReturn Could be null. May be allocated in this method. This is what this
687 * method returns as a 'result'.
688 * @param closeCallBack the callback to be used with multigets
689 * @param context the current RpcCallContext
690 * @return Return the <code>cellScanner</code> passed
692 private List
<CellScannable
> doNonAtomicRegionMutation(final Region region
,
693 final OperationQuota quota
, final RegionAction actions
, final CellScanner cellScanner
,
694 final RegionActionResult
.Builder builder
, List
<CellScannable
> cellsToReturn
, long nonceGroup
,
695 final RegionScannersCloseCallBack closeCallBack
, RpcCallContext context
) {
696 // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
697 // one at a time, we instead pass them in batch. Be aware that the corresponding
698 // ResultOrException instance that matches each Put or Delete is then added down in the
699 // doBatchOp call. We should be staying aligned though the Put and Delete are deferred/batched
700 List
<ClientProtos
.Action
> mutations
= null;
701 long maxQuotaResultSize
= Math
.min(maxScannerResultSize
, quota
.getReadAvailable());
702 IOException sizeIOE
= null;
703 Object lastBlock
= null;
704 for (ClientProtos
.Action action
: actions
.getActionList()) {
705 ClientProtos
.ResultOrException
.Builder resultOrExceptionBuilder
= null;
710 && context
.isRetryImmediatelySupported()
711 && (context
.getResponseCellSize() > maxQuotaResultSize
712 || context
.getResponseBlockSize() > maxQuotaResultSize
)) {
714 // We're storing the exception since the exception and reason string won't
715 // change after the response size limit is reached.
716 if (sizeIOE
== null ) {
717 // We don't need the stack un-winding do don't throw the exception.
718 // Throwing will kill the JVM's JIT.
720 // Instead just create the exception and then store it.
721 sizeIOE
= new MultiActionResultTooLarge("Max size exceeded"
722 + " CellSize: " + context
.getResponseCellSize()
723 + " BlockSize: " + context
.getResponseBlockSize());
725 // Only report the exception once since there's only one request that
726 // caused the exception. Otherwise this number will dominate the exceptions count.
727 rpcServer
.getMetrics().exception(sizeIOE
);
730 // Now that there's an exception is known to be created
731 // use it for the response.
733 // This will create a copy in the builder.
734 resultOrExceptionBuilder
= ResultOrException
.newBuilder().
735 setException(ResponseConverter
.buildException(sizeIOE
));
736 resultOrExceptionBuilder
.setIndex(action
.getIndex());
737 builder
.addResultOrException(resultOrExceptionBuilder
.build());
738 if (cellScanner
!= null) {
739 skipCellsForMutation(action
, cellScanner
);
743 if (action
.hasGet()) {
744 long before
= EnvironmentEdgeManager
.currentTime();
746 Get get
= ProtobufUtil
.toGet(action
.getGet());
747 if (context
!= null) {
748 r
= get(get
, ((HRegion
) region
), closeCallBack
, context
);
753 if (regionServer
.metricsRegionServer
!= null) {
754 regionServer
.metricsRegionServer
.updateGet(
755 EnvironmentEdgeManager
.currentTime() - before
);
758 } else if (action
.hasServiceCall()) {
759 resultOrExceptionBuilder
= ResultOrException
.newBuilder();
761 Message result
= execServiceOnRegion(region
, action
.getServiceCall());
762 ClientProtos
.CoprocessorServiceResult
.Builder serviceResultBuilder
=
763 ClientProtos
.CoprocessorServiceResult
.newBuilder();
764 resultOrExceptionBuilder
.setServiceResult(
765 serviceResultBuilder
.setValue(
766 serviceResultBuilder
.getValueBuilder()
767 .setName(result
.getClass().getName())
768 .setValue(result
.toByteString())));
769 } catch (IOException ioe
) {
770 rpcServer
.getMetrics().exception(ioe
);
771 resultOrExceptionBuilder
.setException(ResponseConverter
.buildException(ioe
));
773 } else if (action
.hasMutation()) {
774 MutationType type
= action
.getMutation().getMutateType();
775 if (type
!= MutationType
.PUT
&& type
!= MutationType
.DELETE
&& mutations
!= null &&
776 !mutations
.isEmpty()) {
777 // Flush out any Puts or Deletes already collected.
778 doBatchOp(builder
, region
, quota
, mutations
, cellScanner
);
783 r
= append(region
, quota
, action
.getMutation(), cellScanner
, nonceGroup
);
786 r
= increment(region
, quota
, action
.getMutation(), cellScanner
, nonceGroup
);
790 // Collect the individual mutations and apply in a batch
791 if (mutations
== null) {
792 mutations
= new ArrayList
<ClientProtos
.Action
>(actions
.getActionCount());
794 mutations
.add(action
);
797 throw new DoNotRetryIOException("Unsupported mutate type: " + type
.name());
800 throw new HBaseIOException("Unexpected Action type");
803 ClientProtos
.Result pbResult
= null;
804 if (isClientCellBlockSupport(context
)) {
805 pbResult
= ProtobufUtil
.toResultNoData(r
);
806 // Hard to guess the size here. Just make a rough guess.
807 if (cellsToReturn
== null) {
808 cellsToReturn
= new ArrayList
<CellScannable
>();
810 cellsToReturn
.add(r
);
812 pbResult
= ProtobufUtil
.toResult(r
);
814 lastBlock
= addSize(context
, r
, lastBlock
);
815 resultOrExceptionBuilder
=
816 ClientProtos
.ResultOrException
.newBuilder().setResult(pbResult
);
818 // Could get to here and there was no result and no exception. Presumes we added
819 // a Put or Delete to the collecting Mutations List for adding later. In this
820 // case the corresponding ResultOrException instance for the Put or Delete will be added
821 // down in the doBatchOp method call rather than up here.
822 } catch (IOException ie
) {
823 rpcServer
.getMetrics().exception(ie
);
824 resultOrExceptionBuilder
= ResultOrException
.newBuilder().
825 setException(ResponseConverter
.buildException(ie
));
827 if (resultOrExceptionBuilder
!= null) {
829 resultOrExceptionBuilder
.setIndex(action
.getIndex());
830 builder
.addResultOrException(resultOrExceptionBuilder
.build());
833 // Finish up any outstanding mutations
834 if (mutations
!= null && !mutations
.isEmpty()) {
835 doBatchOp(builder
, region
, quota
, mutations
, cellScanner
);
837 return cellsToReturn
;
841 * Execute a list of Put/Delete mutations.
847 private void doBatchOp(final RegionActionResult
.Builder builder
, final Region region
,
848 final OperationQuota quota
, final List
<ClientProtos
.Action
> mutations
,
849 final CellScanner cells
) {
850 Mutation
[] mArray
= new Mutation
[mutations
.size()];
851 long before
= EnvironmentEdgeManager
.currentTime();
852 boolean batchContainsPuts
= false, batchContainsDelete
= false;
855 for (ClientProtos
.Action action
: mutations
) {
856 MutationProto m
= action
.getMutation();
858 if (m
.getMutateType() == MutationType
.PUT
) {
859 mutation
= ProtobufUtil
.toPut(m
, cells
);
860 batchContainsPuts
= true;
862 mutation
= ProtobufUtil
.toDelete(m
, cells
);
863 batchContainsDelete
= true;
865 mArray
[i
++] = mutation
;
866 quota
.addMutation(mutation
);
869 if (!region
.getRegionInfo().isMetaTable()) {
870 regionServer
.cacheFlusher
.reclaimMemStoreMemory();
873 OperationStatus
[] codes
= region
.batchMutate(mArray
, HConstants
.NO_NONCE
,
874 HConstants
.NO_NONCE
);
875 for (i
= 0; i
< codes
.length
; i
++) {
876 int index
= mutations
.get(i
).getIndex();
878 switch (codes
[i
].getOperationStatusCode()) {
880 e
= new NoSuchColumnFamilyException(codes
[i
].getExceptionMsg());
881 builder
.addResultOrException(getResultOrException(e
, index
));
884 case SANITY_CHECK_FAILURE
:
885 e
= new FailedSanityCheckException(codes
[i
].getExceptionMsg());
886 builder
.addResultOrException(getResultOrException(e
, index
));
890 e
= new DoNotRetryIOException(codes
[i
].getExceptionMsg());
891 builder
.addResultOrException(getResultOrException(e
, index
));
895 builder
.addResultOrException(getResultOrException(
896 ClientProtos
.Result
.getDefaultInstance(), index
));
900 } catch (IOException ie
) {
901 for (int i
= 0; i
< mutations
.size(); i
++) {
902 builder
.addResultOrException(getResultOrException(ie
, mutations
.get(i
).getIndex()));
905 if (regionServer
.metricsRegionServer
!= null) {
906 long after
= EnvironmentEdgeManager
.currentTime();
907 if (batchContainsPuts
) {
908 regionServer
.metricsRegionServer
.updatePut(after
- before
);
910 if (batchContainsDelete
) {
911 regionServer
.metricsRegionServer
.updateDelete(after
- before
);
917 * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
918 * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
922 * @return an array of OperationStatus which internally contains the OperationStatusCode and the
923 * exceptionMessage if any
924 * @throws IOException
926 private OperationStatus
[] doReplayBatchOp(final Region region
,
927 final List
<WALSplitter
.MutationReplay
> mutations
, long replaySeqId
) throws IOException
{
928 long before
= EnvironmentEdgeManager
.currentTime();
929 boolean batchContainsPuts
= false, batchContainsDelete
= false;
931 for (Iterator
<WALSplitter
.MutationReplay
> it
= mutations
.iterator(); it
.hasNext();) {
932 WALSplitter
.MutationReplay m
= it
.next();
934 if (m
.type
== MutationType
.PUT
) {
935 batchContainsPuts
= true;
937 batchContainsDelete
= true;
940 NavigableMap
<byte[], List
<Cell
>> map
= m
.mutation
.getFamilyCellMap();
941 List
<Cell
> metaCells
= map
.get(WALEdit
.METAFAMILY
);
942 if (metaCells
!= null && !metaCells
.isEmpty()) {
943 for (Cell metaCell
: metaCells
) {
944 CompactionDescriptor compactionDesc
= WALEdit
.getCompaction(metaCell
);
945 boolean isDefaultReplica
= RegionReplicaUtil
.isDefaultReplica(region
.getRegionInfo());
946 HRegion hRegion
= (HRegion
)region
;
947 if (compactionDesc
!= null) {
948 // replay the compaction. Remove the files from stores only if we are the primary
949 // region replica (thus own the files)
950 hRegion
.replayWALCompactionMarker(compactionDesc
, !isDefaultReplica
, isDefaultReplica
,
954 FlushDescriptor flushDesc
= WALEdit
.getFlushDescriptor(metaCell
);
955 if (flushDesc
!= null && !isDefaultReplica
) {
956 hRegion
.replayWALFlushMarker(flushDesc
, replaySeqId
);
959 RegionEventDescriptor regionEvent
= WALEdit
.getRegionEventDescriptor(metaCell
);
960 if (regionEvent
!= null && !isDefaultReplica
) {
961 hRegion
.replayWALRegionEventMarker(regionEvent
);
964 BulkLoadDescriptor bulkLoadEvent
= WALEdit
.getBulkLoadDescriptor(metaCell
);
965 if (bulkLoadEvent
!= null) {
966 hRegion
.replayWALBulkLoadEventMarker(bulkLoadEvent
);
973 requestCount
.add(mutations
.size());
974 if (!region
.getRegionInfo().isMetaTable()) {
975 regionServer
.cacheFlusher
.reclaimMemStoreMemory();
977 return region
.batchReplay(mutations
.toArray(
978 new WALSplitter
.MutationReplay
[mutations
.size()]), replaySeqId
);
980 if (regionServer
.metricsRegionServer
!= null) {
981 long after
= EnvironmentEdgeManager
.currentTime();
982 if (batchContainsPuts
) {
983 regionServer
.metricsRegionServer
.updatePut(after
- before
);
985 if (batchContainsDelete
) {
986 regionServer
.metricsRegionServer
.updateDelete(after
- before
);
992 private void closeAllScanners() {
993 // Close any outstanding scanners. Means they'll get an UnknownScanner
994 // exception next time they come in.
995 for (Map
.Entry
<String
, RegionScannerHolder
> e
: scanners
.entrySet()) {
997 e
.getValue().s
.close();
998 } catch (IOException ioe
) {
999 LOG
.warn("Closing scanner " + e
.getKey(), ioe
);
1004 public RSRpcServices(HRegionServer rs
) throws IOException
{
1007 RpcSchedulerFactory rpcSchedulerFactory
;
1009 Class
<?
> rpcSchedulerFactoryClass
= rs
.conf
.getClass(
1010 REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS
,
1011 SimpleRpcSchedulerFactory
.class);
1012 rpcSchedulerFactory
= ((RpcSchedulerFactory
) rpcSchedulerFactoryClass
.newInstance());
1013 } catch (InstantiationException e
) {
1014 throw new IllegalArgumentException(e
);
1015 } catch (IllegalAccessException e
) {
1016 throw new IllegalArgumentException(e
);
1018 // Server to handle client requests.
1019 InetSocketAddress initialIsa
;
1020 InetSocketAddress bindAddress
;
1021 if(this instanceof MasterRpcServices
) {
1022 String hostname
= getHostname(rs
.conf
, true);
1023 int port
= rs
.conf
.getInt(HConstants
.MASTER_PORT
, HConstants
.DEFAULT_MASTER_PORT
);
1024 // Creation of a HSA will force a resolve.
1025 initialIsa
= new InetSocketAddress(hostname
, port
);
1026 bindAddress
= new InetSocketAddress(rs
.conf
.get("hbase.master.ipc.address", hostname
), port
);
1028 String hostname
= getHostname(rs
.conf
, false);
1029 int port
= rs
.conf
.getInt(HConstants
.REGIONSERVER_PORT
,
1030 HConstants
.DEFAULT_REGIONSERVER_PORT
);
1031 // Creation of a HSA will force a resolve.
1032 initialIsa
= new InetSocketAddress(hostname
, port
);
1033 bindAddress
= new InetSocketAddress(
1034 rs
.conf
.get("hbase.regionserver.ipc.address", hostname
), port
);
1036 if (initialIsa
.getAddress() == null) {
1037 throw new IllegalArgumentException("Failed resolve of " + initialIsa
);
1039 priority
= createPriority();
1040 String name
= rs
.getProcessName() + "/" + initialIsa
.toString();
1041 // Set how many times to retry talking to another server over Connection.
1042 ConnectionUtils
.setServerSideHConnectionRetriesConfig(rs
.conf
, name
, LOG
);
1044 rpcServer
= new RpcServer(rs
, name
, getServices(),
1045 bindAddress
, // use final bindAddress for this server.
1047 rpcSchedulerFactory
.create(rs
.conf
, this, rs
));
1048 } catch (BindException be
) {
1049 String configName
= (this instanceof MasterRpcServices
) ? HConstants
.MASTER_PORT
:
1050 HConstants
.REGIONSERVER_PORT
;
1051 throw new IOException(be
.getMessage() + ". To switch ports use the '" + configName
+
1052 "' configuration property.", be
.getCause() != null ? be
.getCause() : be
);
1055 scannerLeaseTimeoutPeriod
= rs
.conf
.getInt(
1056 HConstants
.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD
,
1057 HConstants
.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD
);
1058 maxScannerResultSize
= rs
.conf
.getLong(
1059 HConstants
.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY
,
1060 HConstants
.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE
);
1061 rpcTimeout
= rs
.conf
.getInt(
1062 HConstants
.HBASE_RPC_TIMEOUT_KEY
,
1063 HConstants
.DEFAULT_HBASE_RPC_TIMEOUT
);
1064 minimumScanTimeLimitDelta
= rs
.conf
.getLong(
1065 REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA
,
1066 DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA
);
1068 InetSocketAddress address
= rpcServer
.getListenerAddress();
1069 if (address
== null) {
1070 throw new IOException("Listener channel is closed");
1072 // Set our address, however we need the final port that was given to rpcServer
1073 isa
= new InetSocketAddress(initialIsa
.getHostName(), address
.getPort());
1074 rpcServer
.setErrorHandler(this);
1079 public void onConfigurationChange(Configuration newConf
) {
1080 if (rpcServer
instanceof ConfigurationObserver
) {
1081 ((ConfigurationObserver
)rpcServer
).onConfigurationChange(newConf
);
1085 protected PriorityFunction
createPriority() {
1086 return new AnnotationReadingPriorityFunction(this);
1089 public static String
getHostname(Configuration conf
, boolean isMaster
)
1090 throws UnknownHostException
{
1091 String hostname
= conf
.get(isMaster? HRegionServer
.MASTER_HOSTNAME_KEY
:
1092 HRegionServer
.RS_HOSTNAME_KEY
);
1093 if (hostname
== null || hostname
.isEmpty()) {
1094 String masterOrRS
= isMaster ?
"master" : "regionserver";
1095 return Strings
.domainNamePointerToHostName(DNS
.getDefaultHost(
1096 conf
.get("hbase." + masterOrRS
+ ".dns.interface", "default"),
1097 conf
.get("hbase." + masterOrRS
+ ".dns.nameserver", "default")));
1099 LOG
.info("hostname is configured to be " + hostname
);
1104 RegionScanner
getScanner(long scannerId
) {
1105 String scannerIdString
= Long
.toString(scannerId
);
1106 RegionScannerHolder scannerHolder
= scanners
.get(scannerIdString
);
1107 if (scannerHolder
!= null) {
1108 return scannerHolder
.s
;
1114 * Get the vtime associated with the scanner.
1115 * Currently the vtime is the number of "next" calls.
1117 long getScannerVirtualTime(long scannerId
) {
1118 String scannerIdString
= Long
.toString(scannerId
);
1119 RegionScannerHolder scannerHolder
= scanners
.get(scannerIdString
);
1120 if (scannerHolder
!= null) {
1121 return scannerHolder
.getNextCallSeq();
1127 * Method to account for the size of retained cells and retained data blocks.
1128 * @return an object that represents the last referenced block from this response.
1130 Object
addSize(RpcCallContext context
, Result r
, Object lastBlock
) {
1131 if (context
!= null && !r
.isEmpty()) {
1132 for (Cell c
: r
.rawCells()) {
1133 context
.incrementResponseCellSize(CellUtil
.estimatedHeapSizeOf(c
));
1135 // Since byte buffers can point all kinds of crazy places it's harder to keep track
1136 // of which blocks are kept alive by what byte buffer.
1137 // So we make a guess.
1138 if (c
instanceof ByteBufferedCell
) {
1139 ByteBufferedCell bbCell
= (ByteBufferedCell
) c
;
1140 ByteBuffer bb
= bbCell
.getValueByteBuffer();
1141 if (bb
!= lastBlock
) {
1142 context
.incrementResponseBlockSize(bb
.capacity());
1146 // We're using the last block being the same as the current block as
1147 // a proxy for pointing to a new block. This won't be exact.
1148 // If there are multiple gets that bounce back and forth
1149 // Then it's possible that this will over count the size of
1150 // referenced blocks. However it's better to over count and
1151 // use two rpcs than to OOME the regionserver.
1152 byte[] valueArray
= c
.getValueArray();
1153 if (valueArray
!= lastBlock
) {
1154 context
.incrementResponseBlockSize(valueArray
.length
);
1155 lastBlock
= valueArray
;
1164 RegionScannerHolder
addScanner(String scannerName
, RegionScanner s
, Region r
)
1165 throws LeaseStillHeldException
{
1166 Lease lease
= regionServer
.leases
.createLease(scannerName
, this.scannerLeaseTimeoutPeriod
,
1167 new ScannerListener(scannerName
));
1168 RpcCallback shippedCallback
= new RegionScannerShippedCallBack(scannerName
, s
, lease
);
1169 RpcCallback closeCallback
;
1170 if (s
instanceof RpcCallback
) {
1171 closeCallback
= (RpcCallback
) s
;
1173 closeCallback
= new RegionScannerCloseCallBack(s
);
1175 RegionScannerHolder rsh
= new RegionScannerHolder(s
, r
, closeCallback
, shippedCallback
);
1176 RegionScannerHolder existing
= scanners
.putIfAbsent(scannerName
, rsh
);
1177 assert existing
== null : "scannerId must be unique within regionserver's whole lifecycle!";
1182 * Find the HRegion based on a region specifier
1184 * @param regionSpecifier the region specifier
1185 * @return the corresponding region
1186 * @throws IOException if the specifier is not null,
1187 * but failed to find the region
1190 public Region
getRegion(
1191 final RegionSpecifier regionSpecifier
) throws IOException
{
1192 ByteString value
= regionSpecifier
.getValue();
1193 RegionSpecifierType type
= regionSpecifier
.getType();
1196 byte[] regionName
= value
.toByteArray();
1197 String encodedRegionName
= HRegionInfo
.encodeRegionName(regionName
);
1198 return regionServer
.getRegionByEncodedName(regionName
, encodedRegionName
);
1199 case ENCODED_REGION_NAME
:
1200 return regionServer
.getRegionByEncodedName(value
.toStringUtf8());
1202 throw new DoNotRetryIOException(
1203 "Unsupported region specifier type: " + type
);
1208 public PriorityFunction
getPriority() {
1213 public Configuration
getConfiguration() {
1214 return regionServer
.getConfiguration();
1217 private RegionServerQuotaManager
getQuotaManager() {
1218 return regionServer
.getRegionServerQuotaManager();
1231 * Called to verify that this server is up and running.
1233 * @throws IOException
1235 protected void checkOpen() throws IOException
{
1236 if (regionServer
.isAborted()) {
1237 throw new RegionServerAbortedException("Server " + regionServer
.serverName
+ " aborting");
1239 if (regionServer
.isStopped()) {
1240 throw new RegionServerStoppedException("Server " + regionServer
.serverName
+ " stopping");
1242 if (!regionServer
.fsOk
) {
1243 throw new RegionServerStoppedException("File system not available");
1245 if (!regionServer
.isOnline()) {
1246 throw new ServerNotRunningYetException("Server is not running yet");
1251 * @return list of blocking services and their security info classes that this server supports
1253 protected List
<BlockingServiceAndInterface
> getServices() {
1254 List
<BlockingServiceAndInterface
> bssi
= new ArrayList
<BlockingServiceAndInterface
>(2);
1255 bssi
.add(new BlockingServiceAndInterface(
1256 ClientService
.newReflectiveBlockingService(this),
1257 ClientService
.BlockingInterface
.class));
1258 bssi
.add(new BlockingServiceAndInterface(
1259 AdminService
.newReflectiveBlockingService(this),
1260 AdminService
.BlockingInterface
.class));
1264 public InetSocketAddress
getSocketAddress() {
1269 public int getPriority(RequestHeader header
, Message param
, User user
) {
1270 return priority
.getPriority(header
, param
, user
);
1274 public long getDeadline(RequestHeader header
, Message param
) {
1275 return priority
.getDeadline(header
, param
);
1279 * Check if an OOME and, if so, abort immediately to avoid creating more objects.
1283 * @return True if we OOME'd and are aborting.
1286 public boolean checkOOME(final Throwable e
) {
1287 boolean stop
= false;
1289 if (e
instanceof OutOfMemoryError
1290 || (e
.getCause() != null && e
.getCause() instanceof OutOfMemoryError
)
1291 || (e
.getMessage() != null && e
.getMessage().contains(
1292 "java.lang.OutOfMemoryError"))) {
1294 LOG
.fatal("Run out of memory; " + getClass().getSimpleName()
1295 + " will abort itself immediately", e
);
1299 Runtime
.getRuntime().halt(1);
1306 * Close a region on the region server.
1308 * @param controller the RPC controller
1309 * @param request the request
1310 * @throws ServiceException
1313 @QosPriority(priority
=HConstants
.ADMIN_QOS
)
1314 public CloseRegionResponse
closeRegion(final RpcController controller
,
1315 final CloseRegionRequest request
) throws ServiceException
{
1316 final ServerName sn
= (request
.hasDestinationServer() ?
1317 ProtobufUtil
.toServerName(request
.getDestinationServer()) : null);
1321 if (request
.hasServerStartCode()) {
1322 // check that we are the same server that this RPC is intended for.
1323 long serverStartCode
= request
.getServerStartCode();
1324 if (regionServer
.serverName
.getStartcode() != serverStartCode
) {
1325 throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
1326 "different server with startCode: " + serverStartCode
+ ", this server is: "
1327 + regionServer
.serverName
));
1330 final String encodedRegionName
= ProtobufUtil
.getRegionEncodedName(request
.getRegion());
1332 requestCount
.increment();
1333 LOG
.info("Close " + encodedRegionName
+ ", moving to " + sn
);
1334 boolean closed
= regionServer
.closeRegion(encodedRegionName
, false, sn
);
1335 CloseRegionResponse
.Builder builder
= CloseRegionResponse
.newBuilder().setClosed(closed
);
1336 return builder
.build();
1337 } catch (IOException ie
) {
1338 throw new ServiceException(ie
);
1343 * Compact a region on the region server.
1345 * @param controller the RPC controller
1346 * @param request the request
1347 * @throws ServiceException
1350 @QosPriority(priority
=HConstants
.ADMIN_QOS
)
1351 public CompactRegionResponse
compactRegion(final RpcController controller
,
1352 final CompactRegionRequest request
) throws ServiceException
{
1355 requestCount
.increment();
1356 Region region
= getRegion(request
.getRegion());
1357 region
.startRegionOperation(Operation
.COMPACT_REGION
);
1358 LOG
.info("Compacting " + region
.getRegionInfo().getRegionNameAsString());
1359 boolean major
= false;
1360 byte [] family
= null;
1362 if (request
.hasFamily()) {
1363 family
= request
.getFamily().toByteArray();
1364 store
= region
.getStore(family
);
1365 if (store
== null) {
1366 throw new ServiceException(new IOException("column family " + Bytes
.toString(family
)
1367 + " does not exist in region " + region
.getRegionInfo().getRegionNameAsString()));
1370 if (request
.hasMajor()) {
1371 major
= request
.getMajor();
1374 if (family
!= null) {
1375 store
.triggerMajorCompaction();
1377 region
.triggerMajorCompaction();
1381 String familyLogMsg
= (family
!= null)?
" for column family: " + Bytes
.toString(family
):"";
1382 if (LOG
.isTraceEnabled()) {
1383 LOG
.trace("User-triggered compaction requested for region "
1384 + region
.getRegionInfo().getRegionNameAsString() + familyLogMsg
);
1386 String log
= "User-triggered " + (major ?
"major " : "") + "compaction" + familyLogMsg
;
1387 if(family
!= null) {
1388 regionServer
.compactSplitThread
.requestCompaction(region
, store
, log
,
1389 Store
.PRIORITY_USER
, null, RpcServer
.getRequestUser());
1391 regionServer
.compactSplitThread
.requestCompaction(region
, log
,
1392 Store
.PRIORITY_USER
, null, RpcServer
.getRequestUser());
1394 return CompactRegionResponse
.newBuilder().build();
1395 } catch (IOException ie
) {
1396 throw new ServiceException(ie
);
1401 * Flush a region on the region server.
1403 * @param controller the RPC controller
1404 * @param request the request
1405 * @throws ServiceException
1408 @QosPriority(priority
=HConstants
.ADMIN_QOS
)
1409 public FlushRegionResponse
flushRegion(final RpcController controller
,
1410 final FlushRegionRequest request
) throws ServiceException
{
1413 requestCount
.increment();
1414 Region region
= getRegion(request
.getRegion());
1415 LOG
.info("Flushing " + region
.getRegionInfo().getRegionNameAsString());
1416 boolean shouldFlush
= true;
1417 if (request
.hasIfOlderThanTs()) {
1418 shouldFlush
= region
.getEarliestFlushTimeForAllStores() < request
.getIfOlderThanTs();
1420 FlushRegionResponse
.Builder builder
= FlushRegionResponse
.newBuilder();
1422 boolean writeFlushWalMarker
= request
.hasWriteFlushWalMarker() ?
1423 request
.getWriteFlushWalMarker() : false;
1424 // Go behind the curtain so we can manage writing of the flush WAL marker
1425 HRegion
.FlushResultImpl flushResult
= (HRegion
.FlushResultImpl
)
1426 ((HRegion
)region
).flushcache(true, writeFlushWalMarker
);
1427 boolean compactionNeeded
= flushResult
.isCompactionNeeded();
1428 if (compactionNeeded
) {
1429 regionServer
.compactSplitThread
.requestSystemCompaction(region
,
1430 "Compaction through user triggered flush");
1432 builder
.setFlushed(flushResult
.isFlushSucceeded());
1433 builder
.setWroteFlushWalMarker(flushResult
.wroteFlushWalMarker
);
1435 builder
.setLastFlushTime(region
.getEarliestFlushTimeForAllStores());
1436 return builder
.build();
1437 } catch (DroppedSnapshotException ex
) {
1438 // Cache flush can fail in a few places. If it fails in a critical
1439 // section, we get a DroppedSnapshotException and a replay of wal
1440 // is required. Currently the only way to do this is a restart of
1442 regionServer
.abort("Replay of WAL required. Forcing server shutdown", ex
);
1443 throw new ServiceException(ex
);
1444 } catch (IOException ie
) {
1445 throw new ServiceException(ie
);
1450 @QosPriority(priority
=HConstants
.ADMIN_QOS
)
1451 public GetOnlineRegionResponse
getOnlineRegion(final RpcController controller
,
1452 final GetOnlineRegionRequest request
) throws ServiceException
{
1455 requestCount
.increment();
1456 Map
<String
, Region
> onlineRegions
= regionServer
.onlineRegions
;
1457 List
<HRegionInfo
> list
= new ArrayList
<HRegionInfo
>(onlineRegions
.size());
1458 for (Region region
: onlineRegions
.values()) {
1459 list
.add(region
.getRegionInfo());
1461 Collections
.sort(list
);
1462 return ResponseConverter
.buildGetOnlineRegionResponse(list
);
1463 } catch (IOException ie
) {
1464 throw new ServiceException(ie
);
1469 @QosPriority(priority
=HConstants
.ADMIN_QOS
)
1470 public GetRegionInfoResponse
getRegionInfo(final RpcController controller
,
1471 final GetRegionInfoRequest request
) throws ServiceException
{
1474 requestCount
.increment();
1475 Region region
= getRegion(request
.getRegion());
1476 HRegionInfo info
= region
.getRegionInfo();
1477 GetRegionInfoResponse
.Builder builder
= GetRegionInfoResponse
.newBuilder();
1478 builder
.setRegionInfo(HRegionInfo
.convert(info
));
1479 if (request
.hasCompactionState() && request
.getCompactionState()) {
1480 builder
.setCompactionState(region
.getCompactionState());
1482 builder
.setIsRecovering(region
.isRecovering());
1483 return builder
.build();
1484 } catch (IOException ie
) {
1485 throw new ServiceException(ie
);
1490 * Get some information of the region server.
1492 * @param controller the RPC controller
1493 * @param request the request
1494 * @throws ServiceException
1497 @QosPriority(priority
=HConstants
.ADMIN_QOS
)
1498 public GetServerInfoResponse
getServerInfo(final RpcController controller
,
1499 final GetServerInfoRequest request
) throws ServiceException
{
1502 } catch (IOException ie
) {
1503 throw new ServiceException(ie
);
1505 requestCount
.increment();
1506 int infoPort
= regionServer
.infoServer
!= null ? regionServer
.infoServer
.getPort() : -1;
1507 return ResponseConverter
.buildGetServerInfoResponse(regionServer
.serverName
, infoPort
);
1511 @QosPriority(priority
=HConstants
.ADMIN_QOS
)
1512 public GetStoreFileResponse
getStoreFile(final RpcController controller
,
1513 final GetStoreFileRequest request
) throws ServiceException
{
1516 Region region
= getRegion(request
.getRegion());
1517 requestCount
.increment();
1518 Set
<byte[]> columnFamilies
;
1519 if (request
.getFamilyCount() == 0) {
1520 columnFamilies
= region
.getTableDesc().getFamiliesKeys();
1522 columnFamilies
= new TreeSet
<byte[]>(Bytes
.BYTES_RAWCOMPARATOR
);
1523 for (ByteString cf
: request
.getFamilyList()) {
1524 columnFamilies
.add(cf
.toByteArray());
1527 int nCF
= columnFamilies
.size();
1528 List
<String
> fileList
= region
.getStoreFileList(
1529 columnFamilies
.toArray(new byte[nCF
][]));
1530 GetStoreFileResponse
.Builder builder
= GetStoreFileResponse
.newBuilder();
1531 builder
.addAllStoreFile(fileList
);
1532 return builder
.build();
1533 } catch (IOException ie
) {
1534 throw new ServiceException(ie
);
1539 * Merge regions on the region server.
1541 * @param controller the RPC controller
1542 * @param request the request
1543 * @return merge regions response
1544 * @throws ServiceException
1547 @QosPriority(priority
= HConstants
.ADMIN_QOS
)
1548 public MergeRegionsResponse
mergeRegions(final RpcController controller
,
1549 final MergeRegionsRequest request
) throws ServiceException
{
1552 requestCount
.increment();
1553 Region regionA
= getRegion(request
.getRegionA());
1554 Region regionB
= getRegion(request
.getRegionB());
1555 boolean forcible
= request
.getForcible();
1556 long masterSystemTime
= request
.hasMasterSystemTime() ? request
.getMasterSystemTime() : -1;
1557 regionA
.startRegionOperation(Operation
.MERGE_REGION
);
1558 regionB
.startRegionOperation(Operation
.MERGE_REGION
);
1559 if (regionA
.getRegionInfo().getReplicaId() != HRegionInfo
.DEFAULT_REPLICA_ID
||
1560 regionB
.getRegionInfo().getReplicaId() != HRegionInfo
.DEFAULT_REPLICA_ID
) {
1561 throw new ServiceException(new MergeRegionException("Can't merge non-default replicas"));
1563 LOG
.info("Receiving merging request for " + regionA
+ ", " + regionB
1564 + ",forcible=" + forcible
);
1565 regionA
.flush(true);
1566 regionB
.flush(true);
1567 regionServer
.compactSplitThread
.requestRegionsMerge(regionA
, regionB
, forcible
,
1568 masterSystemTime
, RpcServer
.getRequestUser());
1569 return MergeRegionsResponse
.newBuilder().build();
1570 } catch (DroppedSnapshotException ex
) {
1571 regionServer
.abort("Replay of WAL required. Forcing server shutdown", ex
);
1572 throw new ServiceException(ex
);
1573 } catch (IOException ie
) {
1574 throw new ServiceException(ie
);
1579 * Open asynchronously a region or a set of regions on the region server.
1581 * The opening is coordinated by ZooKeeper, and this method requires the znode to be created
1582 * before being called. As a consequence, this method should be called only from the master.
1584 * The different region states handled are:
1586 * <li>region not opened: the region opening will start asynchronously.</li>
1587 * <li>a close is already in progress: this is considered as an error.</li>
1588 * <li>an open is already in progress: this new open request will be ignored. This is important
1589 * because the Master can do multiple requests if it crashes.</li>
1590 * <li>the region is already opened: this new open request will be ignored.</li>
1593 * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
1594 * For a single region opening, errors are sent through a ServiceException. For bulk assign,
1595 * errors are put in the response as FAILED_OPENING.
1597 * @param controller the RPC controller
1598 * @param request the request
1599 * @throws ServiceException
1602 @QosPriority(priority
=HConstants
.ADMIN_QOS
)
1603 public OpenRegionResponse
openRegion(final RpcController controller
,
1604 final OpenRegionRequest request
) throws ServiceException
{
1605 requestCount
.increment();
1606 if (request
.hasServerStartCode()) {
1607 // check that we are the same server that this RPC is intended for.
1608 long serverStartCode
= request
.getServerStartCode();
1609 if (regionServer
.serverName
.getStartcode() != serverStartCode
) {
1610 throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
1611 "different server with startCode: " + serverStartCode
+ ", this server is: "
1612 + regionServer
.serverName
));
1616 OpenRegionResponse
.Builder builder
= OpenRegionResponse
.newBuilder();
1617 final int regionCount
= request
.getOpenInfoCount();
1618 final Map
<TableName
, HTableDescriptor
> htds
=
1619 new HashMap
<TableName
, HTableDescriptor
>(regionCount
);
1620 final boolean isBulkAssign
= regionCount
> 1;
1623 } catch (IOException ie
) {
1624 TableName tableName
= null;
1625 if (regionCount
== 1) {
1626 RegionInfo ri
= request
.getOpenInfo(0).getRegion();
1628 tableName
= ProtobufUtil
.toTableName(ri
.getTableName());
1631 if (!TableName
.META_TABLE_NAME
.equals(tableName
)) {
1632 throw new ServiceException(ie
);
1634 // We are assigning meta, wait a little for regionserver to finish initialization.
1635 int timeout
= regionServer
.conf
.getInt(HConstants
.HBASE_RPC_TIMEOUT_KEY
,
1636 HConstants
.DEFAULT_HBASE_RPC_TIMEOUT
) >> 2; // Quarter of RPC timeout
1637 long endTime
= System
.currentTimeMillis() + timeout
;
1638 synchronized (regionServer
.online
) {
1640 while (System
.currentTimeMillis() <= endTime
1641 && !regionServer
.isStopped() && !regionServer
.isOnline()) {
1642 regionServer
.online
.wait(regionServer
.msgInterval
);
1645 } catch (InterruptedException t
) {
1646 Thread
.currentThread().interrupt();
1647 throw new ServiceException(t
);
1648 } catch (IOException e
) {
1649 throw new ServiceException(e
);
1654 long masterSystemTime
= request
.hasMasterSystemTime() ? request
.getMasterSystemTime() : -1;
1656 for (RegionOpenInfo regionOpenInfo
: request
.getOpenInfoList()) {
1657 final HRegionInfo region
= HRegionInfo
.convert(regionOpenInfo
.getRegion());
1658 HTableDescriptor htd
;
1660 String encodedName
= region
.getEncodedName();
1661 byte[] encodedNameBytes
= region
.getEncodedNameAsBytes();
1662 final Region onlineRegion
= regionServer
.getFromOnlineRegions(encodedName
);
1663 if (onlineRegion
!= null) {
1664 // The region is already online. This should not happen any more.
1665 String error
= "Received OPEN for the region:"
1666 + region
.getRegionNameAsString() + ", which is already online";
1667 regionServer
.abort(error
);
1668 throw new IOException(error
);
1670 LOG
.info("Open " + region
.getRegionNameAsString());
1671 htd
= htds
.get(region
.getTable());
1673 htd
= regionServer
.tableDescriptors
.get(region
.getTable());
1674 htds
.put(region
.getTable(), htd
);
1677 final Boolean previous
= regionServer
.regionsInTransitionInRS
.putIfAbsent(
1678 encodedNameBytes
, Boolean
.TRUE
);
1680 if (Boolean
.FALSE
.equals(previous
)) {
1681 if (regionServer
.getFromOnlineRegions(encodedName
) != null) {
1682 // There is a close in progress. This should not happen any more.
1683 String error
= "Received OPEN for the region:"
1684 + region
.getRegionNameAsString() + ", which we are already trying to CLOSE";
1685 regionServer
.abort(error
);
1686 throw new IOException(error
);
1688 regionServer
.regionsInTransitionInRS
.put(encodedNameBytes
, Boolean
.TRUE
);
1691 if (Boolean
.TRUE
.equals(previous
)) {
1692 // An open is in progress. This is supported, but let's log this.
1693 LOG
.info("Receiving OPEN for the region:" +
1694 region
.getRegionNameAsString() + ", which we are already trying to OPEN"
1695 + " - ignoring this new request for this region.");
1698 // We are opening this region. If it moves back and forth for whatever reason, we don't
1699 // want to keep returning the stale moved record while we are opening/if we close again.
1700 regionServer
.removeFromMovedRegions(region
.getEncodedName());
1702 if (previous
== null || !previous
.booleanValue()) {
1703 // check if the region to be opened is marked in recovering state in ZK
1704 if (ZKSplitLog
.isRegionMarkedRecoveringInZK(regionServer
.getZooKeeper(),
1705 region
.getEncodedName())) {
1706 // Check if current region open is for distributedLogReplay. This check is to support
1707 // rolling restart/upgrade where we want to Master/RS see same configuration
1708 if (!regionOpenInfo
.hasOpenForDistributedLogReplay()
1709 || regionOpenInfo
.getOpenForDistributedLogReplay()) {
1710 regionServer
.recoveringRegions
.put(region
.getEncodedName(), null);
1712 // Remove stale recovery region from ZK when we open region not for recovering which
1713 // could happen when turn distributedLogReplay off from on.
1714 List
<String
> tmpRegions
= new ArrayList
<String
>();
1715 tmpRegions
.add(region
.getEncodedName());
1716 ZKSplitLog
.deleteRecoveringRegionZNodes(regionServer
.getZooKeeper(),
1720 // If there is no action in progress, we can submit a specific handler.
1721 // Need to pass the expected version in the constructor.
1722 if (region
.isMetaRegion()) {
1723 regionServer
.service
.submit(new OpenMetaHandler(
1724 regionServer
, regionServer
, region
, htd
, masterSystemTime
));
1726 regionServer
.updateRegionFavoredNodesMapping(region
.getEncodedName(),
1727 regionOpenInfo
.getFavoredNodesList());
1728 regionServer
.service
.submit(new OpenRegionHandler(
1729 regionServer
, regionServer
, region
, htd
, masterSystemTime
));
1733 builder
.addOpeningState(RegionOpeningState
.OPENED
);
1735 } catch (KeeperException zooKeeperEx
) {
1736 LOG
.error("Can't retrieve recovering state from zookeeper", zooKeeperEx
);
1737 throw new ServiceException(zooKeeperEx
);
1738 } catch (IOException ie
) {
1739 LOG
.warn("Failed opening region " + region
.getRegionNameAsString(), ie
);
1741 builder
.addOpeningState(RegionOpeningState
.FAILED_OPENING
);
1743 throw new ServiceException(ie
);
1747 return builder
.build();
1751 * Wamrmup a region on this server.
1753 * This method should only be called by Master. It synchrnously opens the region and
1754 * closes the region bringing the most important pages in cache.
1757 * @param controller the RPC controller
1758 * @param request the request
1759 * @throws ServiceException
1762 public WarmupRegionResponse
warmupRegion(final RpcController controller
,
1763 final WarmupRegionRequest request
) throws ServiceException
{
1765 RegionInfo regionInfo
= request
.getRegionInfo();
1766 final HRegionInfo region
= HRegionInfo
.convert(regionInfo
);
1767 HTableDescriptor htd
;
1768 WarmupRegionResponse response
= WarmupRegionResponse
.getDefaultInstance();
1772 String encodedName
= region
.getEncodedName();
1773 byte[] encodedNameBytes
= region
.getEncodedNameAsBytes();
1774 final Region onlineRegion
= regionServer
.getFromOnlineRegions(encodedName
);
1776 if (onlineRegion
!= null) {
1777 LOG
.info("Region already online. Skipping warming up " + region
);
1781 if (LOG
.isDebugEnabled()) {
1782 LOG
.debug("Warming up Region " + region
.getRegionNameAsString());
1785 htd
= regionServer
.tableDescriptors
.get(region
.getTable());
1787 if (regionServer
.getRegionsInTransitionInRS().containsKey(encodedNameBytes
)) {
1788 LOG
.info("Region is in transition. Skipping warmup " + region
);
1792 HRegion
.warmupHRegion(region
, htd
, regionServer
.getWAL(region
),
1793 regionServer
.getConfiguration(), regionServer
, null);
1795 } catch (IOException ie
) {
1796 LOG
.error("Failed warming up region " + region
.getRegionNameAsString(), ie
);
1797 throw new ServiceException(ie
);
1804 * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
1805 * that the given mutations will be durable on the receiving RS if this method returns without any
1807 * @param controller the RPC controller
1808 * @param request the request
1809 * @throws ServiceException
1812 @QosPriority(priority
= HConstants
.REPLAY_QOS
)
1813 public ReplicateWALEntryResponse
replay(final RpcController controller
,
1814 final ReplicateWALEntryRequest request
) throws ServiceException
{
1815 long before
= EnvironmentEdgeManager
.currentTime();
1816 CellScanner cells
= ((PayloadCarryingRpcController
) controller
).cellScanner();
1819 List
<WALEntry
> entries
= request
.getEntryList();
1820 if (entries
== null || entries
.isEmpty()) {
1822 return ReplicateWALEntryResponse
.newBuilder().build();
1824 ByteString regionName
= entries
.get(0).getKey().getEncodedRegionName();
1825 Region region
= regionServer
.getRegionByEncodedName(regionName
.toStringUtf8());
1826 RegionCoprocessorHost coprocessorHost
=
1827 ServerRegionReplicaUtil
.isDefaultReplica(region
.getRegionInfo())
1828 ? region
.getCoprocessorHost()
1829 : null; // do not invoke coprocessors if this is a secondary region replica
1830 List
<Pair
<WALKey
, WALEdit
>> walEntries
= new ArrayList
<Pair
<WALKey
, WALEdit
>>();
1832 // Skip adding the edits to WAL if this is a secondary region replica
1833 boolean isPrimary
= RegionReplicaUtil
.isDefaultReplica(region
.getRegionInfo());
1834 Durability durability
= isPrimary ? Durability
.USE_DEFAULT
: Durability
.SKIP_WAL
;
1836 for (WALEntry entry
: entries
) {
1837 if (!regionName
.equals(entry
.getKey().getEncodedRegionName())) {
1838 throw new NotServingRegionException("Replay request contains entries from multiple " +
1839 "regions. First region:" + regionName
.toStringUtf8() + " , other region:"
1840 + entry
.getKey().getEncodedRegionName());
1842 if (regionServer
.nonceManager
!= null && isPrimary
) {
1843 long nonceGroup
= entry
.getKey().hasNonceGroup()
1844 ? entry
.getKey().getNonceGroup() : HConstants
.NO_NONCE
;
1845 long nonce
= entry
.getKey().hasNonce() ? entry
.getKey().getNonce() : HConstants
.NO_NONCE
;
1846 regionServer
.nonceManager
.reportOperationFromWal(
1849 entry
.getKey().getWriteTime());
1851 Pair
<WALKey
, WALEdit
> walEntry
= (coprocessorHost
== null) ?
null :
1852 new Pair
<WALKey
, WALEdit
>();
1853 List
<WALSplitter
.MutationReplay
> edits
= WALSplitter
.getMutationsFromWALEntry(entry
,
1854 cells
, walEntry
, durability
);
1855 if (coprocessorHost
!= null) {
1856 // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
1858 if (coprocessorHost
.preWALRestore(region
.getRegionInfo(), walEntry
.getFirst(),
1859 walEntry
.getSecond())) {
1860 // if bypass this log entry, ignore it ...
1863 walEntries
.add(walEntry
);
1865 if(edits
!=null && !edits
.isEmpty()) {
1866 long replaySeqId
= (entry
.getKey().hasOrigSequenceNumber()) ?
1867 entry
.getKey().getOrigSequenceNumber() : entry
.getKey().getLogSequenceNumber();
1868 OperationStatus
[] result
= doReplayBatchOp(region
, edits
, replaySeqId
);
1869 // check if it's a partial success
1870 for (int i
= 0; result
!= null && i
< result
.length
; i
++) {
1871 if (result
[i
] != OperationStatus
.SUCCESS
) {
1872 throw new IOException(result
[i
].getExceptionMsg());
1878 //sync wal at the end because ASYNC_WAL is used above
1879 WAL wal
= getWAL(region
);
1884 if (coprocessorHost
!= null) {
1885 for (Pair
<WALKey
, WALEdit
> entry
: walEntries
) {
1886 coprocessorHost
.postWALRestore(region
.getRegionInfo(), entry
.getFirst(),
1890 return ReplicateWALEntryResponse
.newBuilder().build();
1891 } catch (IOException ie
) {
1892 throw new ServiceException(ie
);
1894 if (regionServer
.metricsRegionServer
!= null) {
1895 regionServer
.metricsRegionServer
.updateReplay(
1896 EnvironmentEdgeManager
.currentTime() - before
);
1901 WAL
getWAL(Region region
) {
1902 return ((HRegion
)region
).getWAL();
1906 * Replicate WAL entries on the region server.
1908 * @param controller the RPC controller
1909 * @param request the request
1910 * @throws ServiceException
1913 @QosPriority(priority
=HConstants
.REPLICATION_QOS
)
1914 public ReplicateWALEntryResponse
replicateWALEntry(final RpcController controller
,
1915 final ReplicateWALEntryRequest request
) throws ServiceException
{
1918 if (regionServer
.replicationSinkHandler
!= null) {
1919 requestCount
.increment();
1920 List
<WALEntry
> entries
= request
.getEntryList();
1921 CellScanner cellScanner
= ((PayloadCarryingRpcController
)controller
).cellScanner();
1922 regionServer
.getRegionServerCoprocessorHost().preReplicateLogEntries(entries
, cellScanner
);
1923 regionServer
.replicationSinkHandler
.replicateLogEntries(entries
, cellScanner
,
1924 request
.getReplicationClusterId(), request
.getSourceBaseNamespaceDirPath(),
1925 request
.getSourceHFileArchiveDirPath());
1926 regionServer
.getRegionServerCoprocessorHost().postReplicateLogEntries(entries
, cellScanner
);
1927 return ReplicateWALEntryResponse
.newBuilder().build();
1929 throw new ServiceException("Replication services are not initialized yet");
1931 } catch (IOException ie
) {
1932 throw new ServiceException(ie
);
1937 * Roll the WAL writer of the region server.
1938 * @param controller the RPC controller
1939 * @param request the request
1940 * @throws ServiceException
1943 public RollWALWriterResponse
rollWALWriter(final RpcController controller
,
1944 final RollWALWriterRequest request
) throws ServiceException
{
1947 requestCount
.increment();
1948 regionServer
.getRegionServerCoprocessorHost().preRollWALWriterRequest();
1949 regionServer
.walRoller
.requestRollAll();
1950 regionServer
.getRegionServerCoprocessorHost().postRollWALWriterRequest();
1951 RollWALWriterResponse
.Builder builder
= RollWALWriterResponse
.newBuilder();
1952 return builder
.build();
1953 } catch (IOException ie
) {
1954 throw new ServiceException(ie
);
1959 * Split a region on the region server.
1961 * @param controller the RPC controller
1962 * @param request the request
1963 * @throws ServiceException
1966 @QosPriority(priority
=HConstants
.ADMIN_QOS
)
1967 public SplitRegionResponse
splitRegion(final RpcController controller
,
1968 final SplitRegionRequest request
) throws ServiceException
{
1971 requestCount
.increment();
1972 Region region
= getRegion(request
.getRegion());
1973 region
.startRegionOperation(Operation
.SPLIT_REGION
);
1974 if (region
.getRegionInfo().getReplicaId() != HRegionInfo
.DEFAULT_REPLICA_ID
) {
1975 throw new IOException("Can't split replicas directly. "
1976 + "Replicas are auto-split when their primary is split.");
1978 LOG
.info("Splitting " + region
.getRegionInfo().getRegionNameAsString());
1980 byte[] splitPoint
= null;
1981 if (request
.hasSplitPoint()) {
1982 splitPoint
= request
.getSplitPoint().toByteArray();
1984 ((HRegion
)region
).forceSplit(splitPoint
);
1985 regionServer
.compactSplitThread
.requestSplit(region
, ((HRegion
)region
).checkSplit(),
1986 RpcServer
.getRequestUser());
1987 return SplitRegionResponse
.newBuilder().build();
1988 } catch (DroppedSnapshotException ex
) {
1989 regionServer
.abort("Replay of WAL required. Forcing server shutdown", ex
);
1990 throw new ServiceException(ex
);
1991 } catch (IOException ie
) {
1992 throw new ServiceException(ie
);
1997 * Stop the region server.
1999 * @param controller the RPC controller
2000 * @param request the request
2001 * @throws ServiceException
2004 @QosPriority(priority
=HConstants
.ADMIN_QOS
)
2005 public StopServerResponse
stopServer(final RpcController controller
,
2006 final StopServerRequest request
) throws ServiceException
{
2007 requestCount
.increment();
2008 String reason
= request
.getReason();
2009 regionServer
.stop(reason
);
2010 return StopServerResponse
.newBuilder().build();
2014 public UpdateFavoredNodesResponse
updateFavoredNodes(RpcController controller
,
2015 UpdateFavoredNodesRequest request
) throws ServiceException
{
2016 List
<UpdateFavoredNodesRequest
.RegionUpdateInfo
> openInfoList
= request
.getUpdateInfoList();
2017 UpdateFavoredNodesResponse
.Builder respBuilder
= UpdateFavoredNodesResponse
.newBuilder();
2018 for (UpdateFavoredNodesRequest
.RegionUpdateInfo regionUpdateInfo
: openInfoList
) {
2019 HRegionInfo hri
= HRegionInfo
.convert(regionUpdateInfo
.getRegion());
2020 regionServer
.updateRegionFavoredNodesMapping(hri
.getEncodedName(),
2021 regionUpdateInfo
.getFavoredNodesList());
2023 respBuilder
.setResponse(openInfoList
.size());
2024 return respBuilder
.build();
2028 * Atomically bulk load several HFiles into an open region
2029 * @return true if successful, false is failed but recoverably (no action)
2030 * @throws ServiceException if failed unrecoverably
2033 public BulkLoadHFileResponse
bulkLoadHFile(final RpcController controller
,
2034 final BulkLoadHFileRequest request
) throws ServiceException
{
2037 requestCount
.increment();
2038 Region region
= getRegion(request
.getRegion());
2039 List
<Pair
<byte[], String
>> familyPaths
= new ArrayList
<Pair
<byte[], String
>>();
2040 for (FamilyPath familyPath
: request
.getFamilyPathList()) {
2041 familyPaths
.add(new Pair
<byte[], String
>(familyPath
.getFamily().toByteArray(),
2042 familyPath
.getPath()));
2044 boolean bypass
= false;
2045 if (region
.getCoprocessorHost() != null) {
2046 bypass
= region
.getCoprocessorHost().preBulkLoadHFile(familyPaths
);
2048 boolean loaded
= false;
2050 loaded
= region
.bulkLoadHFiles(familyPaths
, request
.getAssignSeqNum(), null);
2052 if (region
.getCoprocessorHost() != null) {
2053 loaded
= region
.getCoprocessorHost().postBulkLoadHFile(familyPaths
, loaded
);
2055 BulkLoadHFileResponse
.Builder builder
= BulkLoadHFileResponse
.newBuilder();
2056 builder
.setLoaded(loaded
);
2057 return builder
.build();
2058 } catch (IOException ie
) {
2059 throw new ServiceException(ie
);
2064 public CoprocessorServiceResponse
execService(final RpcController controller
,
2065 final CoprocessorServiceRequest request
) throws ServiceException
{
2068 requestCount
.increment();
2069 Region region
= getRegion(request
.getRegion());
2070 Message result
= execServiceOnRegion(region
, request
.getCall());
2071 CoprocessorServiceResponse
.Builder builder
=
2072 CoprocessorServiceResponse
.newBuilder();
2073 builder
.setRegion(RequestConverter
.buildRegionSpecifier(
2074 RegionSpecifierType
.REGION_NAME
, region
.getRegionInfo().getRegionName()));
2076 builder
.getValueBuilder().setName(result
.getClass().getName())
2077 .setValue(result
.toByteString()));
2078 return builder
.build();
2079 } catch (IOException ie
) {
2080 throw new ServiceException(ie
);
2084 private Message
execServiceOnRegion(Region region
,
2085 final ClientProtos
.CoprocessorServiceCall serviceCall
) throws IOException
{
2086 // ignore the passed in controller (from the serialized call)
2087 ServerRpcController execController
= new ServerRpcController();
2088 return region
.execService(execController
, serviceCall
);
2092 * Get data from a table.
2094 * @param controller the RPC controller
2095 * @param request the get request
2096 * @throws ServiceException
2099 public GetResponse
get(final RpcController controller
,
2100 final GetRequest request
) throws ServiceException
{
2101 long before
= EnvironmentEdgeManager
.currentTime();
2102 OperationQuota quota
= null;
2105 requestCount
.increment();
2106 rpcGetRequestCount
.increment();
2107 Region region
= getRegion(request
.getRegion());
2109 GetResponse
.Builder builder
= GetResponse
.newBuilder();
2110 ClientProtos
.Get get
= request
.getGet();
2111 Boolean existence
= null;
2113 RpcCallContext context
= RpcServer
.getCurrentCall();
2114 quota
= getQuotaManager().checkQuota(region
, OperationQuota
.OperationType
.GET
);
2116 Get clientGet
= ProtobufUtil
.toGet(get
);
2117 if (get
.getExistenceOnly() && region
.getCoprocessorHost() != null) {
2118 existence
= region
.getCoprocessorHost().preExists(clientGet
);
2120 if (existence
== null) {
2121 if (context
!= null) {
2122 r
= get(clientGet
, ((HRegion
) region
), null, context
);
2125 r
= region
.get(clientGet
);
2127 if (get
.getExistenceOnly()) {
2128 boolean exists
= r
.getExists();
2129 if (region
.getCoprocessorHost() != null) {
2130 exists
= region
.getCoprocessorHost().postExists(clientGet
, exists
);
2135 if (existence
!= null) {
2136 ClientProtos
.Result pbr
=
2137 ProtobufUtil
.toResult(existence
, region
.getRegionInfo().getReplicaId() != 0);
2138 builder
.setResult(pbr
);
2139 } else if (r
!= null) {
2140 ClientProtos
.Result pbr
;
2141 RpcCallContext call
= RpcServer
.getCurrentCall();
2142 if (isClientCellBlockSupport(call
) && controller
instanceof PayloadCarryingRpcController
2143 && VersionInfoUtil
.hasMinimumVersion(call
.getClientVersionInfo(), 1, 3)) {
2144 pbr
= ProtobufUtil
.toResultNoData(r
);
2145 ((PayloadCarryingRpcController
) controller
).setCellScanner(CellUtil
.createCellScanner(r
2148 pbr
= ProtobufUtil
.toResult(r
);
2150 builder
.setResult(pbr
);
2153 quota
.addGetResult(r
);
2155 return builder
.build();
2156 } catch (IOException ie
) {
2157 throw new ServiceException(ie
);
2159 if (regionServer
.metricsRegionServer
!= null) {
2160 regionServer
.metricsRegionServer
.updateGet(EnvironmentEdgeManager
.currentTime() - before
);
2162 if (quota
!= null) {
2168 private Result
get(Get get
, HRegion region
, RegionScannersCloseCallBack closeCallBack
,
2169 RpcCallContext context
) throws IOException
{
2170 region
.prepareGet(get
);
2171 List
<Cell
> results
= new ArrayList
<Cell
>();
2172 boolean stale
= region
.getRegionInfo().getReplicaId() != 0;
2174 if (region
.getCoprocessorHost() != null) {
2175 if (region
.getCoprocessorHost().preGet(get
, results
)) {
2177 .create(results
, get
.isCheckExistenceOnly() ?
!results
.isEmpty() : null, stale
);
2180 long before
= EnvironmentEdgeManager
.currentTime();
2181 Scan scan
= new Scan(get
);
2183 RegionScanner scanner
= null;
2185 scanner
= region
.getScanner(scan
);
2186 scanner
.next(results
);
2188 if (scanner
!= null) {
2189 if (closeCallBack
== null) {
2190 // If there is a context then the scanner can be added to the current
2191 // RpcCallContext. The rpc callback will take care of closing the
2192 // scanner, for eg in case
2194 assert scanner
instanceof org
.apache
.hadoop
.hbase
.ipc
.RpcCallback
;
2195 context
.setCallBack((RegionScannerImpl
) scanner
);
2197 // The call is from multi() where the results from the get() are
2198 // aggregated and then send out to the
2199 // rpc. The rpccall back will close all such scanners created as part
2201 closeCallBack
.addScanner(scanner
);
2207 if (region
.getCoprocessorHost() != null) {
2208 region
.getCoprocessorHost().postGet(get
, results
);
2210 region
.metricsUpdateForGet(results
, before
);
2211 return Result
.create(results
, get
.isCheckExistenceOnly() ?
!results
.isEmpty() : null, stale
);
2215 * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
2217 * @param rpcc the RPC controller
2218 * @param request the multi request
2219 * @throws ServiceException
2222 public MultiResponse
multi(final RpcController rpcc
, final MultiRequest request
)
2223 throws ServiceException
{
2226 } catch (IOException ie
) {
2227 throw new ServiceException(ie
);
2230 // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
2231 // It is also the conduit via which we pass back data.
2232 PayloadCarryingRpcController controller
= (PayloadCarryingRpcController
)rpcc
;
2233 CellScanner cellScanner
= controller
!= null ? controller
.cellScanner(): null;
2234 if (controller
!= null) {
2235 controller
.setCellScanner(null);
2238 long nonceGroup
= request
.hasNonceGroup() ? request
.getNonceGroup() : HConstants
.NO_NONCE
;
2240 // this will contain all the cells that we need to return. It's created later, if needed.
2241 List
<CellScannable
> cellsToReturn
= null;
2242 MultiResponse
.Builder responseBuilder
= MultiResponse
.newBuilder();
2243 RegionActionResult
.Builder regionActionResultBuilder
= RegionActionResult
.newBuilder();
2244 Boolean processed
= null;
2245 RegionScannersCloseCallBack closeCallBack
= null;
2246 RpcCallContext context
= RpcServer
.getCurrentCall();
2247 this.rpcMultiRequestCount
.increment();
2248 Map
<RegionSpecifier
, ClientProtos
.RegionLoadStats
> regionStats
= new HashMap
<>(request
2249 .getRegionActionCount());
2250 for (RegionAction regionAction
: request
.getRegionActionList()) {
2251 this.requestCount
.add(regionAction
.getActionCount());
2252 OperationQuota quota
;
2254 regionActionResultBuilder
.clear();
2255 RegionSpecifier regionSpecifier
= regionAction
.getRegion();
2257 region
= getRegion(regionSpecifier
);
2258 quota
= getQuotaManager().checkQuota(region
, regionAction
.getActionList());
2259 } catch (IOException e
) {
2260 rpcServer
.getMetrics().exception(e
);
2261 regionActionResultBuilder
.setException(ResponseConverter
.buildException(e
));
2262 responseBuilder
.addRegionActionResult(regionActionResultBuilder
.build());
2263 // All Mutations in this RegionAction not executed as we can not see the Region online here
2264 // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
2265 // corresponding to these Mutations.
2266 if (cellScanner
!= null) {
2267 skipCellsForMutations(regionAction
.getActionList(), cellScanner
);
2269 continue; // For this region it's a failure.
2272 if (regionAction
.hasAtomic() && regionAction
.getAtomic()) {
2273 // How does this call happen? It may need some work to play well w/ the surroundings.
2274 // Need to return an item per Action along w/ Action index. TODO.
2276 if (request
.hasCondition()) {
2277 Condition condition
= request
.getCondition();
2278 byte[] row
= condition
.getRow().toByteArray();
2279 byte[] family
= condition
.getFamily().toByteArray();
2280 byte[] qualifier
= condition
.getQualifier().toByteArray();
2281 CompareOp compareOp
= CompareOp
.valueOf(condition
.getCompareType().name());
2282 ByteArrayComparable comparator
=
2283 ProtobufUtil
.toComparator(condition
.getComparator());
2284 processed
= checkAndRowMutate(region
, regionAction
.getActionList(),
2285 cellScanner
, row
, family
, qualifier
, compareOp
,
2286 comparator
, regionActionResultBuilder
);
2288 mutateRows(region
, regionAction
.getActionList(), cellScanner
,
2289 regionActionResultBuilder
);
2290 processed
= Boolean
.TRUE
;
2292 } catch (IOException e
) {
2293 rpcServer
.getMetrics().exception(e
);
2294 // As it's atomic, we may expect it's a global failure.
2295 regionActionResultBuilder
.setException(ResponseConverter
.buildException(e
));
2298 // doNonAtomicRegionMutation manages the exception internally
2299 if (context
!= null && closeCallBack
== null) {
2300 // An RpcCallBack that creates a list of scanners that needs to perform callBack
2301 // operation on completion of multiGets.
2302 // Set this only once
2303 closeCallBack
= new RegionScannersCloseCallBack();
2304 context
.setCallBack(closeCallBack
);
2306 cellsToReturn
= doNonAtomicRegionMutation(region
, quota
, regionAction
, cellScanner
,
2307 regionActionResultBuilder
, cellsToReturn
, nonceGroup
, closeCallBack
, context
);
2309 responseBuilder
.addRegionActionResult(regionActionResultBuilder
.build());
2311 ClientProtos
.RegionLoadStats regionLoadStats
= ((HRegion
)region
).getLoadStatistics();
2312 if(regionLoadStats
!= null) {
2313 regionStats
.put(regionSpecifier
, regionLoadStats
);
2316 // Load the controller with the Cells to return.
2317 if (cellsToReturn
!= null && !cellsToReturn
.isEmpty() && controller
!= null) {
2318 controller
.setCellScanner(CellUtil
.createCellScanner(cellsToReturn
));
2321 if (processed
!= null) {
2322 responseBuilder
.setProcessed(processed
);
2325 MultiRegionLoadStats
.Builder builder
= MultiRegionLoadStats
.newBuilder();
2326 for(Entry
<RegionSpecifier
, ClientProtos
.RegionLoadStats
> stat
: regionStats
.entrySet()){
2327 builder
.addRegion(stat
.getKey());
2328 builder
.addStat(stat
.getValue());
2330 responseBuilder
.setRegionStatistics(builder
);
2331 return responseBuilder
.build();
2334 private void skipCellsForMutations(List
<Action
> actions
, CellScanner cellScanner
) {
2335 for (Action action
: actions
) {
2336 skipCellsForMutation(action
, cellScanner
);
2340 private void skipCellsForMutation(Action action
, CellScanner cellScanner
) {
2342 if (action
.hasMutation()) {
2343 MutationProto m
= action
.getMutation();
2344 if (m
.hasAssociatedCellCount()) {
2345 for (int i
= 0; i
< m
.getAssociatedCellCount(); i
++) {
2346 cellScanner
.advance();
2350 } catch (IOException e
) {
2351 // No need to handle these Individual Muatation level issue. Any way this entire RegionAction
2352 // marked as failed as we could not see the Region here. At client side the top level
2353 // RegionAction exception will be considered first.
2354 LOG
.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e
);
2359 * Mutate data in a table.
2361 * @param rpcc the RPC controller
2362 * @param request the mutate request
2363 * @throws ServiceException
2366 public MutateResponse
mutate(final RpcController rpcc
,
2367 final MutateRequest request
) throws ServiceException
{
2368 // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
2369 // It is also the conduit via which we pass back data.
2370 PayloadCarryingRpcController controller
= (PayloadCarryingRpcController
)rpcc
;
2371 CellScanner cellScanner
= controller
!= null ? controller
.cellScanner() : null;
2372 OperationQuota quota
= null;
2373 // Clear scanner so we are not holding on to reference across call.
2374 if (controller
!= null) {
2375 controller
.setCellScanner(null);
2379 requestCount
.increment();
2380 rpcMutateRequestCount
.increment();
2381 Region region
= getRegion(request
.getRegion());
2382 MutateResponse
.Builder builder
= MutateResponse
.newBuilder();
2383 MutationProto mutation
= request
.getMutation();
2384 if (!region
.getRegionInfo().isMetaTable()) {
2385 regionServer
.cacheFlusher
.reclaimMemStoreMemory();
2387 long nonceGroup
= request
.hasNonceGroup() ? request
.getNonceGroup() : HConstants
.NO_NONCE
;
2389 Boolean processed
= null;
2390 MutationType type
= mutation
.getMutateType();
2392 quota
= getQuotaManager().checkQuota(region
, OperationQuota
.OperationType
.MUTATE
);
2396 // TODO: this doesn't actually check anything.
2397 r
= append(region
, quota
, mutation
, cellScanner
, nonceGroup
);
2400 // TODO: this doesn't actually check anything.
2401 r
= increment(region
, quota
, mutation
, cellScanner
, nonceGroup
);
2404 Put put
= ProtobufUtil
.toPut(mutation
, cellScanner
);
2405 quota
.addMutation(put
);
2406 if (request
.hasCondition()) {
2407 Condition condition
= request
.getCondition();
2408 byte[] row
= condition
.getRow().toByteArray();
2409 byte[] family
= condition
.getFamily().toByteArray();
2410 byte[] qualifier
= condition
.getQualifier().toByteArray();
2411 CompareOp compareOp
= CompareOp
.valueOf(condition
.getCompareType().name());
2412 ByteArrayComparable comparator
=
2413 ProtobufUtil
.toComparator(condition
.getComparator());
2414 if (region
.getCoprocessorHost() != null) {
2415 processed
= region
.getCoprocessorHost().preCheckAndPut(
2416 row
, family
, qualifier
, compareOp
, comparator
, put
);
2418 if (processed
== null) {
2419 boolean result
= region
.checkAndMutate(row
, family
,
2420 qualifier
, compareOp
, comparator
, put
, true);
2421 if (region
.getCoprocessorHost() != null) {
2422 result
= region
.getCoprocessorHost().postCheckAndPut(row
, family
,
2423 qualifier
, compareOp
, comparator
, put
, result
);
2429 processed
= Boolean
.TRUE
;
2433 Delete delete
= ProtobufUtil
.toDelete(mutation
, cellScanner
);
2434 quota
.addMutation(delete
);
2435 if (request
.hasCondition()) {
2436 Condition condition
= request
.getCondition();
2437 byte[] row
= condition
.getRow().toByteArray();
2438 byte[] family
= condition
.getFamily().toByteArray();
2439 byte[] qualifier
= condition
.getQualifier().toByteArray();
2440 CompareOp compareOp
= CompareOp
.valueOf(condition
.getCompareType().name());
2441 ByteArrayComparable comparator
=
2442 ProtobufUtil
.toComparator(condition
.getComparator());
2443 if (region
.getCoprocessorHost() != null) {
2444 processed
= region
.getCoprocessorHost().preCheckAndDelete(
2445 row
, family
, qualifier
, compareOp
, comparator
, delete
);
2447 if (processed
== null) {
2448 boolean result
= region
.checkAndMutate(row
, family
,
2449 qualifier
, compareOp
, comparator
, delete
, true);
2450 if (region
.getCoprocessorHost() != null) {
2451 result
= region
.getCoprocessorHost().postCheckAndDelete(row
, family
,
2452 qualifier
, compareOp
, comparator
, delete
, result
);
2457 region
.delete(delete
);
2458 processed
= Boolean
.TRUE
;
2462 throw new DoNotRetryIOException(
2463 "Unsupported mutate type: " + type
.name());
2465 if (processed
!= null) {
2466 builder
.setProcessed(processed
.booleanValue());
2468 addResult(builder
, r
, controller
);
2469 return builder
.build();
2470 } catch (IOException ie
) {
2471 regionServer
.checkFileSystem();
2472 throw new ServiceException(ie
);
2474 if (quota
!= null) {
2481 * Scan data in a table.
2483 * @param controller the RPC controller
2484 * @param request the scan request
2485 * @throws ServiceException
// Serves one scan RPC. The request either names an existing scanner (scannerId) or carries a
// Scan describing a new one. The visible code looks up or opens the scanner, optionally renews
// its lease, gathers Results under size, time and row limits, and fills in a ScanResponse.
// IOExceptions raised along the way are rethrown as ServiceException.
2488 public ScanResponse
scan(final RpcController controller
, final ScanRequest request
)
2489 throws ServiceException
{
2490 OperationQuota quota
= null;
2491 Leases
.Lease lease
= null;
2492 String scannerName
= null;
// The request must identify an existing scanner or describe a new scan.
2494 if (!request
.hasScannerId() && !request
.hasScan()) {
2495 throw new DoNotRetryIOException(
2496 "Missing required input: scannerId or scan");
2498 long scannerId
= -1;
// For an existing scanner, the scanner name is the decimal string form of its id.
2499 if (request
.hasScannerId()) {
2500 scannerId
= request
.getScannerId();
2501 scannerName
= String
.valueOf(scannerId
);
2505 } catch (IOException e
) {
2506 // If checkOpen failed, server not running or filesystem gone,
2507 // cancel this lease; filesystem is gone or we're closing or something.
2508 if (scannerName
!= null) {
2509 LOG
.debug("Server shutting down and client tried to access missing scanner "
2511 if (regionServer
.leases
!= null) {
2513 regionServer
.leases
.cancelLease(scannerName
);
2514 } catch (LeaseException le
) {
2515 // No problem, ignore
2516 if (LOG
.isTraceEnabled()) {
2517 LOG
.trace("Un-able to cancel lease of scanner. It could already be closed.");
// Count this call in both the general and the scan-specific request counters.
2524 requestCount
.increment();
2525 rpcScanRequestCount
.increment();
2528 Region region
= null;
2529 RegionScanner scanner
= null;
2530 RegionScannerHolder rsh
= null;
2531 boolean moreResults
= true;
2532 boolean closeScanner
= false;
2533 boolean isSmallScan
= false;
2534 ScanResponse
.Builder builder
= ScanResponse
.newBuilder();
2535 if (request
.hasCloseScanner()) {
2536 closeScanner
= request
.getCloseScanner();
// Default to one row per call, or zero when the client only wants to close the scanner;
// an explicit numberOfRows in the request overrides the default.
2538 int rows
= closeScanner ?
0 : 1;
2539 if (request
.hasNumberOfRows()) {
2540 rows
= request
.getNumberOfRows();
// Existing scanner: look up its holder by name.
2542 if (request
.hasScannerId()) {
2543 rsh
= scanners
.get(scannerName
);
2545 LOG
.warn("Client tried to access missing scanner " + scannerName
);
// NOTE(review): the message below concatenates "...the value of" directly with
// "'hbase.client.scanner.timeout.period'...", so the rendered text lacks a space there.
2546 throw new UnknownScannerException(
2547 "Unknown scanner '" + scannerName
+ "'. This can happen due to any of the following "
2548 + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of "
2549 + "long wait between consecutive client checkins, c) Server may be closing down, "
2550 + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a "
2551 + "possible fix would be increasing the value of"
2552 + "'hbase.client.scanner.timeout.period' configuration.");
// The region currently served under the scanner's region name must be the same instance
// the holder recorded (rsh.r); otherwise the request is rejected.
2555 HRegionInfo hri
= scanner
.getRegionInfo();
2556 region
= regionServer
.getRegion(hri
.getRegionName());
2557 if (region
!= rsh
.r
) { // Yes, should be the same instance
// NOTE(review): "...after the scanner" is concatenated directly with scannerName, so the
// rendered message has no space between the word "scanner" and the scanner name.
2558 throw new NotServingRegionException("Region was re-opened after the scanner"
2559 + scannerName
+ " was created: " + hri
.getRegionNameAsString());
// New scan: resolve the region from the request and convert the protobuf Scan.
2562 region
= getRegion(request
.getRegion());
2563 ClientProtos
.Scan protoScan
= request
.getScan();
2564 boolean isLoadingCfsOnDemandSet
= protoScan
.hasLoadColumnFamiliesOnDemand();
2565 Scan scan
= ProtobufUtil
.toScan(protoScan
);
2566 // if the request doesn't set this, get the default region setting.
2567 if (!isLoadingCfsOnDemandSet
) {
2568 scan
.setLoadColumnFamiliesOnDemand(region
.isLoadingCfsOnDemandDefault());
2571 isSmallScan
= scan
.isSmall();
2572 if (!scan
.hasFamilies()) {
2573 // Adding all families to scanner
2574 for (byte[] family
: region
.getTableDesc().getFamiliesKeys()) {
2575 scan
.addFamily(family
);
// A coprocessor may supply the scanner (preScannerOpen); if it returns null the region's
// own scanner is used. postScannerOpen may then replace the scanner again.
2579 if (region
.getCoprocessorHost() != null) {
2580 scanner
= region
.getCoprocessorHost().preScannerOpen(scan
);
2582 if (scanner
== null) {
2583 scanner
= region
.getScanner(scan
);
2585 if (region
.getCoprocessorHost() != null) {
2586 scanner
= region
.getCoprocessorHost().postScannerOpen(scan
, scanner
);
// Register the new scanner under a freshly generated id.
2588 scannerId
= this.scannerIdGen
.incrementAndGet();
2589 scannerName
= String
.valueOf(scannerId
);
2590 rsh
= addScanner(scannerName
, scanner
, region
);
2591 ttl
= this.scannerLeaseTimeoutPeriod
;
2593 assert scanner
!= null;
// Renew-only request: remove and re-add the lease, advance the expected call sequence,
// and return the response without scanning.
2594 if (request
.hasRenew() && request
.getRenew()) {
2595 rsh
= scanners
.get(scannerName
);
2596 lease
= regionServer
.leases
.removeLease(scannerName
);
2597 if (lease
!= null && rsh
!= null) {
2598 regionServer
.leases
.addLease(lease
);
2599 // Increment the nextCallSeq value which is the next expected from client.
2600 rsh
.incNextCallSeq();
2602 return builder
.build();
2604 RpcCallContext context
= RpcServer
.getCurrentCall();
2605 Object lastBlock
= null;
// Check the SCAN quota for this region and cap the result size by the smaller of
// maxScannerResultSize and the quota's available read size.
2607 quota
= getQuotaManager().checkQuota(region
, OperationQuota
.OperationType
.SCAN
);
2608 long maxQuotaResultSize
= Math
.min(maxScannerResultSize
, quota
.getReadAvailable());
2611 // if nextCallSeq does not match throw Exception straight away. This needs to be
2612 // performed even before checking of Lease.
2614 if (request
.hasNextCallSeq()) {
2616 if (request
.getNextCallSeq() != rsh
.getNextCallSeq()) {
2617 throw new OutOfOrderScannerNextException(
2618 "Expected nextCallSeq: " + rsh
.getNextCallSeq()
2619 + " But the nextCallSeq got from client: " + request
.getNextCallSeq() +
2620 "; request=" + TextFormat
.shortDebugString(request
));
2622 // Increment the nextCallSeq value which is the next expected from client.
2623 rsh
.incNextCallSeq();
2627 // Remove lease while it is being processed in server; protects against case
2628 // where processing of request takes > lease expiration time.
2629 lease
= regionServer
.leases
.removeLease(scannerName
);
2630 List
<Result
> results
= new ArrayList
<Result
>();
2632 boolean done
= false;
2633 // Call coprocessor. Get region info from scanner.
2634 if (region
!= null && region
.getCoprocessorHost() != null) {
2635 Boolean bypass
= region
.getCoprocessorHost().preScannerNext(
2636 scanner
, results
, rows
);
// Results added by the coprocessor hook are passed through addSize as well.
2637 if (!results
.isEmpty()) {
2638 for (Result r
: results
) {
2639 lastBlock
= addSize(context
, r
, lastBlock
);
2642 if (bypass
!= null && bypass
.booleanValue()) {
// Effective size limit: the scanner's own maximum capped by the quota-derived maximum;
// a non-positive value falls back to the quota-derived maximum.
2648 long maxResultSize
= Math
.min(scanner
.getMaxResultSize(), maxQuotaResultSize
);
2649 if (maxResultSize
<= 0) {
2650 maxResultSize
= maxQuotaResultSize
;
2652 // This is cells inside a row. Default size is 10 so if many versions or many cfs,
2653 // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
2654 // arbitrary 32. TODO: keep record of general size of results being returned.
2655 List
<Cell
> values
= new ArrayList
<Cell
>(32);
2656 region
.startRegionOperation(Operation
.SCAN
);
// Start time for the scan-time metrics reported further down.
2659 long before
= EnvironmentEdgeManager
.currentTime();
2660 synchronized(scanner
) {
// Results are flagged stale whenever the region's replica id is not 0.
2661 boolean stale
= (region
.getRegionInfo().getReplicaId() != 0);
2662 boolean clientHandlesPartials
=
2663 request
.hasClientHandlesPartials() && request
.getClientHandlesPartials();
2664 boolean clientHandlesHeartbeats
=
2665 request
.hasClientHandlesHeartbeats() && request
.getClientHandlesHeartbeats();
2667 // On the server side we must ensure that the correct ordering of partial results is
2668 // returned to the client to allow them to properly reconstruct the partial results.
2669 // If the coprocessor host is adding to the result list, we cannot guarantee the
2670 // correct ordering of partial results and so we prevent partial results from being formed.
2672 boolean serverGuaranteesOrderOfPartials
= results
.isEmpty();
2673 boolean allowPartialResults
=
2674 clientHandlesPartials
&& serverGuaranteesOrderOfPartials
&& !isSmallScan
;
2675 boolean moreRows
= false;
2677 // Heartbeat messages occur when the processing of the ScanRequest exceeds a
2678 // certain time threshold on the server. When the time threshold is exceeded, the
2679 // server stops the scan and sends back whatever Results it has accumulated within
2680 // that time period (may be empty). Since heartbeat messages have the potential to
2681 // create partial Results (in the event that the timeout occurs in the middle of a
2682 // row), we must only generate heartbeat messages when the client can handle both
2683 // heartbeats AND partials
2684 boolean allowHeartbeatMessages
= clientHandlesHeartbeats
&& allowPartialResults
;
2686 // Default value of timeLimit is negative to indicate no timeLimit should be enforced.
2688 long timeLimit
= -1;
2690 // Set the time limit to be half of the more restrictive timeout value (one of the
2691 // timeout values must be positive). In the event that both values are positive, the
2692 // more restrictive of the two is used to calculate the limit.
2693 if (allowHeartbeatMessages
&& (scannerLeaseTimeoutPeriod
> 0 || rpcTimeout
> 0)) {
2694 long timeLimitDelta
;
2695 if (scannerLeaseTimeoutPeriod
> 0 && rpcTimeout
> 0) {
2696 timeLimitDelta
= Math
.min(scannerLeaseTimeoutPeriod
, rpcTimeout
);
2699 scannerLeaseTimeoutPeriod
> 0 ? scannerLeaseTimeoutPeriod
: rpcTimeout
;
// A positive per-call timeout on the controller can tighten the delta further.
2701 if (controller
instanceof TimeLimitedRpcController
) {
2702 TimeLimitedRpcController timeLimitedRpcController
=
2703 (TimeLimitedRpcController
)controller
;
2704 if (timeLimitedRpcController
.getCallTimeout() > 0) {
2705 timeLimitDelta
= Math
.min(timeLimitDelta
,
2706 timeLimitedRpcController
.getCallTimeout());
2709 // Use half of whichever timeout value was more restrictive... But don't allow
2710 // the time limit to be less than the allowable minimum (could cause an
2711 // immediate timeout before scanning any data).
2712 timeLimitDelta
= Math
.max(timeLimitDelta
/ 2, minimumScanTimeLimitDelta
);
2713 timeLimit
= System
.currentTimeMillis() + timeLimitDelta
;
// Size and time limits are enforced between cells only when partial results or
// heartbeats are allowed respectively; otherwise they are enforced between rows.
2716 final LimitScope sizeScope
=
2717 allowPartialResults ? LimitScope
.BETWEEN_CELLS
: LimitScope
.BETWEEN_ROWS
;
2718 final LimitScope timeScope
=
2719 allowHeartbeatMessages ? LimitScope
.BETWEEN_CELLS
: LimitScope
.BETWEEN_ROWS
;
2721 boolean trackMetrics
=
2722 request
.hasTrackScanMetrics() && request
.getTrackScanMetrics();
2724 // Configure with limits for this RPC. Set keep progress true since size progress
2725 // towards size limit should be kept between calls to nextRaw
2726 ScannerContext
.Builder contextBuilder
= ScannerContext
.newBuilder(true);
2727 contextBuilder
.setSizeLimit(sizeScope
, maxResultSize
);
2728 contextBuilder
.setBatchLimit(scanner
.getBatch());
2729 contextBuilder
.setTimeLimit(timeScope
, timeLimit
);
2730 contextBuilder
.setTrackMetrics(trackMetrics
);
2731 ScannerContext scannerContext
= contextBuilder
.build();
2732 boolean limitReached
= false;
2734 // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
2735 // batch limit is a limit on the number of cells per Result. Thus, if progress is
2736 // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
2737 // reset the batch progress between nextRaw invocations since we don't want the
2738 // batch progress from previous calls to affect future calls
2739 scannerContext
.setBatchProgress(0);
2741 // Collect values to be returned here
2742 moreRows
= scanner
.nextRaw(values
, scannerContext
);
// Wrap the collected cells in a Result, carrying the stale and partial flags.
2744 if (!values
.isEmpty()) {
2745 final boolean partial
= scannerContext
.partialResultFormed();
2746 Result r
= Result
.create(values
, null, stale
, partial
);
2747 lastBlock
= addSize(context
, r
, lastBlock
);
// Evaluate the size, time and row limits at row scope to decide whether to stop.
2752 boolean sizeLimitReached
= scannerContext
.checkSizeLimit(LimitScope
.BETWEEN_ROWS
);
2753 boolean timeLimitReached
= scannerContext
.checkTimeLimit(LimitScope
.BETWEEN_ROWS
);
2754 boolean rowLimitReached
= i
>= rows
;
2755 limitReached
= sizeLimitReached
|| timeLimitReached
|| rowLimitReached
;
2757 if (limitReached
|| !moreRows
) {
2758 if (LOG
.isTraceEnabled()) {
2759 LOG
.trace("Done scanning. limitReached: " + limitReached
+ " moreRows: "
2760 + moreRows
+ " scannerContext: " + scannerContext
);
2762 // We only want to mark a ScanResponse as a heartbeat message in the event that
2763 // there are more values to be read server side. If there aren't more values,
2764 // marking it as a heartbeat is wasteful because the client will need to issue
2765 // another ScanRequest only to realize that they already have all the values
2767 // Heartbeat messages occur when the time limit has been reached.
2768 builder
.setHeartbeatMessage(timeLimitReached
);
2775 if (limitReached
|| moreRows
) {
2776 // We stopped prematurely
2777 builder
.setMoreResultsInRegion(true);
2779 // We didn't get a single batch
2780 builder
.setMoreResultsInRegion(false);
2783 // Check to see if the client requested that we track metrics server side. If the
2784 // client requested metrics, retrieve the metrics from the scanner context.
2786 Map
<String
, Long
> metrics
= scannerContext
.getMetrics().getMetricsMap();
2787 ScanMetrics
.Builder metricBuilder
= ScanMetrics
.newBuilder();
2788 NameInt64Pair
.Builder pairBuilder
= NameInt64Pair
.newBuilder();
// Copy every metric name/value pair into the response's ScanMetrics message.
2790 for (Entry
<String
, Long
> entry
: metrics
.entrySet()) {
2791 pairBuilder
.setName(entry
.getKey());
2792 pairBuilder
.setValue(entry
.getValue());
2793 metricBuilder
.addMetrics(pairBuilder
.build());
2796 builder
.setScanMetrics(metricBuilder
.build());
// Record read-request count, response cell size and elapsed time on the region metrics
// and, when present, on the region server metrics.
2799 region
.updateReadRequestsCount(i
);
2800 long end
= EnvironmentEdgeManager
.currentTime();
2801 long responseCellSize
= context
!= null ? context
.getResponseCellSize() : 0;
2802 region
.getMetrics().updateScanSize(responseCellSize
);
2803 region
.getMetrics().updateScanTime(end
- before
);
2804 if (regionServer
.metricsRegionServer
!= null) {
2805 regionServer
.metricsRegionServer
.updateScanSize(responseCellSize
);
2806 regionServer
.metricsRegionServer
.updateScanTime(end
- before
);
2809 region
.closeRegionOperation();
2811 // coprocessor postNext hook
2812 if (region
!= null && region
.getCoprocessorHost() != null) {
2813 region
.getCoprocessorHost().postScannerNext(scanner
, results
, rows
, true);
// Report the returned results to the quota.
2817 quota
.addScanResult(results
);
2819 // If the scanner's filter - if any - is done with the scan
2820 // and wants to tell the client to stop the scan. This is done by passing
2821 // a null result, and setting moreResults to false.
2822 if (scanner
.isFilterDone() && results
.isEmpty()) {
2823 moreResults
= false;
2826 addResults(builder
, results
, controller
,
2827 RegionReplicaUtil
.isDefaultReplica(region
.getRegionInfo()),
2828 isClientCellBlockSupport(context
));
2830 } catch (IOException e
) {
2831 // if we have an exception on scanner next and we are using the callSeq
2832 // we should rollback because the client will retry with the same callSeq
2833 // and get an OutOfOrderScannerNextException if we don't do so.
2834 if (rsh
!= null && request
.hasNextCallSeq()) {
2835 rsh
.rollbackNextCallSeq();
2839 if (context
!= null) {
2840 context
.setCallBack(rsh
.shippedCallback
);
2842 // Adding resets expiration time on lease.
2843 if (scanners
.containsKey(scannerName
)) {
2844 ttl
= this.scannerLeaseTimeoutPeriod
;
2845 // When context != null, adding back the lease will be done in callback set above.
2846 if (context
== null) {
2847 if (lease
!= null) regionServer
.leases
.addLease(lease
);
// Close path: taken when the scan has no more results or the client asked to close.
2853 if (!moreResults
|| closeScanner
) {
2855 moreResults
= false;
// A coprocessor's preScannerClose hook returning true bypasses the rest of the close
// handling and returns the response as built so far.
2856 if (region
!= null && region
.getCoprocessorHost() != null) {
2857 if (region
.getCoprocessorHost().preScannerClose(scanner
)) {
2858 return builder
.build(); // bypass
2861 rsh
= scanners
.remove(scannerName
);
2863 if (context
!= null) {
2864 context
.setCallBack(rsh
.closeCallBack
);
2869 regionServer
.leases
.cancelLease(scannerName
);
2870 } catch (LeaseException le
) {
2871 // No problem, ignore
2872 if (LOG
.isTraceEnabled()) {
2873 LOG
.trace("Un-able to cancel lease of scanner. It could already be closed.");
2876 if (region
!= null && region
.getCoprocessorHost() != null) {
2877 region
.getCoprocessorHost().postScannerClose(scanner
);
// Populate the remaining response fields and return.
2883 builder
.setTtl(ttl
);
2885 builder
.setScannerId(scannerId
);
2886 builder
.setMoreResults(moreResults
);
2887 return builder
.build();
2888 } catch (IOException ie
) {
// On NotServingRegionException for a known scanner, remove the scanner from the scanners
// map and cancel its lease; every IOException is then rethrown as ServiceException.
2889 if (scannerName
!= null && ie
instanceof NotServingRegionException
) {
2890 RegionScannerHolder rsh
= scanners
.remove(scannerName
);
2893 RegionScanner scanner
= rsh
.s
;
2894 LOG
.warn(scannerName
+ " encountered " + ie
.getMessage() + ", closing ...");
2896 regionServer
.leases
.cancelLease(scannerName
);
2897 } catch (IOException e
) {
2898 LOG
.warn("Getting exception closing " + scannerName
, e
);
2902 throw new ServiceException(ie
);
2904 if (quota
!= null) {
2911 public CoprocessorServiceResponse
execRegionServerService(RpcController controller
,
2912 CoprocessorServiceRequest request
) throws ServiceException
{
2913 return regionServer
.execRegionServerService(controller
, request
);
2917 public UpdateConfigurationResponse
updateConfiguration(
2918 RpcController controller
, UpdateConfigurationRequest request
)
2919 throws ServiceException
{
2921 this.regionServer
.updateConfiguration();
2922 } catch (Exception e
) {
2923 throw new ServiceException(e
);
2925 return UpdateConfigurationResponse
.getDefaultInstance();