HBASE-26474 Implement connection-level attributes (addendum)
[hbase.git] / hbase-client / src / main / java / org / apache / hadoop / hbase / client / MetricsConnection.java
blob9db8b6090e10e5880ccc976e7b8e6e97f794b29c
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static com.codahale.metrics.MetricRegistry.name;
21 import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
23 import com.codahale.metrics.Counter;
24 import com.codahale.metrics.Histogram;
25 import com.codahale.metrics.JmxReporter;
26 import com.codahale.metrics.MetricRegistry;
27 import com.codahale.metrics.RatioGauge;
28 import com.codahale.metrics.Timer;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ConcurrentMap;
32 import java.util.concurrent.ConcurrentSkipListMap;
33 import java.util.concurrent.ThreadPoolExecutor;
34 import java.util.concurrent.TimeUnit;
35 import java.util.function.Supplier;
36 import org.apache.hadoop.hbase.ServerName;
37 import org.apache.yetus.audience.InterfaceAudience;
38 import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
39 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
40 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
41 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
42 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
43 import org.apache.hadoop.hbase.util.Bytes;
/**
 * This class is for maintaining the various connection statistics and publishing them through
 * the metrics interfaces.
 * <p>
 * This class manages its own {@link MetricRegistry} and {@link JmxReporter} so as to not
 * conflict with other uses of Yammer Metrics within the client application. Instantiating
 * this class implicitly creates and "starts" instances of these classes; be sure to call
 * {@link #shutdown()} to terminate the thread pools they allocate.
 */
