/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static com
.codahale
.metrics
.MetricRegistry
.name
;
21 import static org
.apache
.hadoop
.hbase
.util
.ConcurrentMapUtils
.computeIfAbsent
;
23 import com
.codahale
.metrics
.Counter
;
24 import com
.codahale
.metrics
.Histogram
;
25 import com
.codahale
.metrics
.JmxReporter
;
26 import com
.codahale
.metrics
.MetricRegistry
;
27 import com
.codahale
.metrics
.RatioGauge
;
28 import com
.codahale
.metrics
.Timer
;
30 import java
.util
.concurrent
.ConcurrentHashMap
;
31 import java
.util
.concurrent
.ConcurrentMap
;
32 import java
.util
.concurrent
.ConcurrentSkipListMap
;
33 import java
.util
.concurrent
.ThreadPoolExecutor
;
34 import java
.util
.concurrent
.TimeUnit
;
35 import java
.util
.function
.Supplier
;
36 import org
.apache
.hadoop
.hbase
.ServerName
;
37 import org
.apache
.yetus
.audience
.InterfaceAudience
;
38 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.Descriptors
.MethodDescriptor
;
39 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.Message
;
40 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.ClientService
;
41 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.MutateRequest
;
42 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.MutationProto
.MutationType
;
43 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
/**
 * This class is for maintaining the various connection statistics and publishing them through
 * the metrics interfaces.
 * <p>
 * This class manages its own {@link MetricRegistry} and {@link JmxReporter} so as to not
 * conflict with other uses of Dropwizard (formerly Codahale/Yammer) Metrics within the client
 * application. Instantiating this class implicitly creates and starts these instances; be sure
 * to call {@link #shutdown()} to stop the reporter once the metrics are no longer needed.
 */
