3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.regionserver
;
21 import java
.io
.IOException
;
22 import java
.util
.ConcurrentModificationException
;
23 import java
.util
.Iterator
;
25 import java
.util
.concurrent
.ConcurrentHashMap
;
26 import java
.util
.concurrent
.Delayed
;
27 import java
.util
.concurrent
.TimeUnit
;
29 import org
.apache
.hadoop
.hbase
.log
.HBaseMarkers
;
30 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
31 import org
.apache
.yetus
.audience
.InterfaceAudience
;
32 import org
.slf4j
.Logger
;
33 import org
.slf4j
.LoggerFactory
;
38 * There are several server classes in HBase that need to track external
39 * clients that occasionally send heartbeats.
41 * <p>These external clients hold resources in the server class.
42 * Those resources need to be released if the external client fails to send a
43 * heartbeat after some interval of time passes.
45 * <p>The Leases class is a general reusable class for this kind of pattern.
46 * An instance of the Leases class will create a thread to do its dirty work.
47 * You should close() the instance if you want to clean up the thread properly.
50 * NOTE: This class extends Thread rather than Chore because the sleep time
51 * can be interrupted when there is something to do, rather than the Chore
52 * sleep time which is invariant.
54 @InterfaceAudience.Private
55 public class LeaseManager
extends Thread
{
56 private static final Logger LOG
= LoggerFactory
.getLogger(LeaseManager
.class.getName());
57 private static final int MIN_WAIT_TIME
= 100;
59 private final Map
<String
, Lease
> leases
= new ConcurrentHashMap
<>();
60 private final int leaseCheckFrequency
;
61 private volatile boolean stopRequested
= false;
64 * Creates a lease manager.
66 * @param leaseCheckFrequency - how often the lease should be checked (milliseconds)
68 public LeaseManager(final int leaseCheckFrequency
) {
69 super("RegionServer.LeaseManager"); // thread name
70 this.leaseCheckFrequency
= leaseCheckFrequency
;
76 long toWait
= leaseCheckFrequency
;
77 Lease nextLease
= null;
78 long nextLeaseDelay
= Long
.MAX_VALUE
;
80 while (!stopRequested
|| (stopRequested
&& !leases
.isEmpty()) ) {
83 if (nextLease
!= null) {
84 toWait
= nextLease
.getDelay(TimeUnit
.MILLISECONDS
);
87 toWait
= Math
.min(leaseCheckFrequency
, toWait
);
88 toWait
= Math
.max(MIN_WAIT_TIME
, toWait
);
91 } catch (InterruptedException
| ConcurrentModificationException e
) {
93 } catch (Throwable e
) {
94 LOG
.error(HBaseMarkers
.FATAL
, "Unexpected exception killed leases thread", e
);
99 nextLeaseDelay
= Long
.MAX_VALUE
;
100 for (Iterator
<Map
.Entry
<String
, Lease
>> it
= leases
.entrySet().iterator(); it
.hasNext();) {
101 Map
.Entry
<String
, Lease
> entry
= it
.next();
102 Lease lease
= entry
.getValue();
103 long thisLeaseDelay
= lease
.getDelay(TimeUnit
.MILLISECONDS
);
104 if ( thisLeaseDelay
> 0) {
105 if (nextLease
== null || thisLeaseDelay
< nextLeaseDelay
) {
107 nextLeaseDelay
= thisLeaseDelay
;
110 // A lease expired. Run the expired code before removing from map
111 // since its presence in map is used to see if lease exists still.
112 if (lease
.getListener() == null) {
113 LOG
.error("lease listener is null for lease " + lease
.getLeaseName());
115 lease
.getListener().leaseExpired();
125 * Shuts down this lease instance when all outstanding leases expire.
126 * Like {@link #close()} but rather than violently end all leases, waits
127 * first on extant leases to finish. Use this method if the lease holders
128 * could lose data, leak locks, etc. Presumes client has shutdown
129 * allocation of new leases.
131 public void closeAfterLeasesExpire() {
132 this.stopRequested
= true;
136 * Shut down this Leases instance. All pending leases will be destroyed,
137 * without any cancellation calls.
139 public void close() {
140 this.stopRequested
= true;
142 LOG
.info("Closed leases");
146 * Create a lease and insert it to the map of leases.
148 * @param leaseName name of the lease
149 * @param leaseTimeoutPeriod length of the lease in milliseconds
150 * @param listener listener that will process lease expirations
151 * @return The lease created.
153 public Lease
createLease(String leaseName
, int leaseTimeoutPeriod
, final LeaseListener listener
)
154 throws LeaseStillHeldException
{
155 Lease lease
= new Lease(leaseName
, leaseTimeoutPeriod
, listener
);
161 * Inserts lease. Resets expiration before insertion.
163 public void addLease(final Lease lease
) throws LeaseStillHeldException
{
164 if (this.stopRequested
) {
167 if (leases
.containsKey(lease
.getLeaseName())) {
168 throw new LeaseStillHeldException(lease
.getLeaseName());
170 lease
.resetExpirationTime();
171 leases
.put(lease
.getLeaseName(), lease
);
177 * @param leaseName name of the lease
179 public void renewLease(final String leaseName
) throws LeaseException
{
180 if (this.stopRequested
) {
183 Lease lease
= leases
.get(leaseName
);
185 if (lease
== null ) {
186 throw new LeaseException("lease '" + leaseName
+
187 "' does not exist or has already expired");
189 lease
.resetExpirationTime();
193 * Client explicitly cancels a lease.
195 * @param leaseName name of lease
197 public void cancelLease(final String leaseName
) throws LeaseException
{
198 removeLease(leaseName
);
202 * Remove named lease. Lease is removed from the map of leases.
204 * @param leaseName name of lease
205 * @return Removed lease
207 Lease
removeLease(final String leaseName
) throws LeaseException
{
208 Lease lease
= leases
.remove(leaseName
);
210 throw new LeaseException("lease '" + leaseName
+ "' does not exist");
216 * Thrown if we are asked to create a lease but lease on passed name already
219 @SuppressWarnings("serial")
220 public static class LeaseStillHeldException
extends IOException
{
221 private final String leaseName
;
223 public LeaseStillHeldException(final String name
) {
224 this.leaseName
= name
;
227 /** @return name of lease */
228 public String
getName() {
229 return this.leaseName
;
233 /** This class tracks a single Lease. */
234 static class Lease
implements Delayed
{
235 private final String leaseName
;
236 private final LeaseListener listener
;
237 private int leaseTimeoutPeriod
;
238 private long expirationTime
;
240 Lease(final String leaseName
, int leaseTimeoutPeriod
, LeaseListener listener
) {
241 this.leaseName
= leaseName
;
242 this.listener
= listener
;
243 this.leaseTimeoutPeriod
= leaseTimeoutPeriod
;
244 this.expirationTime
= 0;
247 /** @return the lease name */
248 public String
getLeaseName() {
252 /** @return listener */
253 public LeaseListener
getListener() {
254 return this.listener
;
258 public boolean equals(Object obj
) {
265 if (getClass() != obj
.getClass()) {
268 return this.hashCode() == obj
.hashCode();
272 public int hashCode() {
273 return this.leaseName
.hashCode();
277 public long getDelay(TimeUnit unit
) {
278 return unit
.convert(this.expirationTime
- EnvironmentEdgeManager
.currentTime(),
279 TimeUnit
.MILLISECONDS
);
283 public int compareTo(Delayed o
) {
284 long delta
= this.getDelay(TimeUnit
.MILLISECONDS
) -
285 o
.getDelay(TimeUnit
.MILLISECONDS
);
287 return this.equals(o
) ?
0 : (delta
> 0 ?
1 : -1);
291 * Resets the expiration time of the lease.
293 public void resetExpirationTime() {
294 this.expirationTime
= EnvironmentEdgeManager
.currentTime() + this.leaseTimeoutPeriod
;