HBASE-16012 Major compaction can't work due to obsolete scanner read point in RegionS...
[hbase.git] / hbase-server / src / main / java / org / apache / hadoop / hbase / regionserver / RSRpcServices.java
blobc3626fdbba0492612cfd176666599b6a8c456730
1 /**
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org.apache.hadoop.hbase.regionserver;
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.net.BindException;
24 import java.net.InetSocketAddress;
25 import java.net.UnknownHostException;
26 import java.nio.ByteBuffer;
27 import java.util.ArrayList;
28 import java.util.Collections;
29 import java.util.HashMap;
30 import java.util.Iterator;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Map.Entry;
34 import java.util.NavigableMap;
35 import java.util.Set;
36 import java.util.TreeSet;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.atomic.AtomicLong;
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.apache.hadoop.conf.Configuration;
43 import org.apache.hadoop.hbase.ByteBufferedCell;
44 import org.apache.hadoop.hbase.Cell;
45 import org.apache.hadoop.hbase.CellScannable;
46 import org.apache.hadoop.hbase.CellScanner;
47 import org.apache.hadoop.hbase.CellUtil;
48 import org.apache.hadoop.hbase.DoNotRetryIOException;
49 import org.apache.hadoop.hbase.DroppedSnapshotException;
50 import org.apache.hadoop.hbase.HBaseIOException;
51 import org.apache.hadoop.hbase.HConstants;
52 import org.apache.hadoop.hbase.HRegionInfo;
53 import org.apache.hadoop.hbase.HTableDescriptor;
54 import org.apache.hadoop.hbase.MultiActionResultTooLarge;
55 import org.apache.hadoop.hbase.NotServingRegionException;
56 import org.apache.hadoop.hbase.ServerName;
57 import org.apache.hadoop.hbase.TableName;
58 import org.apache.hadoop.hbase.UnknownScannerException;
59 import org.apache.hadoop.hbase.classification.InterfaceAudience;
60 import org.apache.hadoop.hbase.client.Append;
61 import org.apache.hadoop.hbase.client.ConnectionUtils;
62 import org.apache.hadoop.hbase.client.Delete;
63 import org.apache.hadoop.hbase.client.Durability;
64 import org.apache.hadoop.hbase.client.Get;
65 import org.apache.hadoop.hbase.client.Increment;
66 import org.apache.hadoop.hbase.client.Mutation;
67 import org.apache.hadoop.hbase.client.Put;
68 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
69 import org.apache.hadoop.hbase.client.Result;
70 import org.apache.hadoop.hbase.client.RowMutations;
71 import org.apache.hadoop.hbase.client.Scan;
72 import org.apache.hadoop.hbase.client.VersionInfoUtil;
73 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
74 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
75 import org.apache.hadoop.hbase.exceptions.MergeRegionException;
76 import org.apache.hadoop.hbase.exceptions.OperationConflictException;
77 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
78 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
79 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
80 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
81 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
82 import org.apache.hadoop.hbase.ipc.PriorityFunction;
83 import org.apache.hadoop.hbase.ipc.QosPriority;
84 import org.apache.hadoop.hbase.ipc.RpcCallContext;
85 import org.apache.hadoop.hbase.ipc.RpcCallback;
86 import org.apache.hadoop.hbase.ipc.RpcServer;
87 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
88 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
89 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
90 import org.apache.hadoop.hbase.ipc.ServerRpcController;
91 import org.apache.hadoop.hbase.ipc.TimeLimitedRpcController;
92 import org.apache.hadoop.hbase.master.MasterRpcServices;
93 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
94 import org.apache.hadoop.hbase.protobuf.RequestConverter;
95 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
96 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
97 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
98 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
99 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
100 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse;
101 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
102 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
103 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
104 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
105 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
106 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
107 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
108 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
109 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
110 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
111 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
112 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse;
113 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
114 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
115 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
116 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
117 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
118 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
119 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
120 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
121 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
122 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
123 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
124 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
125 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
126 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
127 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
128 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
129 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
130 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
131 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse;
132 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
133 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
134 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
135 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
136 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
137 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
138 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
139 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
140 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
141 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
142 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
143 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRegionLoadStats;
144 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
145 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
146 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
147 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
148 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
149 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
150 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
151 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
152 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
153 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
154 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
155 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair;
156 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
157 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
158 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
159 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics;
160 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
161 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
162 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
163 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
164 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
165 import org.apache.hadoop.hbase.quotas.OperationQuota;
166 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
167 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
168 import org.apache.hadoop.hbase.regionserver.Leases.Lease;
169 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
170 import org.apache.hadoop.hbase.regionserver.Region.Operation;
171 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
172 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
173 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
174 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
175 import org.apache.hadoop.hbase.security.User;
176 import org.apache.hadoop.hbase.util.Bytes;
177 import org.apache.hadoop.hbase.util.Counter;
178 import org.apache.hadoop.hbase.util.DNS;
179 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
180 import org.apache.hadoop.hbase.util.Pair;
181 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
182 import org.apache.hadoop.hbase.util.Strings;
183 import org.apache.hadoop.hbase.wal.WAL;
184 import org.apache.hadoop.hbase.wal.WALKey;
185 import org.apache.hadoop.hbase.wal.WALSplitter;
186 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
187 import org.apache.zookeeper.KeeperException;
189 import com.google.common.annotations.VisibleForTesting;
190 import com.google.protobuf.ByteString;
191 import com.google.protobuf.Message;
192 import com.google.protobuf.RpcController;
193 import com.google.protobuf.ServiceException;
194 import com.google.protobuf.TextFormat;
197 * Implements the regionserver RPC services.
199 @InterfaceAudience.Private
200 @SuppressWarnings("deprecation")
201 public class RSRpcServices implements HBaseRPCErrorHandler,
202 AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
203 ConfigurationObserver {
protected static final Log LOG = LogFactory.getLog(RSRpcServices.class);

/** RPC scheduler to use for the region server. */
public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
  "hbase.region.server.rpc.scheduler.factory.class";

/**
 * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
 * configuration exists to prevent the scenario where a time limit is specified to be so
 * restrictive that the time limit is reached immediately (before any cells are scanned).
 */
private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
  "hbase.region.server.rpc.minimum.scan.time.limit.delta";
/**
 * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
 */
private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

// Request counter. (Includes requests that are not serviced by regions.)
final Counter requestCount = new Counter();

// Request counter for rpc get
final Counter rpcGetRequestCount = new Counter();

// Request counter for rpc scan
final Counter rpcScanRequestCount = new Counter();

// Request counter for rpc multi
final Counter rpcMultiRequestCount = new Counter();

// Request counter for rpc mutate
final Counter rpcMutateRequestCount = new Counter();

// Server to handle client requests.
final RpcServerInterface rpcServer;
final InetSocketAddress isa;

private final HRegionServer regionServer;
private final long maxScannerResultSize;

// The reference to the priority extraction function
private final PriorityFunction priority;

// NOTE(review): presumably the source of the numeric ids behind scanner names -- confirm
// against the scan() implementation, which is not visible in this part of the file.
private final AtomicLong scannerIdGen = new AtomicLong(0L);
// Open scanners, keyed by scanner name (the lease listener and the shipped callback both
// look entries up by scannerName).
private final ConcurrentHashMap<String, RegionScannerHolder> scanners =
  new ConcurrentHashMap<String, RegionScannerHolder>();

/**
 * The lease timeout period for client scanners (milliseconds).
 */
private final int scannerLeaseTimeoutPeriod;

/**
 * The RPC timeout period (milliseconds)
 */
private final int rpcTimeout;

/**
 * The minimum allowable delta to use for the scan limit
 */
