2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.procedure2
;
20 import java
.io
.IOException
;
21 import java
.io
.UncheckedIOException
;
22 import java
.util
.Iterator
;
24 import org
.apache
.hadoop
.conf
.Configuration
;
25 import org
.apache
.hadoop
.hbase
.procedure2
.store
.ProcedureStore
;
26 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
27 import org
.apache
.hadoop
.hbase
.util
.IdLock
;
28 import org
.apache
.hadoop
.hbase
.util
.NonceKey
;
29 import org
.apache
.yetus
.audience
.InterfaceAudience
;
30 import org
.slf4j
.Logger
;
31 import org
.slf4j
.LoggerFactory
;
34 * Internal cleaner that removes the completed procedure results after a TTL.
36 * NOTE: This is a special case handled in timeoutLoop().
38 * Since the client code looks more or less like:
41 * procId = master.doOperation()
42 * while (master.getProcResult(procId) == ProcInProgress);
45 * The master should not throw away the proc result as soon as the procedure is done but should wait
46 * a result request from the client (see executor.removeResult(procId)) The client will call
47 * something like master.isProcDone() or master.getProcResult() which will return the result/state
48 * to the client, and it will mark the completed proc as ready to delete. note that the client may
49 * not receive the response from the master (e.g. master failover) so, if we delay a bit the real
50 * deletion of the proc result the client will be able to get the result the next try.
52 @InterfaceAudience.Private
53 class CompletedProcedureCleaner
<TEnvironment
> extends ProcedureInMemoryChore
<TEnvironment
> {
54 private static final Logger LOG
= LoggerFactory
.getLogger(CompletedProcedureCleaner
.class);
56 static final String CLEANER_INTERVAL_CONF_KEY
= "hbase.procedure.cleaner.interval";
57 private static final int DEFAULT_CLEANER_INTERVAL
= 30 * 1000; // 30sec
59 private static final String BATCH_SIZE_CONF_KEY
= "hbase.procedure.cleaner.evict.batch.size";
60 private static final int DEFAULT_BATCH_SIZE
= 32;
62 private final Map
<Long
, CompletedProcedureRetainer
<TEnvironment
>> completed
;
63 private final Map
<NonceKey
, Long
> nonceKeysToProcIdsMap
;
64 private final ProcedureStore store
;
65 private final IdLock procExecutionLock
;
66 private Configuration conf
;
68 public CompletedProcedureCleaner(Configuration conf
, ProcedureStore store
,
69 IdLock procExecutionLock
, Map
<Long
, CompletedProcedureRetainer
<TEnvironment
>> completedMap
,
70 Map
<NonceKey
, Long
> nonceKeysToProcIdsMap
) {
71 // set the timeout interval that triggers the periodic-procedure
72 super(conf
.getInt(CLEANER_INTERVAL_CONF_KEY
, DEFAULT_CLEANER_INTERVAL
));
73 this.completed
= completedMap
;
74 this.nonceKeysToProcIdsMap
= nonceKeysToProcIdsMap
;
76 this.procExecutionLock
= procExecutionLock
;
81 protected void periodicExecute(final TEnvironment env
) {
82 if (completed
.isEmpty()) {
83 if (LOG
.isTraceEnabled()) {
84 LOG
.trace("No completed procedures to cleanup.");
90 conf
.getInt(ProcedureExecutor
.EVICT_TTL_CONF_KEY
, ProcedureExecutor
.DEFAULT_EVICT_TTL
);
91 final long evictAckTtl
= conf
.getInt(ProcedureExecutor
.EVICT_ACKED_TTL_CONF_KEY
,
92 ProcedureExecutor
.DEFAULT_ACKED_EVICT_TTL
);
93 final int batchSize
= conf
.getInt(BATCH_SIZE_CONF_KEY
, DEFAULT_BATCH_SIZE
);
95 final long[] batchIds
= new long[batchSize
];
98 final long now
= EnvironmentEdgeManager
.currentTime();
99 final Iterator
<Map
.Entry
<Long
, CompletedProcedureRetainer
<TEnvironment
>>> it
=
100 completed
.entrySet().iterator();
101 while (it
.hasNext() && store
.isRunning()) {
102 final Map
.Entry
<Long
, CompletedProcedureRetainer
<TEnvironment
>> entry
= it
.next();
103 final CompletedProcedureRetainer
<TEnvironment
> retainer
= entry
.getValue();
104 final Procedure
<?
> proc
= retainer
.getProcedure();
105 IdLock
.Entry lockEntry
;
107 lockEntry
= procExecutionLock
.getLockEntry(proc
.getProcId());
108 } catch (IOException e
) {
109 // can only happen if interrupted, so not a big deal to propagate it
110 throw new UncheckedIOException(e
);
113 // TODO: Select TTL based on Procedure type
114 if (retainer
.isExpired(now
, evictTtl
, evictAckTtl
)) {
115 // Failed procedures aren't persisted in WAL.
116 if (!(proc
instanceof FailedProcedure
)) {
117 batchIds
[batchCount
++] = entry
.getKey();
118 if (batchCount
== batchIds
.length
) {
119 store
.delete(batchIds
, 0, batchCount
);
123 final NonceKey nonceKey
= proc
.getNonceKey();
124 if (nonceKey
!= null) {
125 nonceKeysToProcIdsMap
.remove(nonceKey
);
128 LOG
.trace("Evict completed {}", proc
);
131 procExecutionLock
.releaseLockEntry(lockEntry
);
134 if (batchCount
> 0) {
135 store
.delete(batchIds
, 0, batchCount
);
137 // let the store do some cleanup works, i.e, delete the place marker for preserving the max