54 @InterfaceAudience.Private
55 public class MetricsConnection
implements StatisticTrackable
{
57 /** Set this key to {@code true} to enable metrics collection of client requests. */
58 public static final String CLIENT_SIDE_METRICS_ENABLED_KEY
= "hbase.client.metrics.enable";
60 private static final String CNT_BASE
= "rpcCount_";
61 private static final String DRTN_BASE
= "rpcCallDurationMs_";
62 private static final String REQ_BASE
= "rpcCallRequestSizeBytes_";
63 private static final String RESP_BASE
= "rpcCallResponseSizeBytes_";
64 private static final String MEMLOAD_BASE
= "memstoreLoad_";
65 private static final String HEAP_BASE
= "heapOccupancy_";
66 private static final String CACHE_BASE
= "cacheDroppingExceptions_";
67 private static final String UNKNOWN_EXCEPTION
= "UnknownException";
68 private static final String NS_LOOKUPS
= "nsLookups";
69 private static final String NS_LOOKUPS_FAILED
= "nsLookupsFailed";
70 private static final String CLIENT_SVC
= ClientService
.getDescriptor().getName();
72 /** A container class for collecting details about the RPC call as it percolates. */
73 public static class CallStats
{
74 private long requestSizeBytes
= 0;
75 private long responseSizeBytes
= 0;
76 private long startTime
= 0;
77 private long callTimeMs
= 0;
78 private int concurrentCallsPerServer
= 0;
79 private int numActionsPerServer
= 0;
81 public long getRequestSizeBytes() {
82 return requestSizeBytes
;
85 public void setRequestSizeBytes(long requestSizeBytes
) {
86 this.requestSizeBytes
= requestSizeBytes
;
89 public long getResponseSizeBytes() {
90 return responseSizeBytes
;
93 public void setResponseSizeBytes(long responseSizeBytes
) {
94 this.responseSizeBytes
= responseSizeBytes
;
97 public long getStartTime() {
101 public void setStartTime(long startTime
) {
102 this.startTime
= startTime
;
105 public long getCallTimeMs() {
109 public void setCallTimeMs(long callTimeMs
) {
110 this.callTimeMs
= callTimeMs
;
113 public int getConcurrentCallsPerServer() {
114 return concurrentCallsPerServer
;
117 public void setConcurrentCallsPerServer(int callsPerServer
) {
118 this.concurrentCallsPerServer
= callsPerServer
;
121 public int getNumActionsPerServer() {
122 return numActionsPerServer
;
125 public void setNumActionsPerServer(int numActionsPerServer
) {
126 this.numActionsPerServer
= numActionsPerServer
;
130 protected static final class CallTracker
{
131 private final String name
;
132 final Timer callTimer
;
133 final Histogram reqHist
;
134 final Histogram respHist
;
136 private CallTracker(MetricRegistry registry
, String name
, String subName
, String scope
) {
137 StringBuilder sb
= new StringBuilder(CLIENT_SVC
).append("_").append(name
);
138 if (subName
!= null) {
139 sb
.append("(").append(subName
).append(")");
141 this.name
= sb
.toString();
142 this.callTimer
= registry
.timer(name(MetricsConnection
.class,
143 DRTN_BASE
+ this.name
, scope
));
144 this.reqHist
= registry
.histogram(name(MetricsConnection
.class,
145 REQ_BASE
+ this.name
, scope
));
146 this.respHist
= registry
.histogram(name(MetricsConnection
.class,
147 RESP_BASE
+ this.name
, scope
));
150 private CallTracker(MetricRegistry registry
, String name
, String scope
) {
151 this(registry
, name
, null, scope
);
154 public void updateRpc(CallStats stats
) {
155 this.callTimer
.update(stats
.getCallTimeMs(), TimeUnit
.MILLISECONDS
);
156 this.reqHist
.update(stats
.getRequestSizeBytes());
157 this.respHist
.update(stats
.getResponseSizeBytes());
161 public String
toString() {
162 return "CallTracker:" + name
;
166 protected static class RegionStats
{
168 final Histogram memstoreLoadHist
;
169 final Histogram heapOccupancyHist
;
171 public RegionStats(MetricRegistry registry
, String name
) {
173 this.memstoreLoadHist
= registry
.histogram(name(MetricsConnection
.class,
174 MEMLOAD_BASE
+ this.name
));
175 this.heapOccupancyHist
= registry
.histogram(name(MetricsConnection
.class,
176 HEAP_BASE
+ this.name
));
179 public void update(RegionLoadStats regionStatistics
) {
180 this.memstoreLoadHist
.update(regionStatistics
.getMemStoreLoad());
181 this.heapOccupancyHist
.update(regionStatistics
.getHeapOccupancy());
185 protected static class RunnerStats
{
186 final Counter normalRunners
;
187 final Counter delayRunners
;
188 final Histogram delayIntevalHist
;
190 public RunnerStats(MetricRegistry registry
) {
191 this.normalRunners
= registry
.counter(
192 name(MetricsConnection
.class, "normalRunnersCount"));
193 this.delayRunners
= registry
.counter(
194 name(MetricsConnection
.class, "delayRunnersCount"));
195 this.delayIntevalHist
= registry
.histogram(
196 name(MetricsConnection
.class, "delayIntervalHist"));
199 public void incrNormalRunners() {
200 this.normalRunners
.inc();
203 public void incrDelayRunners() {
204 this.delayRunners
.inc();
207 public void updateDelayInterval(long interval
) {
208 this.delayIntevalHist
.update(interval
);
212 protected ConcurrentHashMap
<ServerName
, ConcurrentMap
<byte[], RegionStats
>> serverStats
213 = new ConcurrentHashMap
<>();
215 public void updateServerStats(ServerName serverName
, byte[] regionName
,
217 if (!(r
instanceof Result
)) {
220 Result result
= (Result
) r
;
221 RegionLoadStats stats
= result
.getStats();
225 updateRegionStats(serverName
, regionName
, stats
);
229 public void updateRegionStats(ServerName serverName
, byte[] regionName
, RegionLoadStats stats
) {
230 String name
= serverName
.getServerName() + "," + Bytes
.toStringBinary(regionName
);
231 ConcurrentMap
<byte[], RegionStats
> rsStats
= computeIfAbsent(serverStats
, serverName
,
232 () -> new ConcurrentSkipListMap
<>(Bytes
.BYTES_COMPARATOR
));
233 RegionStats regionStats
=
234 computeIfAbsent(rsStats
, regionName
, () -> new RegionStats(this.registry
, name
));
235 regionStats
.update(stats
);
238 /** A lambda for dispatching to the appropriate metric factory method */
239 private static interface NewMetric
<T
> {
240 T
newMetric(Class
<?
> clazz
, String name
, String scope
);
243 /** Anticipated number of metric entries */
244 private static final int CAPACITY
= 50;
245 /** Default load factor from {@link java.util.HashMap#DEFAULT_LOAD_FACTOR} */
246 private static final float LOAD_FACTOR
= 0.75f
;
248 * Anticipated number of concurrent accessor threads
250 private static final int CONCURRENCY_LEVEL
= 256;
252 private final MetricRegistry registry
;
253 private final JmxReporter reporter
;
254 private final String scope
;
256 private final NewMetric
<Timer
> timerFactory
= new NewMetric
<Timer
>() {
257 @Override public Timer
newMetric(Class
<?
> clazz
, String name
, String scope
) {
258 return registry
.timer(name(clazz
, name
, scope
));
262 private final NewMetric
<Histogram
> histogramFactory
= new NewMetric
<Histogram
>() {
263 @Override public Histogram
newMetric(Class
<?
> clazz
, String name
, String scope
) {
264 return registry
.histogram(name(clazz
, name
, scope
));
268 private final NewMetric
<Counter
> counterFactory
= new NewMetric
<Counter
>() {
269 @Override public Counter
newMetric(Class
<?
> clazz
, String name
, String scope
) {
270 return registry
.counter(name(clazz
, name
, scope
));
276 protected final Counter metaCacheHits
;
277 protected final Counter metaCacheMisses
;
278 protected final CallTracker getTracker
;
279 protected final CallTracker scanTracker
;
280 protected final CallTracker appendTracker
;
281 protected final CallTracker deleteTracker
;
282 protected final CallTracker incrementTracker
;
283 protected final CallTracker putTracker
;
284 protected final CallTracker multiTracker
;
285 protected final RunnerStats runnerStats
;
286 protected final Counter metaCacheNumClearServer
;
287 protected final Counter metaCacheNumClearRegion
;
288 protected final Counter hedgedReadOps
;
289 protected final Counter hedgedReadWin
;
290 protected final Histogram concurrentCallsPerServerHist
;
291 protected final Histogram numActionsPerServerHist
;
292 protected final Counter nsLookups
;
293 protected final Counter nsLookupsFailed
;
297 // These maps are used to cache references to the metric instances that are managed by the
298 // registry. I don't think their use perfectly removes redundant allocations, but it's
299 // a big improvement over calling registry.newMetric each time.
300 protected final ConcurrentMap
<String
, Timer
> rpcTimers
=
301 new ConcurrentHashMap
<>(CAPACITY
, LOAD_FACTOR
, CONCURRENCY_LEVEL
);
302 protected final ConcurrentMap
<String
, Histogram
> rpcHistograms
=
303 new ConcurrentHashMap
<>(CAPACITY
* 2 /* tracking both request and response sizes */,
304 LOAD_FACTOR
, CONCURRENCY_LEVEL
);
305 private final ConcurrentMap
<String
, Counter
> cacheDroppingExceptions
=
306 new ConcurrentHashMap
<>(CAPACITY
, LOAD_FACTOR
, CONCURRENCY_LEVEL
);
307 protected final ConcurrentMap
<String
, Counter
> rpcCounters
=
308 new ConcurrentHashMap
<>(CAPACITY
, LOAD_FACTOR
, CONCURRENCY_LEVEL
);
310 MetricsConnection(String scope
, Supplier
<ThreadPoolExecutor
> batchPool
,
311 Supplier
<ThreadPoolExecutor
> metaPool
) {
313 this.registry
= new MetricRegistry();
314 this.registry
.register(getExecutorPoolName(),
317 protected Ratio
getRatio() {
318 ThreadPoolExecutor pool
= batchPool
.get();
320 return Ratio
.of(0, 0);
322 return Ratio
.of(pool
.getActiveCount(), pool
.getMaximumPoolSize());
325 this.registry
.register(getMetaPoolName(),
328 protected Ratio
getRatio() {
329 ThreadPoolExecutor pool
= metaPool
.get();
331 return Ratio
.of(0, 0);
333 return Ratio
.of(pool
.getActiveCount(), pool
.getMaximumPoolSize());
336 this.metaCacheHits
= registry
.counter(name(this.getClass(), "metaCacheHits", scope
));
337 this.metaCacheMisses
= registry
.counter(name(this.getClass(), "metaCacheMisses", scope
));
338 this.metaCacheNumClearServer
= registry
.counter(name(this.getClass(),
339 "metaCacheNumClearServer", scope
));
340 this.metaCacheNumClearRegion
= registry
.counter(name(this.getClass(),
341 "metaCacheNumClearRegion", scope
));
342 this.hedgedReadOps
= registry
.counter(name(this.getClass(), "hedgedReadOps", scope
));
343 this.hedgedReadWin
= registry
.counter(name(this.getClass(), "hedgedReadWin", scope
));
344 this.getTracker
= new CallTracker(this.registry
, "Get", scope
);
345 this.scanTracker
= new CallTracker(this.registry
, "Scan", scope
);
346 this.appendTracker
= new CallTracker(this.registry
, "Mutate", "Append", scope
);
347 this.deleteTracker
= new CallTracker(this.registry
, "Mutate", "Delete", scope
);
348 this.incrementTracker
= new CallTracker(this.registry
, "Mutate", "Increment", scope
);
349 this.putTracker
= new CallTracker(this.registry
, "Mutate", "Put", scope
);
350 this.multiTracker
= new CallTracker(this.registry
, "Multi", scope
);
351 this.runnerStats
= new RunnerStats(this.registry
);
352 this.concurrentCallsPerServerHist
= registry
.histogram(name(MetricsConnection
.class,
353 "concurrentCallsPerServer", scope
));
354 this.numActionsPerServerHist
= registry
.histogram(name(MetricsConnection
.class,
355 "numActionsPerServer", scope
));
356 this.nsLookups
= registry
.counter(name(this.getClass(), NS_LOOKUPS
, scope
));
357 this.nsLookupsFailed
= registry
.counter(name(this.getClass(), NS_LOOKUPS_FAILED
, scope
));
359 this.reporter
= JmxReporter
.forRegistry(this.registry
).build();
360 this.reporter
.start();
363 final String
getExecutorPoolName() {
364 return name(getClass(), "executorPoolActiveThreads", scope
);
367 final String
getMetaPoolName() {
368 return name(getClass(), "metaPoolActiveThreads", scope
);
371 MetricRegistry
getMetricRegistry() {
375 public void shutdown() {
376 this.reporter
.stop();
379 /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */
380 public static CallStats
newCallStats() {
381 // TODO: instance pool to reduce GC?
382 return new CallStats();
385 /** Increment the number of meta cache hits. */
386 public void incrMetaCacheHit() {
390 /** Increment the number of meta cache misses. */
391 public void incrMetaCacheMiss() {
392 metaCacheMisses
.inc();
395 /** Increment the number of meta cache drops requested for entire RegionServer. */
396 public void incrMetaCacheNumClearServer() {
397 metaCacheNumClearServer
.inc();
400 /** Increment the number of meta cache drops requested for individual region. */
401 public void incrMetaCacheNumClearRegion() {
402 metaCacheNumClearRegion
.inc();
405 /** Increment the number of meta cache drops requested for individual region. */
406 public void incrMetaCacheNumClearRegion(int count
) {
407 metaCacheNumClearRegion
.inc(count
);
410 /** Increment the number of hedged read that have occurred. */
411 public void incrHedgedReadOps() {
415 /** Increment the number of hedged read returned faster than the original read. */
416 public void incrHedgedReadWin() {
420 /** Increment the number of normal runner counts. */
421 public void incrNormalRunners() {
422 this.runnerStats
.incrNormalRunners();
425 /** Increment the number of delay runner counts and update delay interval of delay runner. */
426 public void incrDelayRunnersAndUpdateDelayInterval(long interval
) {
427 this.runnerStats
.incrDelayRunners();
428 this.runnerStats
.updateDelayInterval(interval
);
432 * Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
434 private <T
> T
getMetric(String key
, ConcurrentMap
<String
, T
> map
, NewMetric
<T
> factory
) {
435 return computeIfAbsent(map
, key
, () -> factory
.newMetric(getClass(), key
, scope
));
438 /** Update call stats for non-critical-path methods */
439 private void updateRpcGeneric(String methodName
, CallStats stats
) {
440 getMetric(DRTN_BASE
+ methodName
, rpcTimers
, timerFactory
)
441 .update(stats
.getCallTimeMs(), TimeUnit
.MILLISECONDS
);
442 getMetric(REQ_BASE
+ methodName
, rpcHistograms
, histogramFactory
)
443 .update(stats
.getRequestSizeBytes());
444 getMetric(RESP_BASE
+ methodName
, rpcHistograms
, histogramFactory
)
445 .update(stats
.getResponseSizeBytes());
448 /** Report RPC context to metrics system. */
449 public void updateRpc(MethodDescriptor method
, Message param
, CallStats stats
) {
450 int callsPerServer
= stats
.getConcurrentCallsPerServer();
451 if (callsPerServer
> 0) {
452 concurrentCallsPerServerHist
.update(callsPerServer
);
454 // Update the counter that tracks RPCs by type.
455 final String methodName
= method
.getService().getName() + "_" + method
.getName();
456 getMetric(CNT_BASE
+ methodName
, rpcCounters
, counterFactory
).inc();
457 // this implementation is tied directly to protobuf implementation details. would be better
458 // if we could dispatch based on something static, ie, request Message type.
459 if (method
.getService() == ClientService
.getDescriptor()) {
460 switch(method
.getIndex()) {
462 assert "Get".equals(method
.getName());
463 getTracker
.updateRpc(stats
);
466 assert "Mutate".equals(method
.getName());
467 final MutationType mutationType
= ((MutateRequest
) param
).getMutation().getMutateType();
468 switch(mutationType
) {
470 appendTracker
.updateRpc(stats
);
473 deleteTracker
.updateRpc(stats
);
476 incrementTracker
.updateRpc(stats
);
479 putTracker
.updateRpc(stats
);
482 throw new RuntimeException("Unrecognized mutation type " + mutationType
);
485 assert "Scan".equals(method
.getName());
486 scanTracker
.updateRpc(stats
);
489 assert "BulkLoadHFile".equals(method
.getName());
490 // use generic implementation
493 assert "PrepareBulkLoad".equals(method
.getName());
494 // use generic implementation
497 assert "CleanupBulkLoad".equals(method
.getName());
498 // use generic implementation
501 assert "ExecService".equals(method
.getName());
502 // use generic implementation
505 assert "ExecRegionServerService".equals(method
.getName());
506 // use generic implementation
509 assert "Multi".equals(method
.getName());
510 numActionsPerServerHist
.update(stats
.getNumActionsPerServer());
511 multiTracker
.updateRpc(stats
);
514 throw new RuntimeException("Unrecognized ClientService RPC type " + method
.getFullName());
517 // Fallback to dynamic registry lookup for DDL methods.
518 updateRpcGeneric(methodName
, stats
);
521 public void incrCacheDroppingExceptions(Object exception
) {
522 getMetric(CACHE_BASE
+
523 (exception
== null? UNKNOWN_EXCEPTION
: exception
.getClass().getSimpleName()),
524 cacheDroppingExceptions
, counterFactory
).inc();
527 public void incrNsLookups() {
528 this.nsLookups
.inc();
531 public void incrNsLookupsFailed() {
532 this.nsLookupsFailed
.inc();