private final long minimumScanTimeLimitDelta;
267 * An Rpc callback for closing a RegionScanner.
269 static class RegionScannerCloseCallBack implements RpcCallback {
271 private final RegionScanner scanner;
273 public RegionScannerCloseCallBack(RegionScanner scanner){
274 this.scanner = scanner;
277 @Override
278 public void run() throws IOException {
279 this.scanner.close();
284 * An Rpc callback for doing shipped() call on a RegionScanner.
286 private class RegionScannerShippedCallBack implements RpcCallback {
288 private final String scannerName;
289 private final RegionScanner scanner;
290 private final Lease lease;
292 public RegionScannerShippedCallBack(String scannerName, RegionScanner scanner, Lease lease) {
293 this.scannerName = scannerName;
294 this.scanner = scanner;
295 this.lease = lease;
298 @Override
299 public void run() throws IOException {
300 this.scanner.shipped();
301 // We're done. On way out re-add the above removed lease. The lease was temp removed for this
302 // Rpc call and we are at end of the call now. Time to add it back.
303 if (scanners.containsKey(scannerName)) {
304 if (lease != null) regionServer.leases.addLease(lease);
310 * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on
311 * completion of multiGets.
313 static class RegionScannersCloseCallBack implements RpcCallback {
314 private final List<RegionScanner> scanners = new ArrayList<RegionScanner>();
316 public void addScanner(RegionScanner scanner) {
317 this.scanners.add(scanner);
320 @Override
321 public void run() {
322 for (RegionScanner scanner : scanners) {
323 try {
324 scanner.close();
325 } catch (IOException e) {
326 LOG.error("Exception while closing the scanner " + scanner, e);
333 * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
335 private static class RegionScannerHolder {
336 private AtomicLong nextCallSeq = new AtomicLong(0);
337 private RegionScanner s;
338 private Region r;
339 final RpcCallback closeCallBack;
340 final RpcCallback shippedCallback;
342 public RegionScannerHolder(RegionScanner s, Region r, RpcCallback closeCallBack,
343 RpcCallback shippedCallback) {
344 this.s = s;
345 this.r = r;
346 this.closeCallBack = closeCallBack;
347 this.shippedCallback = shippedCallback;
350 private long getNextCallSeq() {
351 return nextCallSeq.get();
354 private void incNextCallSeq() {
355 nextCallSeq.incrementAndGet();
358 private void rollbackNextCallSeq() {
359 nextCallSeq.decrementAndGet();
364 * Instantiated as a scanner lease. If the lease times out, the scanner is
365 * closed
367 private class ScannerListener implements LeaseListener {
368 private final String scannerName;
370 ScannerListener(final String n) {
371 this.scannerName = n;
374 @Override
375 public void leaseExpired() {
376 RegionScannerHolder rsh = scanners.remove(this.scannerName);
377 if (rsh != null) {
378 RegionScanner s = rsh.s;
379 LOG.info("Scanner " + this.scannerName + " lease expired on region "
380 + s.getRegionInfo().getRegionNameAsString());
381 Region region = null;
382 try {
383 region = regionServer.getRegion(s.getRegionInfo().getRegionName());
384 if (region != null && region.getCoprocessorHost() != null) {
385 region.getCoprocessorHost().preScannerClose(s);
387 } catch (IOException e) {
388 LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
389 } finally {
390 try {
391 s.close();
392 if (region != null && region.getCoprocessorHost() != null) {
393 region.getCoprocessorHost().postScannerClose(s);
395 } catch (IOException e) {
396 LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
399 } else {
400 LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" +
401 " scanner found, hence no chance to close that related scanner!");
406 private static ResultOrException getResultOrException(final ClientProtos.Result r,
407 final int index){
408 return getResultOrException(ResponseConverter.buildActionResult(r), index);
411 private static ResultOrException getResultOrException(final Exception e, final int index) {
412 return getResultOrException(ResponseConverter.buildActionResult(e), index);
415 private static ResultOrException getResultOrException(
416 final ResultOrException.Builder builder, final int index) {
417 return builder.setIndex(index).build();
421 * Starts the nonce operation for a mutation, if needed.
422 * @param mutation Mutation.
423 * @param nonceGroup Nonce group from the request.
424 * @returns Nonce used (can be NO_NONCE).
426 private long startNonceOperation(final MutationProto mutation, long nonceGroup)
427 throws IOException, OperationConflictException {
428 if (regionServer.nonceManager == null || !mutation.hasNonce()) return HConstants.NO_NONCE;
429 boolean canProceed = false;
430 try {
431 canProceed = regionServer.nonceManager.startOperation(
432 nonceGroup, mutation.getNonce(), regionServer);
433 } catch (InterruptedException ex) {
434 throw new InterruptedIOException("Nonce start operation interrupted");
436 if (!canProceed) {
437 // TODO: instead, we could convert append/increment to get w/mvcc
438 String message = "The operation with nonce {" + nonceGroup + ", " + mutation.getNonce()
439 + "} on row [" + Bytes.toString(mutation.getRow().toByteArray())
440 + "] may have already completed";
441 throw new OperationConflictException(message);
443 return mutation.getNonce();
447 * Ends nonce operation for a mutation, if needed.
448 * @param mutation Mutation.
449 * @param nonceGroup Nonce group from the request. Always 0 in initial implementation.
450 * @param success Whether the operation for this nonce has succeeded.
452 private void endNonceOperation(final MutationProto mutation,
453 long nonceGroup, boolean success) {
454 if (regionServer.nonceManager != null && mutation.hasNonce()) {
455 regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success);
460 * @return True if current call supports cellblocks
462 private boolean isClientCellBlockSupport() {
463 RpcCallContext context = RpcServer.getCurrentCall();
464 return context != null && context.isClientCellBlockSupported();
467 private boolean isClientCellBlockSupport(RpcCallContext context) {
468 return context != null && context.isClientCellBlockSupported();
471 private void addResult(final MutateResponse.Builder builder, final Result result,
472 final PayloadCarryingRpcController rpcc) {
473 if (result == null) return;
474 if (isClientCellBlockSupport()) {
475 builder.setResult(ProtobufUtil.toResultNoData(result));
476 rpcc.setCellScanner(result.cellScanner());
477 } else {
478 ClientProtos.Result pbr = ProtobufUtil.toResult(result);
479 builder.setResult(pbr);
483 private void addResults(final ScanResponse.Builder builder, final List<Result> results,
484 final RpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
485 builder.setStale(!isDefaultRegion);
486 if (results == null || results.isEmpty()) return;
487 if (clientCellBlockSupported) {
488 for (Result res : results) {
489 builder.addCellsPerResult(res.size());
490 builder.addPartialFlagPerResult(res.isPartial());
492 ((PayloadCarryingRpcController)controller).
493 setCellScanner(CellUtil.createCellScanner(results));
494 } else {
495 for (Result res: results) {
496 ClientProtos.Result pbr = ProtobufUtil.toResult(res);
497 builder.addResults(pbr);
503 * Mutate a list of rows atomically.
505 * @param region
506 * @param actions
507 * @param cellScanner if non-null, the mutation data -- the Cell content.
508 * @throws IOException
510 private void mutateRows(final Region region,
511 final List<ClientProtos.Action> actions,
512 final CellScanner cellScanner, RegionActionResult.Builder builder) throws IOException {
513 if (!region.getRegionInfo().isMetaTable()) {
514 regionServer.cacheFlusher.reclaimMemStoreMemory();
516 RowMutations rm = null;
517 int i = 0;
518 ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
519 ClientProtos.ResultOrException.newBuilder();
520 for (ClientProtos.Action action: actions) {
521 if (action.hasGet()) {
522 throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
523 action.getGet());
525 MutationType type = action.getMutation().getMutateType();
526 if (rm == null) {
527 rm = new RowMutations(action.getMutation().getRow().toByteArray());
529 switch (type) {
530 case PUT:
531 rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
532 break;
533 case DELETE:
534 rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
535 break;
536 default:
537 throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
539 // To unify the response format with doNonAtomicRegionMutation and read through client's
540 // AsyncProcess we have to add an empty result instance per operation
541 resultOrExceptionOrBuilder.clear();
542 resultOrExceptionOrBuilder.setIndex(i++);
543 builder.addResultOrException(
544 resultOrExceptionOrBuilder.build());
546 region.mutateRow(rm);
550 * Mutate a list of rows atomically.
552 * @param region
553 * @param actions
554 * @param cellScanner if non-null, the mutation data -- the Cell content.
555 * @param row
556 * @param family
557 * @param qualifier
558 * @param compareOp
559 * @param comparator @throws IOException
561 private boolean checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions,
562 final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
563 CompareOp compareOp, ByteArrayComparable comparator,
564 RegionActionResult.Builder builder) throws IOException {
565 if (!region.getRegionInfo().isMetaTable()) {
566 regionServer.cacheFlusher.reclaimMemStoreMemory();
568 RowMutations rm = null;
569 int i = 0;
570 ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
571 ClientProtos.ResultOrException.newBuilder();
572 for (ClientProtos.Action action: actions) {
573 if (action.hasGet()) {
574 throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
575 action.getGet());
577 MutationType type = action.getMutation().getMutateType();
578 if (rm == null) {
579 rm = new RowMutations(action.getMutation().getRow().toByteArray());
581 switch (type) {
582 case PUT:
583 rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
584 break;
585 case DELETE:
586 rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
587 break;
588 default:
589 throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
591 // To unify the response format with doNonAtomicRegionMutation and read through client's
592 // AsyncProcess we have to add an empty result instance per operation
593 resultOrExceptionOrBuilder.clear();
594 resultOrExceptionOrBuilder.setIndex(i++);
595 builder.addResultOrException(
596 resultOrExceptionOrBuilder.build());
598 return region.checkAndRowMutate(row, family, qualifier, compareOp,
599 comparator, rm, Boolean.TRUE);
603 * Execute an append mutation.
605 * @param region
606 * @param m
607 * @param cellScanner
608 * @return result to return to client if default operation should be
609 * bypassed as indicated by RegionObserver, null otherwise
610 * @throws IOException
612 private Result append(final Region region, final OperationQuota quota, final MutationProto m,
613 final CellScanner cellScanner, long nonceGroup) throws IOException {
614 long before = EnvironmentEdgeManager.currentTime();
615 Append append = ProtobufUtil.toAppend(m, cellScanner);
616 quota.addMutation(append);
617 Result r = null;
618 if (region.getCoprocessorHost() != null) {
619 r = region.getCoprocessorHost().preAppend(append);
621 if (r == null) {
622 long nonce = startNonceOperation(m, nonceGroup);
623 boolean success = false;
624 try {
625 r = region.append(append, nonceGroup, nonce);
626 success = true;
627 } finally {
628 endNonceOperation(m, nonceGroup, success);
630 if (region.getCoprocessorHost() != null) {
631 region.getCoprocessorHost().postAppend(append, r);
634 if (regionServer.metricsRegionServer != null) {
635 regionServer.metricsRegionServer.updateAppend(
636 EnvironmentEdgeManager.currentTime() - before);
638 return r;
642 * Execute an increment mutation.
644 * @param region
645 * @param mutation
646 * @return the Result
647 * @throws IOException
649 private Result increment(final Region region, final OperationQuota quota,
650 final MutationProto mutation, final CellScanner cells, long nonceGroup)
651 throws IOException {
652 long before = EnvironmentEdgeManager.currentTime();
653 Increment increment = ProtobufUtil.toIncrement(mutation, cells);
654 quota.addMutation(increment);
655 Result r = null;
656 if (region.getCoprocessorHost() != null) {
657 r = region.getCoprocessorHost().preIncrement(increment);
659 if (r == null) {
660 long nonce = startNonceOperation(mutation, nonceGroup);
661 boolean success = false;
662 try {
663 r = region.increment(increment, nonceGroup, nonce);
664 success = true;
665 } finally {
666 endNonceOperation(mutation, nonceGroup, success);
668 if (region.getCoprocessorHost() != null) {
669 r = region.getCoprocessorHost().postIncrement(increment, r);
672 if (regionServer.metricsRegionServer != null) {
673 regionServer.metricsRegionServer.updateIncrement(
674 EnvironmentEdgeManager.currentTime() - before);
676 return r;
680 * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
681 * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
682 * @param region
683 * @param actions
684 * @param cellScanner
685 * @param builder
686 * @param cellsToReturn Could be null. May be allocated in this method. This is what this
687 * method returns as a 'result'.
688 * @param closeCallBack the callback to be used with multigets
689 * @param context the current RpcCallContext
690 * @return Return the <code>cellScanner</code> passed
692 private List<CellScannable> doNonAtomicRegionMutation(final Region region,
693 final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
694 final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup,
695 final RegionScannersCloseCallBack closeCallBack, RpcCallContext context) {
696 // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
697 // one at a time, we instead pass them in batch. Be aware that the corresponding
698 // ResultOrException instance that matches each Put or Delete is then added down in the
699 // doBatchOp call. We should be staying aligned though the Put and Delete are deferred/batched
700 List<ClientProtos.Action> mutations = null;
701 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
702 IOException sizeIOE = null;
703 Object lastBlock = null;
704 for (ClientProtos.Action action : actions.getActionList()) {
705 ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
706 try {
707 Result r = null;
709 if (context != null
710 && context.isRetryImmediatelySupported()
711 && (context.getResponseCellSize() > maxQuotaResultSize
712 || context.getResponseBlockSize() > maxQuotaResultSize)) {
714 // We're storing the exception since the exception and reason string won't
715 // change after the response size limit is reached.
716 if (sizeIOE == null ) {
717 // We don't need the stack unwinding, so don't throw the exception.
718 // Throwing will kill the JVM's JIT.
720 // Instead just create the exception and then store it.
721 sizeIOE = new MultiActionResultTooLarge("Max size exceeded"
722 + " CellSize: " + context.getResponseCellSize()
723 + " BlockSize: " + context.getResponseBlockSize());
725 // Only report the exception once since there's only one request that
726 // caused the exception. Otherwise this number will dominate the exceptions count.
727 rpcServer.getMetrics().exception(sizeIOE);
730 // Now that an exception is known to have been created,
731 // use it for the response.
733 // This will create a copy in the builder.
734 resultOrExceptionBuilder = ResultOrException.newBuilder().
735 setException(ResponseConverter.buildException(sizeIOE));
736 resultOrExceptionBuilder.setIndex(action.getIndex());
737 builder.addResultOrException(resultOrExceptionBuilder.build());
738 if (cellScanner != null) {
739 skipCellsForMutation(action, cellScanner);
741 continue;
743 if (action.hasGet()) {
744 long before = EnvironmentEdgeManager.currentTime();
745 try {
746 Get get = ProtobufUtil.toGet(action.getGet());
747 if (context != null) {
748 r = get(get, ((HRegion) region), closeCallBack, context);
749 } else {
750 r = region.get(get);
752 } finally {
753 if (regionServer.metricsRegionServer != null) {
754 regionServer.metricsRegionServer.updateGet(
755 EnvironmentEdgeManager.currentTime() - before);
758 } else if (action.hasServiceCall()) {
759 resultOrExceptionBuilder = ResultOrException.newBuilder();
760 try {
761 Message result = execServiceOnRegion(region, action.getServiceCall());
762 ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
763 ClientProtos.CoprocessorServiceResult.newBuilder();
764 resultOrExceptionBuilder.setServiceResult(
765 serviceResultBuilder.setValue(
766 serviceResultBuilder.getValueBuilder()
767 .setName(result.getClass().getName())
768 .setValue(result.toByteString())));
769 } catch (IOException ioe) {
770 rpcServer.getMetrics().exception(ioe);
771 resultOrExceptionBuilder.setException(ResponseConverter.buildException(ioe));
773 } else if (action.hasMutation()) {
774 MutationType type = action.getMutation().getMutateType();
775 if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
776 !mutations.isEmpty()) {
777 // Flush out any Puts or Deletes already collected.
778 doBatchOp(builder, region, quota, mutations, cellScanner);
779 mutations.clear();
781 switch (type) {
782 case APPEND:
783 r = append(region, quota, action.getMutation(), cellScanner, nonceGroup);
784 break;
785 case INCREMENT:
786 r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup);
787 break;
788 case PUT:
789 case DELETE:
790 // Collect the individual mutations and apply in a batch
791 if (mutations == null) {
792 mutations = new ArrayList<ClientProtos.Action>(actions.getActionCount());
794 mutations.add(action);
795 break;
796 default:
797 throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
799 } else {
800 throw new HBaseIOException("Unexpected Action type");
802 if (r != null) {
803 ClientProtos.Result pbResult = null;
804 if (isClientCellBlockSupport(context)) {
805 pbResult = ProtobufUtil.toResultNoData(r);
806 // Hard to guess the size here. Just make a rough guess.
807 if (cellsToReturn == null) {
808 cellsToReturn = new ArrayList<CellScannable>();
810 cellsToReturn.add(r);
811 } else {
812 pbResult = ProtobufUtil.toResult(r);
814 lastBlock = addSize(context, r, lastBlock);
815 resultOrExceptionBuilder =
816 ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
818 // Could get to here and there was no result and no exception. Presumes we added
819 // a Put or Delete to the collecting Mutations List for adding later. In this
820 // case the corresponding ResultOrException instance for the Put or Delete will be added
821 // down in the doBatchOp method call rather than up here.
822 } catch (IOException ie) {
823 rpcServer.getMetrics().exception(ie);
824 resultOrExceptionBuilder = ResultOrException.newBuilder().
825 setException(ResponseConverter.buildException(ie));
827 if (resultOrExceptionBuilder != null) {
828 // Propagate index.
829 resultOrExceptionBuilder.setIndex(action.getIndex());
830 builder.addResultOrException(resultOrExceptionBuilder.build());
833 // Finish up any outstanding mutations
834 if (mutations != null && !mutations.isEmpty()) {
835 doBatchOp(builder, region, quota, mutations, cellScanner);
837 return cellsToReturn;
841 * Execute a list of Put/Delete mutations.
843 * @param builder
844 * @param region
845 * @param mutations
847 private void doBatchOp(final RegionActionResult.Builder builder, final Region region,
848 final OperationQuota quota, final List<ClientProtos.Action> mutations,
849 final CellScanner cells) {
850 Mutation[] mArray = new Mutation[mutations.size()];
851 long before = EnvironmentEdgeManager.currentTime();
852 boolean batchContainsPuts = false, batchContainsDelete = false;
853 try {
854 int i = 0;
855 for (ClientProtos.Action action: mutations) {
856 MutationProto m = action.getMutation();
857 Mutation mutation;
858 if (m.getMutateType() == MutationType.PUT) {
859 mutation = ProtobufUtil.toPut(m, cells);
860 batchContainsPuts = true;
861 } else {
862 mutation = ProtobufUtil.toDelete(m, cells);
863 batchContainsDelete = true;
865 mArray[i++] = mutation;
866 quota.addMutation(mutation);
869 if (!region.getRegionInfo().isMetaTable()) {
870 regionServer.cacheFlusher.reclaimMemStoreMemory();
873 OperationStatus[] codes = region.batchMutate(mArray, HConstants.NO_NONCE,
874 HConstants.NO_NONCE);
875 for (i = 0; i < codes.length; i++) {
876 int index = mutations.get(i).getIndex();
877 Exception e = null;
878 switch (codes[i].getOperationStatusCode()) {
879 case BAD_FAMILY:
880 e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
881 builder.addResultOrException(getResultOrException(e, index));
882 break;
884 case SANITY_CHECK_FAILURE:
885 e = new FailedSanityCheckException(codes[i].getExceptionMsg());
886 builder.addResultOrException(getResultOrException(e, index));
887 break;
889 default:
890 e = new DoNotRetryIOException(codes[i].getExceptionMsg());
891 builder.addResultOrException(getResultOrException(e, index));
892 break;
894 case SUCCESS:
895 builder.addResultOrException(getResultOrException(
896 ClientProtos.Result.getDefaultInstance(), index));
897 break;
900 } catch (IOException ie) {
901 for (int i = 0; i < mutations.size(); i++) {
902 builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex()));
905 if (regionServer.metricsRegionServer != null) {
906 long after = EnvironmentEdgeManager.currentTime();
907 if (batchContainsPuts) {
908 regionServer.metricsRegionServer.updatePut(after - before);
910 if (batchContainsDelete) {
911 regionServer.metricsRegionServer.updateDelete(after - before);
917 * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
918 * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
919 * @param region
920 * @param mutations
921 * @param replaySeqId
922 * @return an array of OperationStatus which internally contains the OperationStatusCode and the
923 * exceptionMessage if any
924 * @throws IOException
926 private OperationStatus [] doReplayBatchOp(final Region region,
927 final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
928 long before = EnvironmentEdgeManager.currentTime();
929 boolean batchContainsPuts = false, batchContainsDelete = false;
930 try {
931 for (Iterator<WALSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
932 WALSplitter.MutationReplay m = it.next();
934 if (m.type == MutationType.PUT) {
935 batchContainsPuts = true;
936 } else {
937 batchContainsDelete = true;
940 NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
941 List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
942 if (metaCells != null && !metaCells.isEmpty()) {
943 for (Cell metaCell : metaCells) {
944 CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
945 boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
946 HRegion hRegion = (HRegion)region;
947 if (compactionDesc != null) {
948 // replay the compaction. Remove the files from stores only if we are the primary
949 // region replica (thus own the files)
950 hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
951 replaySeqId);
952 continue;
954 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
955 if (flushDesc != null && !isDefaultReplica) {
956 hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
957 continue;
959 RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
960 if (regionEvent != null && !isDefaultReplica) {
961 hRegion.replayWALRegionEventMarker(regionEvent);
962 continue;
964 BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
965 if (bulkLoadEvent != null) {
966 hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
967 continue;
970 it.remove();
973 requestCount.add(mutations.size());
974 if (!region.getRegionInfo().isMetaTable()) {
975 regionServer.cacheFlusher.reclaimMemStoreMemory();
977 return region.batchReplay(mutations.toArray(
978 new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
979 } finally {
980 if (regionServer.metricsRegionServer != null) {
981 long after = EnvironmentEdgeManager.currentTime();
982 if (batchContainsPuts) {
983 regionServer.metricsRegionServer.updatePut(after - before);
985 if (batchContainsDelete) {
986 regionServer.metricsRegionServer.updateDelete(after - before);
992 private void closeAllScanners() {
993 // Close any outstanding scanners. Means they'll get an UnknownScanner
994 // exception next time they come in.
995 for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
996 try {
997 e.getValue().s.close();
998 } catch (IOException ioe) {
999 LOG.warn("Closing scanner " + e.getKey(), ioe);
1004 public RSRpcServices(HRegionServer rs) throws IOException {
1005 regionServer = rs;
1007 RpcSchedulerFactory rpcSchedulerFactory;
1008 try {
1009 Class<?> rpcSchedulerFactoryClass = rs.conf.getClass(
1010 REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
1011 SimpleRpcSchedulerFactory.class);
1012 rpcSchedulerFactory = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance());
1013 } catch (InstantiationException e) {
1014 throw new IllegalArgumentException(e);
1015 } catch (IllegalAccessException e) {
1016 throw new IllegalArgumentException(e);
1018 // Server to handle client requests.
1019 InetSocketAddress initialIsa;
1020 InetSocketAddress bindAddress;
1021 if(this instanceof MasterRpcServices) {
1022 String hostname = getHostname(rs.conf, true);
1023 int port = rs.conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
1024 // Creation of a HSA will force a resolve.
1025 initialIsa = new InetSocketAddress(hostname, port);
1026 bindAddress = new InetSocketAddress(rs.conf.get("hbase.master.ipc.address", hostname), port);
1027 } else {
1028 String hostname = getHostname(rs.conf, false);
1029 int port = rs.conf.getInt(HConstants.REGIONSERVER_PORT,
1030 HConstants.DEFAULT_REGIONSERVER_PORT);
1031 // Creation of a HSA will force a resolve.
1032 initialIsa = new InetSocketAddress(hostname, port);
1033 bindAddress = new InetSocketAddress(
1034 rs.conf.get("hbase.regionserver.ipc.address", hostname), port);
1036 if (initialIsa.getAddress() == null) {
1037 throw new IllegalArgumentException("Failed resolve of " + initialIsa);
1039 priority = createPriority();
1040 String name = rs.getProcessName() + "/" + initialIsa.toString();
1041 // Set how many times to retry talking to another server over Connection.
1042 ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
1043 try {
1044 rpcServer = new RpcServer(rs, name, getServices(),
1045 bindAddress, // use final bindAddress for this server.
1046 rs.conf,
1047 rpcSchedulerFactory.create(rs.conf, this, rs));
1048 } catch (BindException be) {
1049 String configName = (this instanceof MasterRpcServices) ? HConstants.MASTER_PORT :
1050 HConstants.REGIONSERVER_PORT;
1051 throw new IOException(be.getMessage() + ". To switch ports use the '" + configName +
1052 "' configuration property.", be.getCause() != null ? be.getCause() : be);
1055 scannerLeaseTimeoutPeriod = rs.conf.getInt(
1056 HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
1057 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
1058 maxScannerResultSize = rs.conf.getLong(
1059 HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
1060 HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
1061 rpcTimeout = rs.conf.getInt(
1062 HConstants.HBASE_RPC_TIMEOUT_KEY,
1063 HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
1064 minimumScanTimeLimitDelta = rs.conf.getLong(
1065 REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
1066 DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
1068 InetSocketAddress address = rpcServer.getListenerAddress();
1069 if (address == null) {
1070 throw new IOException("Listener channel is closed");
1072 // Set our address, however we need the final port that was given to rpcServer
1073 isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
1074 rpcServer.setErrorHandler(this);
1075 rs.setName(name);
1078 @Override
1079 public void onConfigurationChange(Configuration newConf) {
1080 if (rpcServer instanceof ConfigurationObserver) {
1081 ((ConfigurationObserver)rpcServer).onConfigurationChange(newConf);
1085 protected PriorityFunction createPriority() {
1086 return new AnnotationReadingPriorityFunction(this);
1089 public static String getHostname(Configuration conf, boolean isMaster)
1090 throws UnknownHostException {
1091 String hostname = conf.get(isMaster? HRegionServer.MASTER_HOSTNAME_KEY :
1092 HRegionServer.RS_HOSTNAME_KEY);
1093 if (hostname == null || hostname.isEmpty()) {
1094 String masterOrRS = isMaster ? "master" : "regionserver";
1095 return Strings.domainNamePointerToHostName(DNS.getDefaultHost(
1096 conf.get("hbase." + masterOrRS + ".dns.interface", "default"),
1097 conf.get("hbase." + masterOrRS + ".dns.nameserver", "default")));
1098 } else {
1099 LOG.info("hostname is configured to be " + hostname);
1100 return hostname;
1104 RegionScanner getScanner(long scannerId) {
1105 String scannerIdString = Long.toString(scannerId);
1106 RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
1107 if (scannerHolder != null) {
1108 return scannerHolder.s;
1110 return null;
1114 * Get the vtime associated with the scanner.
1115 * Currently the vtime is the number of "next" calls.
1117 long getScannerVirtualTime(long scannerId) {
1118 String scannerIdString = Long.toString(scannerId);
1119 RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
1120 if (scannerHolder != null) {
1121 return scannerHolder.getNextCallSeq();
1123 return 0L;
1127 * Method to account for the size of retained cells and retained data blocks.
1128 * @return an object that represents the last referenced block from this response.
1130 Object addSize(RpcCallContext context, Result r, Object lastBlock) {
1131 if (context != null && !r.isEmpty()) {
1132 for (Cell c : r.rawCells()) {
1133 context.incrementResponseCellSize(CellUtil.estimatedHeapSizeOf(c));
1135 // Since byte buffers can point all kinds of crazy places it's harder to keep track
1136 // of which blocks are kept alive by what byte buffer.
1137 // So we make a guess.
1138 if (c instanceof ByteBufferedCell) {
1139 ByteBufferedCell bbCell = (ByteBufferedCell) c;
1140 ByteBuffer bb = bbCell.getValueByteBuffer();
1141 if (bb != lastBlock) {
1142 context.incrementResponseBlockSize(bb.capacity());
1143 lastBlock = bb;
1145 } else {
1146 // We're using the last block being the same as the current block as
1147 // a proxy for pointing to a new block. This won't be exact.
1148 // If there are multiple gets that bounce back and forth
1149 // Then it's possible that this will over count the size of
1150 // referenced blocks. However it's better to over count and
1151 // use two rpcs than to OOME the regionserver.
1152 byte[] valueArray = c.getValueArray();
1153 if (valueArray != lastBlock) {
1154 context.incrementResponseBlockSize(valueArray.length);
1155 lastBlock = valueArray;
1161 return lastBlock;
1164 RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r)
1165 throws LeaseStillHeldException {
1166 Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
1167 new ScannerListener(scannerName));
1168 RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, s, lease);
1169 RpcCallback closeCallback;
1170 if (s instanceof RpcCallback) {
1171 closeCallback = (RpcCallback) s;
1172 } else {
1173 closeCallback = new RegionScannerCloseCallBack(s);
1175 RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback);
1176 RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
1177 assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
1178 return rsh;
1182 * Find the HRegion based on a region specifier
1184 * @param regionSpecifier the region specifier
1185 * @return the corresponding region
1186 * @throws IOException if the specifier is not null,
1187 * but failed to find the region
1189 @VisibleForTesting
1190 public Region getRegion(
1191 final RegionSpecifier regionSpecifier) throws IOException {
1192 ByteString value = regionSpecifier.getValue();
1193 RegionSpecifierType type = regionSpecifier.getType();
1194 switch (type) {
1195 case REGION_NAME:
1196 byte[] regionName = value.toByteArray();
1197 String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
1198 return regionServer.getRegionByEncodedName(regionName, encodedRegionName);
1199 case ENCODED_REGION_NAME:
1200 return regionServer.getRegionByEncodedName(value.toStringUtf8());
1201 default:
1202 throw new DoNotRetryIOException(
1203 "Unsupported region specifier type: " + type);
1207 @VisibleForTesting
1208 public PriorityFunction getPriority() {
1209 return priority;
1212 @VisibleForTesting
1213 public Configuration getConfiguration() {
1214 return regionServer.getConfiguration();
1217 private RegionServerQuotaManager getQuotaManager() {
1218 return regionServer.getRegionServerQuotaManager();
1221 void start() {
1222 rpcServer.start();
1225 void stop() {
1226 closeAllScanners();
1227 rpcServer.stop();
1231 * Called to verify that this server is up and running.
1233 * @throws IOException
1235 protected void checkOpen() throws IOException {
1236 if (regionServer.isAborted()) {
1237 throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting");
1239 if (regionServer.isStopped()) {
1240 throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping");
1242 if (!regionServer.fsOk) {
1243 throw new RegionServerStoppedException("File system not available");
1245 if (!regionServer.isOnline()) {
1246 throw new ServerNotRunningYetException("Server is not running yet");
1251 * @return list of blocking services and their security info classes that this server supports
1253 protected List<BlockingServiceAndInterface> getServices() {
1254 List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(2);
1255 bssi.add(new BlockingServiceAndInterface(
1256 ClientService.newReflectiveBlockingService(this),
1257 ClientService.BlockingInterface.class));
1258 bssi.add(new BlockingServiceAndInterface(
1259 AdminService.newReflectiveBlockingService(this),
1260 AdminService.BlockingInterface.class));
1261 return bssi;
1264 public InetSocketAddress getSocketAddress() {
1265 return isa;
1268 @Override
1269 public int getPriority(RequestHeader header, Message param, User user) {
1270 return priority.getPriority(header, param, user);
1273 @Override
1274 public long getDeadline(RequestHeader header, Message param) {
1275 return priority.getDeadline(header, param);
1279 * Check if an OOME and, if so, abort immediately to avoid creating more objects.
1281 * @param e
1283 * @return True if we OOME'd and are aborting.
1285 @Override
1286 public boolean checkOOME(final Throwable e) {
1287 boolean stop = false;
1288 try {
1289 if (e instanceof OutOfMemoryError
1290 || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
1291 || (e.getMessage() != null && e.getMessage().contains(
1292 "java.lang.OutOfMemoryError"))) {
1293 stop = true;
1294 LOG.fatal("Run out of memory; " + getClass().getSimpleName()
1295 + " will abort itself immediately", e);
1297 } finally {
1298 if (stop) {
1299 Runtime.getRuntime().halt(1);
1302 return stop;
1306 * Close a region on the region server.
1308 * @param controller the RPC controller
1309 * @param request the request
1310 * @throws ServiceException
1312 @Override
1313 @QosPriority(priority=HConstants.ADMIN_QOS)
1314 public CloseRegionResponse closeRegion(final RpcController controller,
1315 final CloseRegionRequest request) throws ServiceException {
1316 final ServerName sn = (request.hasDestinationServer() ?
1317 ProtobufUtil.toServerName(request.getDestinationServer()) : null);
1319 try {
1320 checkOpen();
1321 if (request.hasServerStartCode()) {
1322 // check that we are the same server that this RPC is intended for.
1323 long serverStartCode = request.getServerStartCode();
1324 if (regionServer.serverName.getStartcode() != serverStartCode) {
1325 throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
1326 "different server with startCode: " + serverStartCode + ", this server is: "
1327 + regionServer.serverName));
1330 final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
1332 requestCount.increment();
1333 LOG.info("Close " + encodedRegionName + ", moving to " + sn);
1334 boolean closed = regionServer.closeRegion(encodedRegionName, false, sn);
1335 CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
1336 return builder.build();
1337 } catch (IOException ie) {
1338 throw new ServiceException(ie);
1343 * Compact a region on the region server.
1345 * @param controller the RPC controller
1346 * @param request the request
1347 * @throws ServiceException
1349 @Override
1350 @QosPriority(priority=HConstants.ADMIN_QOS)
1351 public CompactRegionResponse compactRegion(final RpcController controller,
1352 final CompactRegionRequest request) throws ServiceException {
1353 try {
1354 checkOpen();
1355 requestCount.increment();
1356 Region region = getRegion(request.getRegion());
1357 region.startRegionOperation(Operation.COMPACT_REGION);
1358 LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
1359 boolean major = false;
1360 byte [] family = null;
1361 Store store = null;
1362 if (request.hasFamily()) {
1363 family = request.getFamily().toByteArray();
1364 store = region.getStore(family);
1365 if (store == null) {
1366 throw new ServiceException(new IOException("column family " + Bytes.toString(family)
1367 + " does not exist in region " + region.getRegionInfo().getRegionNameAsString()));
1370 if (request.hasMajor()) {
1371 major = request.getMajor();
1373 if (major) {
1374 if (family != null) {
1375 store.triggerMajorCompaction();
1376 } else {
1377 region.triggerMajorCompaction();
1381 String familyLogMsg = (family != null)?" for column family: " + Bytes.toString(family):"";
1382 if (LOG.isTraceEnabled()) {
1383 LOG.trace("User-triggered compaction requested for region "
1384 + region.getRegionInfo().getRegionNameAsString() + familyLogMsg);
1386 String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
1387 if(family != null) {
1388 regionServer.compactSplitThread.requestCompaction(region, store, log,
1389 Store.PRIORITY_USER, null, RpcServer.getRequestUser());
1390 } else {
1391 regionServer.compactSplitThread.requestCompaction(region, log,
1392 Store.PRIORITY_USER, null, RpcServer.getRequestUser());
1394 return CompactRegionResponse.newBuilder().build();
1395 } catch (IOException ie) {
1396 throw new ServiceException(ie);
1401 * Flush a region on the region server.
1403 * @param controller the RPC controller
1404 * @param request the request
1405 * @throws ServiceException
1407 @Override
1408 @QosPriority(priority=HConstants.ADMIN_QOS)
1409 public FlushRegionResponse flushRegion(final RpcController controller,
1410 final FlushRegionRequest request) throws ServiceException {
1411 try {
1412 checkOpen();
1413 requestCount.increment();
1414 Region region = getRegion(request.getRegion());
1415 LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
1416 boolean shouldFlush = true;
1417 if (request.hasIfOlderThanTs()) {
1418 shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
1420 FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
1421 if (shouldFlush) {
1422 boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ?
1423 request.getWriteFlushWalMarker() : false;
1424 // Go behind the curtain so we can manage writing of the flush WAL marker
1425 HRegion.FlushResultImpl flushResult = (HRegion.FlushResultImpl)
1426 ((HRegion)region).flushcache(true, writeFlushWalMarker);
1427 boolean compactionNeeded = flushResult.isCompactionNeeded();
1428 if (compactionNeeded) {
1429 regionServer.compactSplitThread.requestSystemCompaction(region,
1430 "Compaction through user triggered flush");
1432 builder.setFlushed(flushResult.isFlushSucceeded());
1433 builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
1435 builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
1436 return builder.build();
1437 } catch (DroppedSnapshotException ex) {
1438 // Cache flush can fail in a few places. If it fails in a critical
1439 // section, we get a DroppedSnapshotException and a replay of wal
1440 // is required. Currently the only way to do this is a restart of
1441 // the server.
1442 regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1443 throw new ServiceException(ex);
1444 } catch (IOException ie) {
1445 throw new ServiceException(ie);
1449 @Override
1450 @QosPriority(priority=HConstants.ADMIN_QOS)
1451 public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
1452 final GetOnlineRegionRequest request) throws ServiceException {
1453 try {
1454 checkOpen();
1455 requestCount.increment();
1456 Map<String, Region> onlineRegions = regionServer.onlineRegions;
1457 List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
1458 for (Region region: onlineRegions.values()) {
1459 list.add(region.getRegionInfo());
1461 Collections.sort(list);
1462 return ResponseConverter.buildGetOnlineRegionResponse(list);
1463 } catch (IOException ie) {
1464 throw new ServiceException(ie);
1468 @Override
1469 @QosPriority(priority=HConstants.ADMIN_QOS)
1470 public GetRegionInfoResponse getRegionInfo(final RpcController controller,
1471 final GetRegionInfoRequest request) throws ServiceException {
1472 try {
1473 checkOpen();
1474 requestCount.increment();
1475 Region region = getRegion(request.getRegion());
1476 HRegionInfo info = region.getRegionInfo();
1477 GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
1478 builder.setRegionInfo(HRegionInfo.convert(info));
1479 if (request.hasCompactionState() && request.getCompactionState()) {
1480 builder.setCompactionState(region.getCompactionState());
1482 builder.setIsRecovering(region.isRecovering());
1483 return builder.build();
1484 } catch (IOException ie) {
1485 throw new ServiceException(ie);
1490 * Get some information of the region server.
1492 * @param controller the RPC controller
1493 * @param request the request
1494 * @throws ServiceException
1496 @Override
1497 @QosPriority(priority=HConstants.ADMIN_QOS)
1498 public GetServerInfoResponse getServerInfo(final RpcController controller,
1499 final GetServerInfoRequest request) throws ServiceException {
1500 try {
1501 checkOpen();
1502 } catch (IOException ie) {
1503 throw new ServiceException(ie);
1505 requestCount.increment();
1506 int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1;
1507 return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort);
1510 @Override
1511 @QosPriority(priority=HConstants.ADMIN_QOS)
1512 public GetStoreFileResponse getStoreFile(final RpcController controller,
1513 final GetStoreFileRequest request) throws ServiceException {
1514 try {
1515 checkOpen();
1516 Region region = getRegion(request.getRegion());
1517 requestCount.increment();
1518 Set<byte[]> columnFamilies;
1519 if (request.getFamilyCount() == 0) {
1520 columnFamilies = region.getTableDesc().getFamiliesKeys();
1521 } else {
1522 columnFamilies = new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
1523 for (ByteString cf: request.getFamilyList()) {
1524 columnFamilies.add(cf.toByteArray());
1527 int nCF = columnFamilies.size();
1528 List<String> fileList = region.getStoreFileList(
1529 columnFamilies.toArray(new byte[nCF][]));
1530 GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
1531 builder.addAllStoreFile(fileList);
1532 return builder.build();
1533 } catch (IOException ie) {
1534 throw new ServiceException(ie);
1539 * Merge regions on the region server.
1541 * @param controller the RPC controller
1542 * @param request the request
1543 * @return merge regions response
1544 * @throws ServiceException
1546 @Override
1547 @QosPriority(priority = HConstants.ADMIN_QOS)
1548 public MergeRegionsResponse mergeRegions(final RpcController controller,
1549 final MergeRegionsRequest request) throws ServiceException {
1550 try {
1551 checkOpen();
1552 requestCount.increment();
1553 Region regionA = getRegion(request.getRegionA());
1554 Region regionB = getRegion(request.getRegionB());
1555 boolean forcible = request.getForcible();
1556 long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1557 regionA.startRegionOperation(Operation.MERGE_REGION);
1558 regionB.startRegionOperation(Operation.MERGE_REGION);
1559 if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
1560 regionB.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1561 throw new ServiceException(new MergeRegionException("Can't merge non-default replicas"));
1563 LOG.info("Receiving merging request for " + regionA + ", " + regionB
1564 + ",forcible=" + forcible);
1565 regionA.flush(true);
1566 regionB.flush(true);
1567 regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
1568 masterSystemTime, RpcServer.getRequestUser());
1569 return MergeRegionsResponse.newBuilder().build();
1570 } catch (DroppedSnapshotException ex) {
1571 regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1572 throw new ServiceException(ex);
1573 } catch (IOException ie) {
1574 throw new ServiceException(ie);
1579 * Open asynchronously a region or a set of regions on the region server.
1581 * The opening is coordinated by ZooKeeper, and this method requires the znode to be created
1582 * before being called. As a consequence, this method should be called only from the master.
1583 * <p>
1584 * The different managed states for the region are:
1585 * </p><ul>
1586 * <li>region not opened: the region opening will start asynchronously.</li>
1587 * <li>a close is already in progress: this is considered as an error.</li>
1588 * <li>an open is already in progress: this new open request will be ignored. This is important
1589 * because the Master can do multiple requests if it crashes.</li>
1590 * <li>the region is already opened: this new open request will be ignored.</li>
1591 * </ul>
1592 * <p>
1593 * Bulk assign: If there is more than one region to open, it will be considered a bulk assign.
1594 * For a single region opening, errors are sent through a ServiceException. For bulk assign,
1595 * errors are put in the response as FAILED_OPENING.
1596 * </p>
1597 * @param controller the RPC controller
1598 * @param request the request
1599 * @throws ServiceException
1601 @Override
1602 @QosPriority(priority=HConstants.ADMIN_QOS)
1603 public OpenRegionResponse openRegion(final RpcController controller,
1604 final OpenRegionRequest request) throws ServiceException {
1605 requestCount.increment();
1606 if (request.hasServerStartCode()) {
1607 // check that we are the same server that this RPC is intended for.
1608 long serverStartCode = request.getServerStartCode();
1609 if (regionServer.serverName.getStartcode() != serverStartCode) {
1610 throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
1611 "different server with startCode: " + serverStartCode + ", this server is: "
1612 + regionServer.serverName));
1616 OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
1617 final int regionCount = request.getOpenInfoCount();
1618 final Map<TableName, HTableDescriptor> htds =
1619 new HashMap<TableName, HTableDescriptor>(regionCount);
1620 final boolean isBulkAssign = regionCount > 1;
1621 try {
1622 checkOpen();
1623 } catch (IOException ie) {
1624 TableName tableName = null;
1625 if (regionCount == 1) {
1626 RegionInfo ri = request.getOpenInfo(0).getRegion();
1627 if (ri != null) {
1628 tableName = ProtobufUtil.toTableName(ri.getTableName());
1631 if (!TableName.META_TABLE_NAME.equals(tableName)) {
1632 throw new ServiceException(ie);
1634 // We are assigning meta, wait a little for regionserver to finish initialization.
1635 int timeout = regionServer.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1636 HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2; // Quarter of RPC timeout
1637 long endTime = System.currentTimeMillis() + timeout;
1638 synchronized (regionServer.online) {
1639 try {
1640 while (System.currentTimeMillis() <= endTime
1641 && !regionServer.isStopped() && !regionServer.isOnline()) {
1642 regionServer.online.wait(regionServer.msgInterval);
1644 checkOpen();
1645 } catch (InterruptedException t) {
1646 Thread.currentThread().interrupt();
1647 throw new ServiceException(t);
1648 } catch (IOException e) {
1649 throw new ServiceException(e);
1654 long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1656 for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
1657 final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion());
1658 HTableDescriptor htd;
1659 try {
1660 String encodedName = region.getEncodedName();
1661 byte[] encodedNameBytes = region.getEncodedNameAsBytes();
1662 final Region onlineRegion = regionServer.getFromOnlineRegions(encodedName);
1663 if (onlineRegion != null) {
1664 // The region is already online. This should not happen any more.
1665 String error = "Received OPEN for the region:"
1666 + region.getRegionNameAsString() + ", which is already online";
1667 regionServer.abort(error);
1668 throw new IOException(error);
1670 LOG.info("Open " + region.getRegionNameAsString());
1671 htd = htds.get(region.getTable());
1672 if (htd == null) {
1673 htd = regionServer.tableDescriptors.get(region.getTable());
1674 htds.put(region.getTable(), htd);
1677 final Boolean previous = regionServer.regionsInTransitionInRS.putIfAbsent(
1678 encodedNameBytes, Boolean.TRUE);
1680 if (Boolean.FALSE.equals(previous)) {
1681 if (regionServer.getFromOnlineRegions(encodedName) != null) {
1682 // There is a close in progress. This should not happen any more.
1683 String error = "Received OPEN for the region:"
1684 + region.getRegionNameAsString() + ", which we are already trying to CLOSE";
1685 regionServer.abort(error);
1686 throw new IOException(error);
1688 regionServer.regionsInTransitionInRS.put(encodedNameBytes, Boolean.TRUE);
1691 if (Boolean.TRUE.equals(previous)) {
1692 // An open is in progress. This is supported, but let's log this.
1693 LOG.info("Receiving OPEN for the region:" +
1694 region.getRegionNameAsString() + ", which we are already trying to OPEN"
1695 + " - ignoring this new request for this region.");
1698 // We are opening this region. If it moves back and forth for whatever reason, we don't
1699 // want to keep returning the stale moved record while we are opening/if we close again.
1700 regionServer.removeFromMovedRegions(region.getEncodedName());
1702 if (previous == null || !previous.booleanValue()) {
1703 // check if the region to be opened is marked in recovering state in ZK
1704 if (ZKSplitLog.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(),
1705 region.getEncodedName())) {
1706 // Check if current region open is for distributedLogReplay. This check is to support
1707 // rolling restart/upgrade where we want to Master/RS see same configuration
1708 if (!regionOpenInfo.hasOpenForDistributedLogReplay()
1709 || regionOpenInfo.getOpenForDistributedLogReplay()) {
1710 regionServer.recoveringRegions.put(region.getEncodedName(), null);
1711 } else {
1712 // Remove stale recovery region from ZK when we open region not for recovering which
1713 // could happen when turn distributedLogReplay off from on.
1714 List<String> tmpRegions = new ArrayList<String>();
1715 tmpRegions.add(region.getEncodedName());
1716 ZKSplitLog.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(),
1717 tmpRegions);
1720 // If there is no action in progress, we can submit a specific handler.
1721 // Need to pass the expected version in the constructor.
1722 if (region.isMetaRegion()) {
1723 regionServer.service.submit(new OpenMetaHandler(
1724 regionServer, regionServer, region, htd, masterSystemTime));
1725 } else {
1726 regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
1727 regionOpenInfo.getFavoredNodesList());
1728 regionServer.service.submit(new OpenRegionHandler(
1729 regionServer, regionServer, region, htd, masterSystemTime));
1733 builder.addOpeningState(RegionOpeningState.OPENED);
1735 } catch (KeeperException zooKeeperEx) {
1736 LOG.error("Can't retrieve recovering state from zookeeper", zooKeeperEx);
1737 throw new ServiceException(zooKeeperEx);
1738 } catch (IOException ie) {
1739 LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
1740 if (isBulkAssign) {
1741 builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
1742 } else {
1743 throw new ServiceException(ie);
1747 return builder.build();
1751 * Wamrmup a region on this server.
1753 * This method should only be called by Master. It synchrnously opens the region and
1754 * closes the region bringing the most important pages in cache.
1755 * <p>
1757 * @param controller the RPC controller
1758 * @param request the request
1759 * @throws ServiceException
1761 @Override
1762 public WarmupRegionResponse warmupRegion(final RpcController controller,
1763 final WarmupRegionRequest request) throws ServiceException {
1765 RegionInfo regionInfo = request.getRegionInfo();
1766 final HRegionInfo region = HRegionInfo.convert(regionInfo);
1767 HTableDescriptor htd;
1768 WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
1770 try {
1771 checkOpen();
1772 String encodedName = region.getEncodedName();
1773 byte[] encodedNameBytes = region.getEncodedNameAsBytes();
1774 final Region onlineRegion = regionServer.getFromOnlineRegions(encodedName);
1776 if (onlineRegion != null) {
1777 LOG.info("Region already online. Skipping warming up " + region);
1778 return response;
1781 if (LOG.isDebugEnabled()) {
1782 LOG.debug("Warming up Region " + region.getRegionNameAsString());
1785 htd = regionServer.tableDescriptors.get(region.getTable());
1787 if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
1788 LOG.info("Region is in transition. Skipping warmup " + region);
1789 return response;
1792 HRegion.warmupHRegion(region, htd, regionServer.getWAL(region),
1793 regionServer.getConfiguration(), regionServer, null);
1795 } catch (IOException ie) {
1796 LOG.error("Failed warming up region " + region.getRegionNameAsString(), ie);
1797 throw new ServiceException(ie);
1800 return response;
1804 * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
1805 * that the given mutations will be durable on the receiving RS if this method returns without any
1806 * exception.
1807 * @param controller the RPC controller
1808 * @param request the request
1809 * @throws ServiceException
1811 @Override
1812 @QosPriority(priority = HConstants.REPLAY_QOS)
1813 public ReplicateWALEntryResponse replay(final RpcController controller,
1814 final ReplicateWALEntryRequest request) throws ServiceException {
1815 long before = EnvironmentEdgeManager.currentTime();
1816 CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
1817 try {
1818 checkOpen();
1819 List<WALEntry> entries = request.getEntryList();
1820 if (entries == null || entries.isEmpty()) {
1821 // empty input
1822 return ReplicateWALEntryResponse.newBuilder().build();
1824 ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
1825 Region region = regionServer.getRegionByEncodedName(regionName.toStringUtf8());
1826 RegionCoprocessorHost coprocessorHost =
1827 ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
1828 ? region.getCoprocessorHost()
1829 : null; // do not invoke coprocessors if this is a secondary region replica
1830 List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>();
1832 // Skip adding the edits to WAL if this is a secondary region replica
1833 boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
1834 Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;
1836 for (WALEntry entry : entries) {
1837 if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
1838 throw new NotServingRegionException("Replay request contains entries from multiple " +
1839 "regions. First region:" + regionName.toStringUtf8() + " , other region:"
1840 + entry.getKey().getEncodedRegionName());
1842 if (regionServer.nonceManager != null && isPrimary) {
1843 long nonceGroup = entry.getKey().hasNonceGroup()
1844 ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
1845 long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
1846 regionServer.nonceManager.reportOperationFromWal(
1847 nonceGroup,
1848 nonce,
1849 entry.getKey().getWriteTime());
1851 Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
1852 new Pair<WALKey, WALEdit>();
1853 List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
1854 cells, walEntry, durability);
1855 if (coprocessorHost != null) {
1856 // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
1857 // KeyValue.
1858 if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
1859 walEntry.getSecond())) {
1860 // if bypass this log entry, ignore it ...
1861 continue;
1863 walEntries.add(walEntry);
1865 if(edits!=null && !edits.isEmpty()) {
1866 long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
1867 entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
1868 OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
1869 // check if it's a partial success
1870 for (int i = 0; result != null && i < result.length; i++) {
1871 if (result[i] != OperationStatus.SUCCESS) {
1872 throw new IOException(result[i].getExceptionMsg());
1878 //sync wal at the end because ASYNC_WAL is used above
1879 WAL wal = getWAL(region);
1880 if (wal != null) {
1881 wal.sync();
1884 if (coprocessorHost != null) {
1885 for (Pair<WALKey, WALEdit> entry : walEntries) {
1886 coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
1887 entry.getSecond());
1890 return ReplicateWALEntryResponse.newBuilder().build();
1891 } catch (IOException ie) {
1892 throw new ServiceException(ie);
1893 } finally {
1894 if (regionServer.metricsRegionServer != null) {
1895 regionServer.metricsRegionServer.updateReplay(
1896 EnvironmentEdgeManager.currentTime() - before);
1901 WAL getWAL(Region region) {
1902 return ((HRegion)region).getWAL();
1906 * Replicate WAL entries on the region server.
1908 * @param controller the RPC controller
1909 * @param request the request
1910 * @throws ServiceException
1912 @Override
1913 @QosPriority(priority=HConstants.REPLICATION_QOS)
1914 public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
1915 final ReplicateWALEntryRequest request) throws ServiceException {
1916 try {
1917 checkOpen();
1918 if (regionServer.replicationSinkHandler != null) {
1919 requestCount.increment();
1920 List<WALEntry> entries = request.getEntryList();
1921 CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner();
1922 regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner);
1923 regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner,
1924 request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
1925 request.getSourceHFileArchiveDirPath());
1926 regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner);
1927 return ReplicateWALEntryResponse.newBuilder().build();
1928 } else {
1929 throw new ServiceException("Replication services are not initialized yet");
1931 } catch (IOException ie) {
1932 throw new ServiceException(ie);
1937 * Roll the WAL writer of the region server.
1938 * @param controller the RPC controller
1939 * @param request the request
1940 * @throws ServiceException
1942 @Override
1943 public RollWALWriterResponse rollWALWriter(final RpcController controller,
1944 final RollWALWriterRequest request) throws ServiceException {
1945 try {
1946 checkOpen();
1947 requestCount.increment();
1948 regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest();
1949 regionServer.walRoller.requestRollAll();
1950 regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest();
1951 RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
1952 return builder.build();
1953 } catch (IOException ie) {
1954 throw new ServiceException(ie);
1959 * Split a region on the region server.
1961 * @param controller the RPC controller
1962 * @param request the request
1963 * @throws ServiceException
1965 @Override
1966 @QosPriority(priority=HConstants.ADMIN_QOS)
1967 public SplitRegionResponse splitRegion(final RpcController controller,
1968 final SplitRegionRequest request) throws ServiceException {
1969 try {
1970 checkOpen();
1971 requestCount.increment();
1972 Region region = getRegion(request.getRegion());
1973 region.startRegionOperation(Operation.SPLIT_REGION);
1974 if (region.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1975 throw new IOException("Can't split replicas directly. "
1976 + "Replicas are auto-split when their primary is split.");
1978 LOG.info("Splitting " + region.getRegionInfo().getRegionNameAsString());
1979 region.flush(true);
1980 byte[] splitPoint = null;
1981 if (request.hasSplitPoint()) {
1982 splitPoint = request.getSplitPoint().toByteArray();
1984 ((HRegion)region).forceSplit(splitPoint);
1985 regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit(),
1986 RpcServer.getRequestUser());
1987 return SplitRegionResponse.newBuilder().build();
1988 } catch (DroppedSnapshotException ex) {
1989 regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1990 throw new ServiceException(ex);
1991 } catch (IOException ie) {
1992 throw new ServiceException(ie);
1997 * Stop the region server.
1999 * @param controller the RPC controller
2000 * @param request the request
2001 * @throws ServiceException
2003 @Override
2004 @QosPriority(priority=HConstants.ADMIN_QOS)
2005 public StopServerResponse stopServer(final RpcController controller,
2006 final StopServerRequest request) throws ServiceException {
2007 requestCount.increment();
2008 String reason = request.getReason();
2009 regionServer.stop(reason);
2010 return StopServerResponse.newBuilder().build();
2013 @Override
2014 public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
2015 UpdateFavoredNodesRequest request) throws ServiceException {
2016 List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
2017 UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
2018 for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
2019 HRegionInfo hri = HRegionInfo.convert(regionUpdateInfo.getRegion());
2020 regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(),
2021 regionUpdateInfo.getFavoredNodesList());
2023 respBuilder.setResponse(openInfoList.size());
2024 return respBuilder.build();
2028 * Atomically bulk load several HFiles into an open region
2029 * @return true if successful, false is failed but recoverably (no action)
2030 * @throws ServiceException if failed unrecoverably
2032 @Override
2033 public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
2034 final BulkLoadHFileRequest request) throws ServiceException {
2035 try {
2036 checkOpen();
2037 requestCount.increment();
2038 Region region = getRegion(request.getRegion());
2039 List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
2040 for (FamilyPath familyPath: request.getFamilyPathList()) {
2041 familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(),
2042 familyPath.getPath()));
2044 boolean bypass = false;
2045 if (region.getCoprocessorHost() != null) {
2046 bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
2048 boolean loaded = false;
2049 if (!bypass) {
2050 loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
2052 if (region.getCoprocessorHost() != null) {
2053 loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
2055 BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
2056 builder.setLoaded(loaded);
2057 return builder.build();
2058 } catch (IOException ie) {
2059 throw new ServiceException(ie);
2063 @Override
2064 public CoprocessorServiceResponse execService(final RpcController controller,
2065 final CoprocessorServiceRequest request) throws ServiceException {
2066 try {
2067 checkOpen();
2068 requestCount.increment();
2069 Region region = getRegion(request.getRegion());
2070 Message result = execServiceOnRegion(region, request.getCall());
2071 CoprocessorServiceResponse.Builder builder =
2072 CoprocessorServiceResponse.newBuilder();
2073 builder.setRegion(RequestConverter.buildRegionSpecifier(
2074 RegionSpecifierType.REGION_NAME, region.getRegionInfo().getRegionName()));
2075 builder.setValue(
2076 builder.getValueBuilder().setName(result.getClass().getName())
2077 .setValue(result.toByteString()));
2078 return builder.build();
2079 } catch (IOException ie) {
2080 throw new ServiceException(ie);
2084 private Message execServiceOnRegion(Region region,
2085 final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
2086 // ignore the passed in controller (from the serialized call)
2087 ServerRpcController execController = new ServerRpcController();
2088 return region.execService(execController, serviceCall);
2092 * Get data from a table.
2094 * @param controller the RPC controller
2095 * @param request the get request
2096 * @throws ServiceException
2098 @Override
2099 public GetResponse get(final RpcController controller,
2100 final GetRequest request) throws ServiceException {
2101 long before = EnvironmentEdgeManager.currentTime();
2102 OperationQuota quota = null;
2103 try {
2104 checkOpen();
2105 requestCount.increment();
2106 rpcGetRequestCount.increment();
2107 Region region = getRegion(request.getRegion());
2109 GetResponse.Builder builder = GetResponse.newBuilder();
2110 ClientProtos.Get get = request.getGet();
2111 Boolean existence = null;
2112 Result r = null;
2113 RpcCallContext context = RpcServer.getCurrentCall();
2114 quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
2116 Get clientGet = ProtobufUtil.toGet(get);
2117 if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
2118 existence = region.getCoprocessorHost().preExists(clientGet);
2120 if (existence == null) {
2121 if (context != null) {
2122 r = get(clientGet, ((HRegion) region), null, context);
2123 } else {
2124 // for test purpose
2125 r = region.get(clientGet);
2127 if (get.getExistenceOnly()) {
2128 boolean exists = r.getExists();
2129 if (region.getCoprocessorHost() != null) {
2130 exists = region.getCoprocessorHost().postExists(clientGet, exists);
2132 existence = exists;
2135 if (existence != null) {
2136 ClientProtos.Result pbr =
2137 ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
2138 builder.setResult(pbr);
2139 } else if (r != null) {
2140 ClientProtos.Result pbr;
2141 RpcCallContext call = RpcServer.getCurrentCall();
2142 if (isClientCellBlockSupport(call) && controller instanceof PayloadCarryingRpcController
2143 && VersionInfoUtil.hasMinimumVersion(call.getClientVersionInfo(), 1, 3)) {
2144 pbr = ProtobufUtil.toResultNoData(r);
2145 ((PayloadCarryingRpcController) controller).setCellScanner(CellUtil.createCellScanner(r
2146 .rawCells()));
2147 } else {
2148 pbr = ProtobufUtil.toResult(r);
2150 builder.setResult(pbr);
2152 if (r != null) {
2153 quota.addGetResult(r);
2155 return builder.build();
2156 } catch (IOException ie) {
2157 throw new ServiceException(ie);
2158 } finally {
2159 if (regionServer.metricsRegionServer != null) {
2160 regionServer.metricsRegionServer.updateGet(EnvironmentEdgeManager.currentTime() - before);
2162 if (quota != null) {
2163 quota.close();
2168 private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
2169 RpcCallContext context) throws IOException {
2170 region.prepareGet(get);
2171 List<Cell> results = new ArrayList<Cell>();
2172 boolean stale = region.getRegionInfo().getReplicaId() != 0;
2173 // pre-get CP hook
2174 if (region.getCoprocessorHost() != null) {
2175 if (region.getCoprocessorHost().preGet(get, results)) {
2176 return Result
2177 .create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
2180 long before = EnvironmentEdgeManager.currentTime();
2181 Scan scan = new Scan(get);
2183 RegionScanner scanner = null;
2184 try {
2185 scanner = region.getScanner(scan);
2186 scanner.next(results);
2187 } finally {
2188 if (scanner != null) {
2189 if (closeCallBack == null) {
2190 // If there is a context then the scanner can be added to the current
2191 // RpcCallContext. The rpc callback will take care of closing the
2192 // scanner, for eg in case
2193 // of get()
2194 assert scanner instanceof org.apache.hadoop.hbase.ipc.RpcCallback;
2195 context.setCallBack((RegionScannerImpl) scanner);
2196 } else {
2197 // The call is from multi() where the results from the get() are
2198 // aggregated and then send out to the
2199 // rpc. The rpccall back will close all such scanners created as part
2200 // of multi().
2201 closeCallBack.addScanner(scanner);
2206 // post-get CP hook
2207 if (region.getCoprocessorHost() != null) {
2208 region.getCoprocessorHost().postGet(get, results);
2210 region.metricsUpdateForGet(results, before);
2211 return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
2215 * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
2217 * @param rpcc the RPC controller
2218 * @param request the multi request
2219 * @throws ServiceException
2221 @Override
2222 public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
2223 throws ServiceException {
2224 try {
2225 checkOpen();
2226 } catch (IOException ie) {
2227 throw new ServiceException(ie);
2230 // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
2231 // It is also the conduit via which we pass back data.
2232 PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
2233 CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
2234 if (controller != null) {
2235 controller.setCellScanner(null);
2238 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2240 // this will contain all the cells that we need to return. It's created later, if needed.
2241 List<CellScannable> cellsToReturn = null;
2242 MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
2243 RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
2244 Boolean processed = null;
2245 RegionScannersCloseCallBack closeCallBack = null;
2246 RpcCallContext context = RpcServer.getCurrentCall();
2247 this.rpcMultiRequestCount.increment();
2248 Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats = new HashMap<>(request
2249 .getRegionActionCount());
2250 for (RegionAction regionAction : request.getRegionActionList()) {
2251 this.requestCount.add(regionAction.getActionCount());
2252 OperationQuota quota;
2253 Region region;
2254 regionActionResultBuilder.clear();
2255 RegionSpecifier regionSpecifier = regionAction.getRegion();
2256 try {
2257 region = getRegion(regionSpecifier);
2258 quota = getQuotaManager().checkQuota(region, regionAction.getActionList());
2259 } catch (IOException e) {
2260 rpcServer.getMetrics().exception(e);
2261 regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2262 responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2263 // All Mutations in this RegionAction not executed as we can not see the Region online here
2264 // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
2265 // corresponding to these Mutations.
2266 if (cellScanner != null) {
2267 skipCellsForMutations(regionAction.getActionList(), cellScanner);
2269 continue; // For this region it's a failure.
2272 if (regionAction.hasAtomic() && regionAction.getAtomic()) {
2273 // How does this call happen? It may need some work to play well w/ the surroundings.
2274 // Need to return an item per Action along w/ Action index. TODO.
2275 try {
2276 if (request.hasCondition()) {
2277 Condition condition = request.getCondition();
2278 byte[] row = condition.getRow().toByteArray();
2279 byte[] family = condition.getFamily().toByteArray();
2280 byte[] qualifier = condition.getQualifier().toByteArray();
2281 CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2282 ByteArrayComparable comparator =
2283 ProtobufUtil.toComparator(condition.getComparator());
2284 processed = checkAndRowMutate(region, regionAction.getActionList(),
2285 cellScanner, row, family, qualifier, compareOp,
2286 comparator, regionActionResultBuilder);
2287 } else {
2288 mutateRows(region, regionAction.getActionList(), cellScanner,
2289 regionActionResultBuilder);
2290 processed = Boolean.TRUE;
2292 } catch (IOException e) {
2293 rpcServer.getMetrics().exception(e);
2294 // As it's atomic, we may expect it's a global failure.
2295 regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2297 } else {
2298 // doNonAtomicRegionMutation manages the exception internally
2299 if (context != null && closeCallBack == null) {
2300 // An RpcCallBack that creates a list of scanners that needs to perform callBack
2301 // operation on completion of multiGets.
2302 // Set this only once
2303 closeCallBack = new RegionScannersCloseCallBack();
2304 context.setCallBack(closeCallBack);
2306 cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
2307 regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context);
2309 responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2310 quota.close();
2311 ClientProtos.RegionLoadStats regionLoadStats = ((HRegion)region).getLoadStatistics();
2312 if(regionLoadStats != null) {
2313 regionStats.put(regionSpecifier, regionLoadStats);
2316 // Load the controller with the Cells to return.
2317 if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
2318 controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
2321 if (processed != null) {
2322 responseBuilder.setProcessed(processed);
2325 MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
2326 for(Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat: regionStats.entrySet()){
2327 builder.addRegion(stat.getKey());
2328 builder.addStat(stat.getValue());
2330 responseBuilder.setRegionStatistics(builder);
2331 return responseBuilder.build();
2334 private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
2335 for (Action action : actions) {
2336 skipCellsForMutation(action, cellScanner);
2340 private void skipCellsForMutation(Action action, CellScanner cellScanner) {
2341 try {
2342 if (action.hasMutation()) {
2343 MutationProto m = action.getMutation();
2344 if (m.hasAssociatedCellCount()) {
2345 for (int i = 0; i < m.getAssociatedCellCount(); i++) {
2346 cellScanner.advance();
2350 } catch (IOException e) {
2351 // No need to handle these Individual Muatation level issue. Any way this entire RegionAction
2352 // marked as failed as we could not see the Region here. At client side the top level
2353 // RegionAction exception will be considered first.
2354 LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
2359 * Mutate data in a table.
2361 * @param rpcc the RPC controller
2362 * @param request the mutate request
2363 * @throws ServiceException
2365 @Override
2366 public MutateResponse mutate(final RpcController rpcc,
2367 final MutateRequest request) throws ServiceException {
2368 // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
2369 // It is also the conduit via which we pass back data.
2370 PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
2371 CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2372 OperationQuota quota = null;
2373 // Clear scanner so we are not holding on to reference across call.
2374 if (controller != null) {
2375 controller.setCellScanner(null);
2377 try {
2378 checkOpen();
2379 requestCount.increment();
2380 rpcMutateRequestCount.increment();
2381 Region region = getRegion(request.getRegion());
2382 MutateResponse.Builder builder = MutateResponse.newBuilder();
2383 MutationProto mutation = request.getMutation();
2384 if (!region.getRegionInfo().isMetaTable()) {
2385 regionServer.cacheFlusher.reclaimMemStoreMemory();
2387 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2388 Result r = null;
2389 Boolean processed = null;
2390 MutationType type = mutation.getMutateType();
2392 quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
2394 switch (type) {
2395 case APPEND:
2396 // TODO: this doesn't actually check anything.
2397 r = append(region, quota, mutation, cellScanner, nonceGroup);
2398 break;
2399 case INCREMENT:
2400 // TODO: this doesn't actually check anything.
2401 r = increment(region, quota, mutation, cellScanner, nonceGroup);
2402 break;
2403 case PUT:
2404 Put put = ProtobufUtil.toPut(mutation, cellScanner);
2405 quota.addMutation(put);
2406 if (request.hasCondition()) {
2407 Condition condition = request.getCondition();
2408 byte[] row = condition.getRow().toByteArray();
2409 byte[] family = condition.getFamily().toByteArray();
2410 byte[] qualifier = condition.getQualifier().toByteArray();
2411 CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2412 ByteArrayComparable comparator =
2413 ProtobufUtil.toComparator(condition.getComparator());
2414 if (region.getCoprocessorHost() != null) {
2415 processed = region.getCoprocessorHost().preCheckAndPut(
2416 row, family, qualifier, compareOp, comparator, put);
2418 if (processed == null) {
2419 boolean result = region.checkAndMutate(row, family,
2420 qualifier, compareOp, comparator, put, true);
2421 if (region.getCoprocessorHost() != null) {
2422 result = region.getCoprocessorHost().postCheckAndPut(row, family,
2423 qualifier, compareOp, comparator, put, result);
2425 processed = result;
2427 } else {
2428 region.put(put);
2429 processed = Boolean.TRUE;
2431 break;
2432 case DELETE:
2433 Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
2434 quota.addMutation(delete);
2435 if (request.hasCondition()) {
2436 Condition condition = request.getCondition();
2437 byte[] row = condition.getRow().toByteArray();
2438 byte[] family = condition.getFamily().toByteArray();
2439 byte[] qualifier = condition.getQualifier().toByteArray();
2440 CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2441 ByteArrayComparable comparator =
2442 ProtobufUtil.toComparator(condition.getComparator());
2443 if (region.getCoprocessorHost() != null) {
2444 processed = region.getCoprocessorHost().preCheckAndDelete(
2445 row, family, qualifier, compareOp, comparator, delete);
2447 if (processed == null) {
2448 boolean result = region.checkAndMutate(row, family,
2449 qualifier, compareOp, comparator, delete, true);
2450 if (region.getCoprocessorHost() != null) {
2451 result = region.getCoprocessorHost().postCheckAndDelete(row, family,
2452 qualifier, compareOp, comparator, delete, result);
2454 processed = result;
2456 } else {
2457 region.delete(delete);
2458 processed = Boolean.TRUE;
2460 break;
2461 default:
2462 throw new DoNotRetryIOException(
2463 "Unsupported mutate type: " + type.name());
2465 if (processed != null) {
2466 builder.setProcessed(processed.booleanValue());
2468 addResult(builder, r, controller);
2469 return builder.build();
2470 } catch (IOException ie) {
2471 regionServer.checkFileSystem();
2472 throw new ServiceException(ie);
2473 } finally {
2474 if (quota != null) {
2475 quota.close();
2481 * Scan data in a table.
2483 * @param controller the RPC controller
2484 * @param request the scan request
2485 * @throws ServiceException
2487 @Override
2488 public ScanResponse scan(final RpcController controller, final ScanRequest request)
2489 throws ServiceException {
2490 OperationQuota quota = null;
2491 Leases.Lease lease = null;
2492 String scannerName = null;
2493 try {
2494 if (!request.hasScannerId() && !request.hasScan()) {
2495 throw new DoNotRetryIOException(
2496 "Missing required input: scannerId or scan");
2498 long scannerId = -1;
2499 if (request.hasScannerId()) {
2500 scannerId = request.getScannerId();
2501 scannerName = String.valueOf(scannerId);
2503 try {
2504 checkOpen();
2505 } catch (IOException e) {
2506 // If checkOpen failed, server not running or filesystem gone,
2507 // cancel this lease; filesystem is gone or we're closing or something.
2508 if (scannerName != null) {
2509 LOG.debug("Server shutting down and client tried to access missing scanner "
2510 + scannerName);
2511 if (regionServer.leases != null) {
2512 try {
2513 regionServer.leases.cancelLease(scannerName);
2514 } catch (LeaseException le) {
2515 // No problem, ignore
2516 if (LOG.isTraceEnabled()) {
2517 LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
2522 throw e;
2524 requestCount.increment();
2525 rpcScanRequestCount.increment();
2527 int ttl = 0;
2528 Region region = null;
2529 RegionScanner scanner = null;
2530 RegionScannerHolder rsh = null;
2531 boolean moreResults = true;
2532 boolean closeScanner = false;
2533 boolean isSmallScan = false;
2534 ScanResponse.Builder builder = ScanResponse.newBuilder();
2535 if (request.hasCloseScanner()) {
2536 closeScanner = request.getCloseScanner();
2538 int rows = closeScanner ? 0 : 1;
2539 if (request.hasNumberOfRows()) {
2540 rows = request.getNumberOfRows();
2542 if (request.hasScannerId()) {
2543 rsh = scanners.get(scannerName);
2544 if (rsh == null) {
2545 LOG.warn("Client tried to access missing scanner " + scannerName);
2546 throw new UnknownScannerException(
2547 "Unknown scanner '" + scannerName + "'. This can happen due to any of the following "
2548 + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of "
2549 + "long wait between consecutive client checkins, c) Server may be closing down, "
2550 + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a "
2551 + "possible fix would be increasing the value of"
2552 + "'hbase.client.scanner.timeout.period' configuration.");
2554 scanner = rsh.s;
2555 HRegionInfo hri = scanner.getRegionInfo();
2556 region = regionServer.getRegion(hri.getRegionName());
2557 if (region != rsh.r) { // Yes, should be the same instance
2558 throw new NotServingRegionException("Region was re-opened after the scanner"
2559 + scannerName + " was created: " + hri.getRegionNameAsString());
2561 } else {
2562 region = getRegion(request.getRegion());
2563 ClientProtos.Scan protoScan = request.getScan();
2564 boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
2565 Scan scan = ProtobufUtil.toScan(protoScan);
2566 // if the request doesn't set this, get the default region setting.
2567 if (!isLoadingCfsOnDemandSet) {
2568 scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
2571 isSmallScan = scan.isSmall();
2572 if (!scan.hasFamilies()) {
2573 // Adding all families to scanner
2574 for (byte[] family: region.getTableDesc().getFamiliesKeys()) {
2575 scan.addFamily(family);
2579 if (region.getCoprocessorHost() != null) {
2580 scanner = region.getCoprocessorHost().preScannerOpen(scan);
2582 if (scanner == null) {
2583 scanner = region.getScanner(scan);
2585 if (region.getCoprocessorHost() != null) {
2586 scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
2588 scannerId = this.scannerIdGen.incrementAndGet();
2589 scannerName = String.valueOf(scannerId);
2590 rsh = addScanner(scannerName, scanner, region);
2591 ttl = this.scannerLeaseTimeoutPeriod;
2593 assert scanner != null;
2594 if (request.hasRenew() && request.getRenew()) {
2595 rsh = scanners.get(scannerName);
2596 lease = regionServer.leases.removeLease(scannerName);
2597 if (lease != null && rsh != null) {
2598 regionServer.leases.addLease(lease);
2599 // Increment the nextCallSeq value which is the next expected from client.
2600 rsh.incNextCallSeq();
2602 return builder.build();
2604 RpcCallContext context = RpcServer.getCurrentCall();
2605 Object lastBlock = null;
2607 quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
2608 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
2610 if (rows > 0) {
2611 // if nextCallSeq does not match throw Exception straight away. This needs to be
2612 // performed even before checking of Lease.
2613 // See HBASE-5974
2614 if (request.hasNextCallSeq()) {
2615 if (rsh != null) {
2616 if (request.getNextCallSeq() != rsh.getNextCallSeq()) {
2617 throw new OutOfOrderScannerNextException(
2618 "Expected nextCallSeq: " + rsh.getNextCallSeq()
2619 + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
2620 "; request=" + TextFormat.shortDebugString(request));
2622 // Increment the nextCallSeq value which is the next expected from client.
2623 rsh.incNextCallSeq();
2626 try {
2627 // Remove lease while its being processed in server; protects against case
2628 // where processing of request takes > lease expiration time.
2629 lease = regionServer.leases.removeLease(scannerName);
2630 List<Result> results = new ArrayList<Result>();
2632 boolean done = false;
2633 // Call coprocessor. Get region info from scanner.
2634 if (region != null && region.getCoprocessorHost() != null) {
2635 Boolean bypass = region.getCoprocessorHost().preScannerNext(
2636 scanner, results, rows);
2637 if (!results.isEmpty()) {
2638 for (Result r : results) {
2639 lastBlock = addSize(context, r, lastBlock);
2642 if (bypass != null && bypass.booleanValue()) {
2643 done = true;
2647 if (!done) {
2648 long maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
2649 if (maxResultSize <= 0) {
2650 maxResultSize = maxQuotaResultSize;
2652 // This is cells inside a row. Default size is 10 so if many versions or many cfs,
2653 // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
2654 // arbitrary 32. TODO: keep record of general size of results being returned.
2655 List<Cell> values = new ArrayList<Cell>(32);
2656 region.startRegionOperation(Operation.SCAN);
2657 try {
2658 int i = 0;
2659 long before = EnvironmentEdgeManager.currentTime();
2660 synchronized(scanner) {
2661 boolean stale = (region.getRegionInfo().getReplicaId() != 0);
2662 boolean clientHandlesPartials =
2663 request.hasClientHandlesPartials() && request.getClientHandlesPartials();
2664 boolean clientHandlesHeartbeats =
2665 request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
2667 // On the server side we must ensure that the correct ordering of partial results is
2668 // returned to the client to allow them to properly reconstruct the partial results.
2669 // If the coprocessor host is adding to the result list, we cannot guarantee the
2670 // correct ordering of partial results and so we prevent partial results from being
2671 // formed.
2672 boolean serverGuaranteesOrderOfPartials = results.isEmpty();
2673 boolean allowPartialResults =
2674 clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
2675 boolean moreRows = false;
2677 // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
2678 // certain time threshold on the server. When the time threshold is exceeded, the
2679 // server stops the scan and sends back whatever Results it has accumulated within
2680 // that time period (may be empty). Since heartbeat messages have the potential to
2681 // create partial Results (in the event that the timeout occurs in the middle of a
2682 // row), we must only generate heartbeat messages when the client can handle both
2683 // heartbeats AND partials
2684 boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
2686 // Default value of timeLimit is negative to indicate no timeLimit should be
2687 // enforced.
2688 long timeLimit = -1;
2690 // Set the time limit to be half of the more restrictive timeout value (one of the
2691 // timeout values must be positive). In the event that both values are positive, the
2692 // more restrictive of the two is used to calculate the limit.
2693 if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
2694 long timeLimitDelta;
2695 if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
2696 timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
2697 } else {
2698 timeLimitDelta =
2699 scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
2701 if (controller instanceof TimeLimitedRpcController) {
2702 TimeLimitedRpcController timeLimitedRpcController =
2703 (TimeLimitedRpcController)controller;
2704 if (timeLimitedRpcController.getCallTimeout() > 0) {
2705 timeLimitDelta = Math.min(timeLimitDelta,
2706 timeLimitedRpcController.getCallTimeout());
2709 // Use half of whichever timeout value was more restrictive... But don't allow
2710 // the time limit to be less than the allowable minimum (could cause an
2711 // immediatate timeout before scanning any data).
2712 timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
2713 timeLimit = System.currentTimeMillis() + timeLimitDelta;
2716 final LimitScope sizeScope =
2717 allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
2718 final LimitScope timeScope =
2719 allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
2721 boolean trackMetrics =
2722 request.hasTrackScanMetrics() && request.getTrackScanMetrics();
2724 // Configure with limits for this RPC. Set keep progress true since size progress
2725 // towards size limit should be kept between calls to nextRaw
2726 ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
2727 contextBuilder.setSizeLimit(sizeScope, maxResultSize);
2728 contextBuilder.setBatchLimit(scanner.getBatch());
2729 contextBuilder.setTimeLimit(timeScope, timeLimit);
2730 contextBuilder.setTrackMetrics(trackMetrics);
2731 ScannerContext scannerContext = contextBuilder.build();
2732 boolean limitReached = false;
2733 while (i < rows) {
2734 // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
2735 // batch limit is a limit on the number of cells per Result. Thus, if progress is
2736 // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
2737 // reset the batch progress between nextRaw invocations since we don't want the
2738 // batch progress from previous calls to affect future calls
2739 scannerContext.setBatchProgress(0);
2741 // Collect values to be returned here
2742 moreRows = scanner.nextRaw(values, scannerContext);
2744 if (!values.isEmpty()) {
2745 final boolean partial = scannerContext.partialResultFormed();
2746 Result r = Result.create(values, null, stale, partial);
2747 lastBlock = addSize(context, r, lastBlock);
2748 results.add(r);
2749 i++;
2752 boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
2753 boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
2754 boolean rowLimitReached = i >= rows;
2755 limitReached = sizeLimitReached || timeLimitReached || rowLimitReached;
2757 if (limitReached || !moreRows) {
2758 if (LOG.isTraceEnabled()) {
2759 LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: "
2760 + moreRows + " scannerContext: " + scannerContext);
2762 // We only want to mark a ScanResponse as a heartbeat message in the event that
2763 // there are more values to be read server side. If there aren't more values,
2764 // marking it as a heartbeat is wasteful because the client will need to issue
2765 // another ScanRequest only to realize that they already have all the values
2766 if (moreRows) {
2767 // Heartbeat messages occur when the time limit has been reached.
2768 builder.setHeartbeatMessage(timeLimitReached);
2770 break;
2772 values.clear();
2775 if (limitReached || moreRows) {
2776 // We stopped prematurely
2777 builder.setMoreResultsInRegion(true);
2778 } else {
2779 // We didn't get a single batch
2780 builder.setMoreResultsInRegion(false);
2783 // Check to see if the client requested that we track metrics server side. If the
2784 // client requested metrics, retrieve the metrics from the scanner context.
2785 if (trackMetrics) {
2786 Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
2787 ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
2788 NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
2790 for (Entry<String, Long> entry : metrics.entrySet()) {
2791 pairBuilder.setName(entry.getKey());
2792 pairBuilder.setValue(entry.getValue());
2793 metricBuilder.addMetrics(pairBuilder.build());
2796 builder.setScanMetrics(metricBuilder.build());
2799 region.updateReadRequestsCount(i);
2800 long end = EnvironmentEdgeManager.currentTime();
2801 long responseCellSize = context != null ? context.getResponseCellSize() : 0;
2802 region.getMetrics().updateScanSize(responseCellSize);
2803 region.getMetrics().updateScanTime(end - before);
2804 if (regionServer.metricsRegionServer != null) {
2805 regionServer.metricsRegionServer.updateScanSize(responseCellSize);
2806 regionServer.metricsRegionServer.updateScanTime(end - before);
2808 } finally {
2809 region.closeRegionOperation();
2811 // coprocessor postNext hook
2812 if (region != null && region.getCoprocessorHost() != null) {
2813 region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
2817 quota.addScanResult(results);
2819 // If the scanner's filter - if any - is done with the scan
2820 // and wants to tell the client to stop the scan. This is done by passing
2821 // a null result, and setting moreResults to false.
2822 if (scanner.isFilterDone() && results.isEmpty()) {
2823 moreResults = false;
2824 results = null;
2825 } else {
2826 addResults(builder, results, controller,
2827 RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
2828 isClientCellBlockSupport(context));
2830 } catch (IOException e) {
2831 // if we have an exception on scanner next and we are using the callSeq
2832 // we should rollback because the client will retry with the same callSeq
2833 // and get an OutOfOrderScannerNextException if we don't do so.
2834 if (rsh != null && request.hasNextCallSeq()) {
2835 rsh.rollbackNextCallSeq();
2837 throw e;
2838 } finally {
2839 if (context != null) {
2840 context.setCallBack(rsh.shippedCallback);
2842 // Adding resets expiration time on lease.
2843 if (scanners.containsKey(scannerName)) {
2844 ttl = this.scannerLeaseTimeoutPeriod;
2845 // When context != null, adding back the lease will be done in callback set above.
2846 if (context == null) {
2847 if (lease != null) regionServer.leases.addLease(lease);
2853 if (!moreResults || closeScanner) {
2854 ttl = 0;
2855 moreResults = false;
2856 if (region != null && region.getCoprocessorHost() != null) {
2857 if (region.getCoprocessorHost().preScannerClose(scanner)) {
2858 return builder.build(); // bypass
2861 rsh = scanners.remove(scannerName);
2862 if (rsh != null) {
2863 if (context != null) {
2864 context.setCallBack(rsh.closeCallBack);
2865 } else {
2866 rsh.s.close();
2868 try {
2869 regionServer.leases.cancelLease(scannerName);
2870 } catch (LeaseException le) {
2871 // No problem, ignore
2872 if (LOG.isTraceEnabled()) {
2873 LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
2876 if (region != null && region.getCoprocessorHost() != null) {
2877 region.getCoprocessorHost().postScannerClose(scanner);
2882 if (ttl > 0) {
2883 builder.setTtl(ttl);
2885 builder.setScannerId(scannerId);
2886 builder.setMoreResults(moreResults);
2887 return builder.build();
2888 } catch (IOException ie) {
2889 if (scannerName != null && ie instanceof NotServingRegionException) {
2890 RegionScannerHolder rsh = scanners.remove(scannerName);
2891 if (rsh != null) {
2892 try {
2893 RegionScanner scanner = rsh.s;
2894 LOG.warn(scannerName + " encountered " + ie.getMessage() + ", closing ...");
2895 scanner.close();
2896 regionServer.leases.cancelLease(scannerName);
2897 } catch (IOException e) {
2898 LOG.warn("Getting exception closing " + scannerName, e);
2902 throw new ServiceException(ie);
2903 } finally {
2904 if (quota != null) {
2905 quota.close();
2910 @Override
2911 public CoprocessorServiceResponse execRegionServerService(RpcController controller,
2912 CoprocessorServiceRequest request) throws ServiceException {
2913 return regionServer.execRegionServerService(controller, request);
2916 @Override
2917 public UpdateConfigurationResponse updateConfiguration(
2918 RpcController controller, UpdateConfigurationRequest request)
2919 throws ServiceException {
2920 try {
2921 this.regionServer.updateConfiguration();
2922 } catch (Exception e) {
2923 throw new ServiceException(e);
2925 return UpdateConfigurationResponse.getDefaultInstance();