2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.util
;
20 import java
.io
.IOException
;
21 import java
.io
.InterruptedIOException
;
22 import java
.util
.concurrent
.ConcurrentHashMap
;
23 import java
.util
.concurrent
.ConcurrentMap
;
25 import org
.apache
.yetus
.audience
.InterfaceAudience
;
26 import org
.slf4j
.Logger
;
27 import org
.slf4j
.LoggerFactory
;
29 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.annotations
.VisibleForTesting
;
30 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.base
.Preconditions
;
33 * Allows multiple concurrent clients to lock on a numeric id with a minimal
34 * memory overhead. The intended usage is as follows:
37 * IdLock.Entry lockEntry = idLock.getLockEntry(id);
41 * idLock.releaseLockEntry(lockEntry);
44 @InterfaceAudience.Private
47 private static final Logger LOG
= LoggerFactory
.getLogger(IdLock
.class);
49 /** An entry returned to the client as a lock object */
50 public static final class Entry
{
51 private final long id
;
52 private int numWaiters
;
53 private boolean locked
= true;
54 private Thread holder
;
56 private Entry(long id
, Thread holder
) {
62 public String
toString() {
63 return "id=" + id
+ ", numWaiter=" + numWaiters
+ ", isLocked="
64 + locked
+ ", holder=" + holder
;
68 private ConcurrentMap
<Long
, Entry
> map
= new ConcurrentHashMap
<>();
71 * Blocks until the lock corresponding to the given id is acquired.
73 * @param id an arbitrary number to lock on
74 * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release
76 * @throws IOException if interrupted
78 public Entry
getLockEntry(long id
) throws IOException
{
79 Thread currentThread
= Thread
.currentThread();
80 Entry entry
= new Entry(id
, currentThread
);
82 while ((existing
= map
.putIfAbsent(entry
.id
, entry
)) != null) {
83 synchronized (existing
) {
84 if (existing
.locked
) {
85 ++existing
.numWaiters
; // Add ourselves to waiters.
86 while (existing
.locked
) {
89 } catch (InterruptedException e
) {
90 --existing
.numWaiters
; // Remove ourselves from waiters.
92 // There is a rare case where this waiter is interrupted while the lock owner thread
93 // calls releaseLockEntry at the same time. Since the owner thread sees that there is
94 // still one waiter, it won't remove the entry from the map. If the interrupted
95 // thread is the last one waiting on the lock, then because an exception is thrown,
96 // the 'existing' entry would stay in the map forever. Later threads that try to
97 // get this lock would get stuck in an infinite loop because
98 // (existing = map.putIfAbsent(entry.id, entry)) != null and existing.locked == false.
99 if (!existing
.locked
&& existing
.numWaiters
== 0) {
100 map
.remove(existing
.id
);
102 throw new InterruptedIOException(
103 "Interrupted waiting to acquire sparse lock");
107 --existing
.numWaiters
; // Remove ourselves from waiters.
108 existing
.locked
= true;
109 existing
.holder
= currentThread
;
112 // If the entry is not locked, it might already be deleted from the
113 // map, so we cannot return it. We need to get our entry into the map
114 // or get someone else's locked entry.
121 * Tries to acquire the lock corresponding to the given id, waiting up to the given time.
123 * @param id an arbitrary number to lock on
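124 * @param time maximum time to wait in ms, must be non-negative (NOTE(review): 0 is passed unchanged to Object.wait(long), which then waits with no timeout; confirm this is intended)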
125 * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release
127 * @throws IOException if interrupted
129 public Entry
tryLockEntry(long id
, long time
) throws IOException
{
130 Preconditions
.checkArgument(time
>= 0);
131 Thread currentThread
= Thread
.currentThread();
132 Entry entry
= new Entry(id
, currentThread
);
134 long waitUtilTS
= System
.currentTimeMillis() + time
;
135 long remaining
= time
;
136 while ((existing
= map
.putIfAbsent(entry
.id
, entry
)) != null) {
137 synchronized (existing
) {
138 if (existing
.locked
) {
139 ++existing
.numWaiters
; // Add ourselves to waiters.
141 while (existing
.locked
) {
142 existing
.wait(remaining
);
143 if (existing
.locked
) {
144 long currentTS
= System
.currentTimeMillis();
145 if (currentTS
>= waitUtilTS
) {
149 // Our wait was woken up but the lock is still taken; this can happen
150 // due to the JDK Object wait/notify mechanism.
151 // Calculate the new remaining time to wait.
152 remaining
= waitUtilTS
- currentTS
;
157 } catch (InterruptedException e
) {
159 // Please refer to the comments in getLockEntry(). The difference here is that numWaiters
160 // is decremented in the finally block, so this waiter is still counted when we check for 1.
161 if (!existing
.locked
&& existing
.numWaiters
== 1) {
162 map
.remove(existing
.id
);
164 throw new InterruptedIOException(
165 "Interrupted waiting to acquire sparse lock");
167 --existing
.numWaiters
; // Remove ourselves from waiters.
169 existing
.locked
= true;
170 existing
.holder
= currentThread
;
173 // If the entry is not locked, it might already be deleted from the
174 // map, so we cannot return it. We need to get our entry into the map
175 // or get someone else's locked entry.
182 * Must be called in a finally block to decrease the internal counter and remove the monitor
183 * object for the given id if the caller is the last client.
184 * @param entry the return value of {@link #getLockEntry(long)} or {@link #tryLockEntry(long, long)}
186 public void releaseLockEntry(Entry entry
) {
187 Thread currentThread
= Thread
.currentThread();
188 synchronized (entry
) {
189 if (entry
.holder
!= currentThread
) {
190 LOG
.warn("{} is trying to release lock entry {}, but it is not the holder.", currentThread
,
193 entry
.locked
= false;
194 if (entry
.numWaiters
> 0) {
197 map
.remove(entry
.id
);
203 * Test whether the given id is already locked by the current thread.
205 public boolean isHeldByCurrentThread(long id
) {
206 Thread currentThread
= Thread
.currentThread();
207 Entry entry
= map
.get(id
);
211 synchronized (entry
) {
212 return currentThread
.equals(entry
.holder
);
217 void assertMapEmpty() {
218 assert map
.isEmpty();
222 public void waitForWaiters(long id
, int numWaiters
) throws InterruptedException
{
223 for (Entry entry
;;) {
226 synchronized (entry
) {
227 if (entry
.numWaiters
>= numWaiters
) {