54 @InterfaceAudience.Private
55 public class MetricsConnection implements StatisticTrackable {
57 /** Set this key to {@code true} to enable metrics collection of client requests. */
58 public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";
60 private static final String CNT_BASE = "rpcCount_";
61 private static final String DRTN_BASE = "rpcCallDurationMs_";
62 private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
63 private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
64 private static final String MEMLOAD_BASE = "memstoreLoad_";
65 private static final String HEAP_BASE = "heapOccupancy_";
66 private static final String CACHE_BASE = "cacheDroppingExceptions_";
67 private static final String UNKNOWN_EXCEPTION = "UnknownException";
68 private static final String NS_LOOKUPS = "nsLookups";
69 private static final String NS_LOOKUPS_FAILED = "nsLookupsFailed";
70 private static final String CLIENT_SVC = ClientService.getDescriptor().getName();
72 /** A container class for collecting details about the RPC call as it percolates. */
73 public static class CallStats {
74 private long requestSizeBytes = 0;
75 private long responseSizeBytes = 0;
76 private long startTime = 0;
77 private long callTimeMs = 0;
78 private int concurrentCallsPerServer = 0;
79 private int numActionsPerServer = 0;
81 public long getRequestSizeBytes() {
82 return requestSizeBytes;
85 public void setRequestSizeBytes(long requestSizeBytes) {
86 this.requestSizeBytes = requestSizeBytes;
89 public long getResponseSizeBytes() {
90 return responseSizeBytes;
93 public void setResponseSizeBytes(long responseSizeBytes) {
94 this.responseSizeBytes = responseSizeBytes;
97 public long getStartTime() {
98 return startTime;
101 public void setStartTime(long startTime) {
102 this.startTime = startTime;
105 public long getCallTimeMs() {
106 return callTimeMs;
109 public void setCallTimeMs(long callTimeMs) {
110 this.callTimeMs = callTimeMs;
113 public int getConcurrentCallsPerServer() {
114 return concurrentCallsPerServer;
117 public void setConcurrentCallsPerServer(int callsPerServer) {
118 this.concurrentCallsPerServer = callsPerServer;
121 public int getNumActionsPerServer() {
122 return numActionsPerServer;
125 public void setNumActionsPerServer(int numActionsPerServer) {
126 this.numActionsPerServer = numActionsPerServer;
130 protected static final class CallTracker {
131 private final String name;
132 final Timer callTimer;
133 final Histogram reqHist;
134 final Histogram respHist;
136 private CallTracker(MetricRegistry registry, String name, String subName, String scope) {
137 StringBuilder sb = new StringBuilder(CLIENT_SVC).append("_").append(name);
138 if (subName != null) {
139 sb.append("(").append(subName).append(")");
141 this.name = sb.toString();
142 this.callTimer = registry.timer(name(MetricsConnection.class,
143 DRTN_BASE + this.name, scope));
144 this.reqHist = registry.histogram(name(MetricsConnection.class,
145 REQ_BASE + this.name, scope));
146 this.respHist = registry.histogram(name(MetricsConnection.class,
147 RESP_BASE + this.name, scope));
150 private CallTracker(MetricRegistry registry, String name, String scope) {
151 this(registry, name, null, scope);
154 public void updateRpc(CallStats stats) {
155 this.callTimer.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
156 this.reqHist.update(stats.getRequestSizeBytes());
157 this.respHist.update(stats.getResponseSizeBytes());
160 @Override
161 public String toString() {
162 return "CallTracker:" + name;
166 protected static class RegionStats {
167 final String name;
168 final Histogram memstoreLoadHist;
169 final Histogram heapOccupancyHist;
171 public RegionStats(MetricRegistry registry, String name) {
172 this.name = name;
173 this.memstoreLoadHist = registry.histogram(name(MetricsConnection.class,
174 MEMLOAD_BASE + this.name));
175 this.heapOccupancyHist = registry.histogram(name(MetricsConnection.class,
176 HEAP_BASE + this.name));
179 public void update(RegionLoadStats regionStatistics) {
180 this.memstoreLoadHist.update(regionStatistics.getMemStoreLoad());
181 this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy());
185 protected static class RunnerStats {
186 final Counter normalRunners;
187 final Counter delayRunners;
188 final Histogram delayIntevalHist;
190 public RunnerStats(MetricRegistry registry) {
191 this.normalRunners = registry.counter(
192 name(MetricsConnection.class, "normalRunnersCount"));
193 this.delayRunners = registry.counter(
194 name(MetricsConnection.class, "delayRunnersCount"));
195 this.delayIntevalHist = registry.histogram(
196 name(MetricsConnection.class, "delayIntervalHist"));
199 public void incrNormalRunners() {
200 this.normalRunners.inc();
203 public void incrDelayRunners() {
204 this.delayRunners.inc();
207 public void updateDelayInterval(long interval) {
208 this.delayIntevalHist.update(interval);
212 protected ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats
213 = new ConcurrentHashMap<>();
215 public void updateServerStats(ServerName serverName, byte[] regionName,
216 Object r) {
217 if (!(r instanceof Result)) {
218 return;
220 Result result = (Result) r;
221 RegionLoadStats stats = result.getStats();
222 if (stats == null) {
223 return;
225 updateRegionStats(serverName, regionName, stats);
228 @Override
229 public void updateRegionStats(ServerName serverName, byte[] regionName, RegionLoadStats stats) {
230 String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName);
231 ConcurrentMap<byte[], RegionStats> rsStats = computeIfAbsent(serverStats, serverName,
232 () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
233 RegionStats regionStats =
234 computeIfAbsent(rsStats, regionName, () -> new RegionStats(this.registry, name));
235 regionStats.update(stats);
238 /** A lambda for dispatching to the appropriate metric factory method */
239 private static interface NewMetric<T> {
240 T newMetric(Class<?> clazz, String name, String scope);
243 /** Anticipated number of metric entries */
244 private static final int CAPACITY = 50;
245 /** Default load factor from {@link java.util.HashMap#DEFAULT_LOAD_FACTOR} */
246 private static final float LOAD_FACTOR = 0.75f;
248 * Anticipated number of concurrent accessor threads
250 private static final int CONCURRENCY_LEVEL = 256;
252 private final MetricRegistry registry;
253 private final JmxReporter reporter;
254 private final String scope;
256 private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
257 @Override public Timer newMetric(Class<?> clazz, String name, String scope) {
258 return registry.timer(name(clazz, name, scope));
262 private final NewMetric<Histogram> histogramFactory = new NewMetric<Histogram>() {
263 @Override public Histogram newMetric(Class<?> clazz, String name, String scope) {
264 return registry.histogram(name(clazz, name, scope));
268 private final NewMetric<Counter> counterFactory = new NewMetric<Counter>() {
269 @Override public Counter newMetric(Class<?> clazz, String name, String scope) {
270 return registry.counter(name(clazz, name, scope));
274 // static metrics
276 protected final Counter metaCacheHits;
277 protected final Counter metaCacheMisses;
278 protected final CallTracker getTracker;
279 protected final CallTracker scanTracker;
280 protected final CallTracker appendTracker;
281 protected final CallTracker deleteTracker;
282 protected final CallTracker incrementTracker;
283 protected final CallTracker putTracker;
284 protected final CallTracker multiTracker;
285 protected final RunnerStats runnerStats;
286 protected final Counter metaCacheNumClearServer;
287 protected final Counter metaCacheNumClearRegion;
288 protected final Counter hedgedReadOps;
289 protected final Counter hedgedReadWin;
290 protected final Histogram concurrentCallsPerServerHist;
291 protected final Histogram numActionsPerServerHist;
292 protected final Counter nsLookups;
293 protected final Counter nsLookupsFailed;
295 // dynamic metrics
297 // These maps are used to cache references to the metric instances that are managed by the
298 // registry. I don't think their use perfectly removes redundant allocations, but it's
299 // a big improvement over calling registry.newMetric each time.
300 protected final ConcurrentMap<String, Timer> rpcTimers =
301 new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
302 protected final ConcurrentMap<String, Histogram> rpcHistograms =
303 new ConcurrentHashMap<>(CAPACITY * 2 /* tracking both request and response sizes */,
304 LOAD_FACTOR, CONCURRENCY_LEVEL);
305 private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
306 new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
307 protected final ConcurrentMap<String, Counter> rpcCounters =
308 new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
310 MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
311 Supplier<ThreadPoolExecutor> metaPool) {
312 this.scope = scope;
313 this.registry = new MetricRegistry();
314 this.registry.register(getExecutorPoolName(),
315 new RatioGauge() {
316 @Override
317 protected Ratio getRatio() {
318 ThreadPoolExecutor pool = batchPool.get();
319 if (pool == null) {
320 return Ratio.of(0, 0);
322 return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
325 this.registry.register(getMetaPoolName(),
326 new RatioGauge() {
327 @Override
328 protected Ratio getRatio() {
329 ThreadPoolExecutor pool = metaPool.get();
330 if (pool == null) {
331 return Ratio.of(0, 0);
333 return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
336 this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope));
337 this.metaCacheMisses = registry.counter(name(this.getClass(), "metaCacheMisses", scope));
338 this.metaCacheNumClearServer = registry.counter(name(this.getClass(),
339 "metaCacheNumClearServer", scope));
340 this.metaCacheNumClearRegion = registry.counter(name(this.getClass(),
341 "metaCacheNumClearRegion", scope));
342 this.hedgedReadOps = registry.counter(name(this.getClass(), "hedgedReadOps", scope));
343 this.hedgedReadWin = registry.counter(name(this.getClass(), "hedgedReadWin", scope));
344 this.getTracker = new CallTracker(this.registry, "Get", scope);
345 this.scanTracker = new CallTracker(this.registry, "Scan", scope);
346 this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope);
347 this.deleteTracker = new CallTracker(this.registry, "Mutate", "Delete", scope);
348 this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope);
349 this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope);
350 this.multiTracker = new CallTracker(this.registry, "Multi", scope);
351 this.runnerStats = new RunnerStats(this.registry);
352 this.concurrentCallsPerServerHist = registry.histogram(name(MetricsConnection.class,
353 "concurrentCallsPerServer", scope));
354 this.numActionsPerServerHist = registry.histogram(name(MetricsConnection.class,
355 "numActionsPerServer", scope));
356 this.nsLookups = registry.counter(name(this.getClass(), NS_LOOKUPS, scope));
357 this.nsLookupsFailed = registry.counter(name(this.getClass(), NS_LOOKUPS_FAILED, scope));
359 this.reporter = JmxReporter.forRegistry(this.registry).build();
360 this.reporter.start();
363 final String getExecutorPoolName() {
364 return name(getClass(), "executorPoolActiveThreads", scope);
367 final String getMetaPoolName() {
368 return name(getClass(), "metaPoolActiveThreads", scope);
371 MetricRegistry getMetricRegistry() {
372 return registry;
375 public void shutdown() {
376 this.reporter.stop();
379 /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */
380 public static CallStats newCallStats() {
381 // TODO: instance pool to reduce GC?
382 return new CallStats();
385 /** Increment the number of meta cache hits. */
386 public void incrMetaCacheHit() {
387 metaCacheHits.inc();
390 /** Increment the number of meta cache misses. */
391 public void incrMetaCacheMiss() {
392 metaCacheMisses.inc();
395 /** Increment the number of meta cache drops requested for entire RegionServer. */
396 public void incrMetaCacheNumClearServer() {
397 metaCacheNumClearServer.inc();
400 /** Increment the number of meta cache drops requested for individual region. */
401 public void incrMetaCacheNumClearRegion() {
402 metaCacheNumClearRegion.inc();
405 /** Increment the number of meta cache drops requested for individual region. */
406 public void incrMetaCacheNumClearRegion(int count) {
407 metaCacheNumClearRegion.inc(count);
410 /** Increment the number of hedged read that have occurred. */
411 public void incrHedgedReadOps() {
412 hedgedReadOps.inc();
415 /** Increment the number of hedged read returned faster than the original read. */
416 public void incrHedgedReadWin() {
417 hedgedReadWin.inc();
420 /** Increment the number of normal runner counts. */
421 public void incrNormalRunners() {
422 this.runnerStats.incrNormalRunners();
425 /** Increment the number of delay runner counts and update delay interval of delay runner. */
426 public void incrDelayRunnersAndUpdateDelayInterval(long interval) {
427 this.runnerStats.incrDelayRunners();
428 this.runnerStats.updateDelayInterval(interval);
432 * Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
434 private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) {
435 return computeIfAbsent(map, key, () -> factory.newMetric(getClass(), key, scope));
438 /** Update call stats for non-critical-path methods */
439 private void updateRpcGeneric(String methodName, CallStats stats) {
440 getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory)
441 .update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
442 getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory)
443 .update(stats.getRequestSizeBytes());
444 getMetric(RESP_BASE + methodName, rpcHistograms, histogramFactory)
445 .update(stats.getResponseSizeBytes());
448 /** Report RPC context to metrics system. */
449 public void updateRpc(MethodDescriptor method, Message param, CallStats stats) {
450 int callsPerServer = stats.getConcurrentCallsPerServer();
451 if (callsPerServer > 0) {
452 concurrentCallsPerServerHist.update(callsPerServer);
454 // Update the counter that tracks RPCs by type.
455 final String methodName = method.getService().getName() + "_" + method.getName();
456 getMetric(CNT_BASE + methodName, rpcCounters, counterFactory).inc();
457 // this implementation is tied directly to protobuf implementation details. would be better
458 // if we could dispatch based on something static, ie, request Message type.
459 if (method.getService() == ClientService.getDescriptor()) {
460 switch(method.getIndex()) {
461 case 0:
462 assert "Get".equals(method.getName());
463 getTracker.updateRpc(stats);
464 return;
465 case 1:
466 assert "Mutate".equals(method.getName());
467 final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
468 switch(mutationType) {
469 case APPEND:
470 appendTracker.updateRpc(stats);
471 return;
472 case DELETE:
473 deleteTracker.updateRpc(stats);
474 return;
475 case INCREMENT:
476 incrementTracker.updateRpc(stats);
477 return;
478 case PUT:
479 putTracker.updateRpc(stats);
480 return;
481 default:
482 throw new RuntimeException("Unrecognized mutation type " + mutationType);
484 case 2:
485 assert "Scan".equals(method.getName());
486 scanTracker.updateRpc(stats);
487 return;
488 case 3:
489 assert "BulkLoadHFile".equals(method.getName());
490 // use generic implementation
491 break;
492 case 4:
493 assert "PrepareBulkLoad".equals(method.getName());
494 // use generic implementation
495 break;
496 case 5:
497 assert "CleanupBulkLoad".equals(method.getName());
498 // use generic implementation
499 break;
500 case 6:
501 assert "ExecService".equals(method.getName());
502 // use generic implementation
503 break;
504 case 7:
505 assert "ExecRegionServerService".equals(method.getName());
506 // use generic implementation
507 break;
508 case 8:
509 assert "Multi".equals(method.getName());
510 numActionsPerServerHist.update(stats.getNumActionsPerServer());
511 multiTracker.updateRpc(stats);
512 return;
513 default:
514 throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
517 // Fallback to dynamic registry lookup for DDL methods.
518 updateRpcGeneric(methodName, stats);
521 public void incrCacheDroppingExceptions(Object exception) {
522 getMetric(CACHE_BASE +
523 (exception == null? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()),
524 cacheDroppingExceptions, counterFactory).inc();
527 public void incrNsLookups() {
528 this.nsLookups.inc();
531 public void incrNsLookupsFailed() {
532 this.nsLookupsFailed.inc();