HBASE-26921 Rewrite the counting cells part in TestMultiVersions (#4316)
[hbase.git] / hbase-procedure / src / main / java / org / apache / hadoop / hbase / procedure2 / CompletedProcedureCleaner.java
blob796a8e47c9181323907856dbad837a02cd2a849f
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.procedure2;
20 import java.io.IOException;
21 import java.io.UncheckedIOException;
22 import java.util.Iterator;
23 import java.util.Map;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
26 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
27 import org.apache.hadoop.hbase.util.IdLock;
28 import org.apache.hadoop.hbase.util.NonceKey;
29 import org.apache.yetus.audience.InterfaceAudience;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
33 /**
34 * Internal cleaner that removes the completed procedure results after a TTL.
35 * <p/>
36 * NOTE: This is a special case handled in timeoutLoop().
37 * <p/>
38 * Since the client code looks more or less like:
40 * <pre>
41 * procId = master.doOperation()
42 * while (master.getProcResult(procId) == ProcInProgress);
43 * </pre>
45 * The master should not throw away the proc result as soon as the procedure is done but should wait
46 * a result request from the client (see executor.removeResult(procId)) The client will call
47 * something like master.isProcDone() or master.getProcResult() which will return the result/state
48 * to the client, and it will mark the completed proc as ready to delete. note that the client may
49 * not receive the response from the master (e.g. master failover) so, if we delay a bit the real
50 * deletion of the proc result the client will be able to get the result the next try.
52 @InterfaceAudience.Private
53 class CompletedProcedureCleaner<TEnvironment> extends ProcedureInMemoryChore<TEnvironment> {
54 private static final Logger LOG = LoggerFactory.getLogger(CompletedProcedureCleaner.class);
56 static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval";
57 private static final int DEFAULT_CLEANER_INTERVAL = 30 * 1000; // 30sec
59 private static final String BATCH_SIZE_CONF_KEY = "hbase.procedure.cleaner.evict.batch.size";
60 private static final int DEFAULT_BATCH_SIZE = 32;
62 private final Map<Long, CompletedProcedureRetainer<TEnvironment>> completed;
63 private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
64 private final ProcedureStore store;
65 private final IdLock procExecutionLock;
66 private Configuration conf;
68 public CompletedProcedureCleaner(Configuration conf, ProcedureStore store,
69 IdLock procExecutionLock, Map<Long, CompletedProcedureRetainer<TEnvironment>> completedMap,
70 Map<NonceKey, Long> nonceKeysToProcIdsMap) {
71 // set the timeout interval that triggers the periodic-procedure
72 super(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
73 this.completed = completedMap;
74 this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
75 this.store = store;
76 this.procExecutionLock = procExecutionLock;
77 this.conf = conf;
80 @Override
81 protected void periodicExecute(final TEnvironment env) {
82 if (completed.isEmpty()) {
83 if (LOG.isTraceEnabled()) {
84 LOG.trace("No completed procedures to cleanup.");
86 return;
89 final long evictTtl =
90 conf.getInt(ProcedureExecutor.EVICT_TTL_CONF_KEY, ProcedureExecutor.DEFAULT_EVICT_TTL);
91 final long evictAckTtl = conf.getInt(ProcedureExecutor.EVICT_ACKED_TTL_CONF_KEY,
92 ProcedureExecutor.DEFAULT_ACKED_EVICT_TTL);
93 final int batchSize = conf.getInt(BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
95 final long[] batchIds = new long[batchSize];
96 int batchCount = 0;
98 final long now = EnvironmentEdgeManager.currentTime();
99 final Iterator<Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>>> it =
100 completed.entrySet().iterator();
101 while (it.hasNext() && store.isRunning()) {
102 final Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>> entry = it.next();
103 final CompletedProcedureRetainer<TEnvironment> retainer = entry.getValue();
104 final Procedure<?> proc = retainer.getProcedure();
105 IdLock.Entry lockEntry;
106 try {
107 lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
108 } catch (IOException e) {
109 // can only happen if interrupted, so not a big deal to propagate it
110 throw new UncheckedIOException(e);
112 try {
113 // TODO: Select TTL based on Procedure type
114 if (retainer.isExpired(now, evictTtl, evictAckTtl)) {
115 // Failed procedures aren't persisted in WAL.
116 if (!(proc instanceof FailedProcedure)) {
117 batchIds[batchCount++] = entry.getKey();
118 if (batchCount == batchIds.length) {
119 store.delete(batchIds, 0, batchCount);
120 batchCount = 0;
123 final NonceKey nonceKey = proc.getNonceKey();
124 if (nonceKey != null) {
125 nonceKeysToProcIdsMap.remove(nonceKey);
127 it.remove();
128 LOG.trace("Evict completed {}", proc);
130 } finally {
131 procExecutionLock.releaseLockEntry(lockEntry);
134 if (batchCount > 0) {
135 store.delete(batchIds, 0, batchCount);
137 // let the store do some cleanup works, i.e, delete the place marker for preserving the max
138 // procedure id.
139 store.cleanup();