HBASE-24163 MOB compactor implementations should use format specifiers when calling...
[hbase.git] / hbase-common / src / main / java / org / apache / hadoop / hbase / util / IdLock.java
blob9e5692feebb69741794b7c11b7a8cb06757d14af
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.util;
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentMap;
25 import org.apache.yetus.audience.InterfaceAudience;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
29 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
30 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
32 /**
33 * Allows multiple concurrent clients to lock on a numeric id with a minimal
34 * memory overhead. The intended usage is as follows:
36 * <pre>
37 * IdLock.Entry lockEntry = idLock.getLockEntry(id);
38 * try {
39 * // User code.
40 * } finally {
41 * idLock.releaseLockEntry(lockEntry);
42 * }</pre>
44 @InterfaceAudience.Private
45 public class IdLock {
47 private static final Logger LOG = LoggerFactory.getLogger(IdLock.class);
49 /** An entry returned to the client as a lock object */
50 public static final class Entry {
51 private final long id;
52 private int numWaiters;
53 private boolean locked = true;
54 private Thread holder;
56 private Entry(long id, Thread holder) {
57 this.id = id;
58 this.holder = holder;
61 @Override
62 public String toString() {
63 return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked="
64 + locked + ", holder=" + holder;
68 private ConcurrentMap<Long, Entry> map = new ConcurrentHashMap<>();
70 /**
71 * Blocks until the lock corresponding to the given id is acquired.
73 * @param id an arbitrary number to lock on
74 * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release
75 * the lock
76 * @throws IOException if interrupted
78 public Entry getLockEntry(long id) throws IOException {
79 Thread currentThread = Thread.currentThread();
80 Entry entry = new Entry(id, currentThread);
81 Entry existing;
82 while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
83 synchronized (existing) {
84 if (existing.locked) {
85 ++existing.numWaiters; // Add ourselves to waiters.
86 while (existing.locked) {
87 try {
88 existing.wait();
89 } catch (InterruptedException e) {
90 --existing.numWaiters; // Remove ourselves from waiters.
91 // HBASE-21292
92 // There is a rare case that interrupting and the lock owner thread call
93 // releaseLockEntry at the same time. Since the owner thread found there
94 // still one waiting, it won't remove the entry from the map. If the interrupted
95 // thread is the last one waiting on the lock, and since an exception is thrown,
96 // the 'existing' entry will stay in the map forever. Later threads which try to
97 // get this lock will stuck in a infinite loop because
98 // existing = map.putIfAbsent(entry.id, entry)) != null and existing.locked=false.
99 if (!existing.locked && existing.numWaiters == 0) {
100 map.remove(existing.id);
102 throw new InterruptedIOException(
103 "Interrupted waiting to acquire sparse lock");
107 --existing.numWaiters; // Remove ourselves from waiters.
108 existing.locked = true;
109 existing.holder = currentThread;
110 return existing;
112 // If the entry is not locked, it might already be deleted from the
113 // map, so we cannot return it. We need to get our entry into the map
114 // or get someone else's locked entry.
117 return entry;
121 * Blocks until the lock corresponding to the given id is acquired.
123 * @param id an arbitrary number to lock on
124 * @param time time to wait in ms
125 * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release
126 * the lock
127 * @throws IOException if interrupted
129 public Entry tryLockEntry(long id, long time) throws IOException {
130 Preconditions.checkArgument(time >= 0);
131 Thread currentThread = Thread.currentThread();
132 Entry entry = new Entry(id, currentThread);
133 Entry existing;
134 long waitUtilTS = System.currentTimeMillis() + time;
135 long remaining = time;
136 while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
137 synchronized (existing) {
138 if (existing.locked) {
139 ++existing.numWaiters; // Add ourselves to waiters.
140 try {
141 while (existing.locked) {
142 existing.wait(remaining);
143 if (existing.locked) {
144 long currentTS = System.currentTimeMillis();
145 if (currentTS >= waitUtilTS) {
146 // time is up
147 return null;
148 } else {
149 // our wait is waken, but the lock is still taken, this can happen
150 // due to JDK Object's wait/notify mechanism.
151 // Calculate the new remaining time to wait
152 remaining = waitUtilTS - currentTS;
157 } catch (InterruptedException e) {
158 // HBASE-21292
159 // Please refer to the comments in getLockEntry()
160 // the difference here is that we decrease numWaiters in finally block
161 if (!existing.locked && existing.numWaiters == 1) {
162 map.remove(existing.id);
164 throw new InterruptedIOException(
165 "Interrupted waiting to acquire sparse lock");
166 } finally {
167 --existing.numWaiters; // Remove ourselves from waiters.
169 existing.locked = true;
170 existing.holder = currentThread;
171 return existing;
173 // If the entry is not locked, it might already be deleted from the
174 // map, so we cannot return it. We need to get our entry into the map
175 // or get someone else's locked entry.
178 return entry;
182 * Must be called in a finally block to decrease the internal counter and remove the monitor
183 * object for the given id if the caller is the last client.
184 * @param entry the return value of {@link #getLockEntry(long)}
186 public void releaseLockEntry(Entry entry) {
187 Thread currentThread = Thread.currentThread();
188 synchronized (entry) {
189 if (entry.holder != currentThread) {
190 LOG.warn("{} is trying to release lock entry {}, but it is not the holder.", currentThread,
191 entry);
193 entry.locked = false;
194 if (entry.numWaiters > 0) {
195 entry.notify();
196 } else {
197 map.remove(entry.id);
203 * Test whether the given id is already locked by the current thread.
205 public boolean isHeldByCurrentThread(long id) {
206 Thread currentThread = Thread.currentThread();
207 Entry entry = map.get(id);
208 if (entry == null) {
209 return false;
211 synchronized (entry) {
212 return currentThread.equals(entry.holder);
216 @VisibleForTesting
217 void assertMapEmpty() {
218 assert map.isEmpty();
221 @VisibleForTesting
222 public void waitForWaiters(long id, int numWaiters) throws InterruptedException {
223 for (Entry entry;;) {
224 entry = map.get(id);
225 if (entry != null) {
226 synchronized (entry) {
227 if (entry.numWaiters >= numWaiters) {
228 return;
232 Thread.sleep(100);