/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
19 package org
.apache
.hadoop
.hbase
.regionserver
;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
39 * Implementation of nonce manager that stores nonces in a hash map and cleans them up after
40 * some time; if nonce group/client ID is supplied, nonces are stored by client ID.
42 @InterfaceAudience.Private
43 public class ServerNonceManager
{
44 public static final String HASH_NONCE_GRACE_PERIOD_KEY
= "hbase.server.hashNonce.gracePeriod";
45 private static final Logger LOG
= LoggerFactory
.getLogger(ServerNonceManager
.class);
47 /** The time to wait in an extremely unlikely case of a conflict with a running op.
48 * Only here so that tests could override it and not wait. */
49 private int conflictWaitIterationMs
= 30000;
51 private static final SimpleDateFormat tsFormat
= new SimpleDateFormat("HH:mm:ss.SSS");
53 // This object is used to synchronize on in case of collisions, and for cleanup.
54 private static class OperationContext
{
55 static final int DONT_PROCEED
= 0;
56 static final int PROCEED
= 1;
57 static final int WAIT
= 2;
59 // 0..1 - state, 2..2 - whether anyone is waiting, 3.. - ts of last activity
60 private long data
= 0;
61 private static final long STATE_BITS
= 3;
62 private static final long WAITING_BIT
= 4;
63 private static final long ALL_FLAG_BITS
= WAITING_BIT
| STATE_BITS
;
65 private volatile long mvcc
;
68 public String
toString() {
69 return "[state " + getState() + ", hasWait " + hasWait() + ", activity "
70 + tsFormat
.format(new Date(getActivityTime())) + "]";
73 public OperationContext() {
78 public void setState(int state
) {
79 this.data
= (this.data
& ~STATE_BITS
) | state
;
82 public int getState() {
83 return (int)(this.data
& STATE_BITS
);
86 public void setHasWait() {
87 this.data
= this.data
| WAITING_BIT
;
90 public boolean hasWait() {
91 return (this.data
& WAITING_BIT
) == WAITING_BIT
;
94 public void reportActivity() {
95 long now
= EnvironmentEdgeManager
.currentTime();
96 this.data
= (this.data
& ALL_FLAG_BITS
) | (now
<< 3);
99 public boolean isExpired(long minRelevantTime
) {
100 return getActivityTime() < (minRelevantTime
& (~
0L >>> 3));
103 public void setMvcc(long mvcc
) {
107 public long getMvcc() {
111 private long getActivityTime() {
112 return this.data
>>> 3;
118 * Approximate overhead per nonce: 64 bytes from hashmap, 32 from two objects (k/v),
119 * NK: 16 bytes (2 longs), OC: 8 bytes (1 long) - so, 120 bytes.
120 * With 30min expiration time, 5k increments/appends per sec., we'd use approximately 1Gb,
121 * which is a realistic worst case. If it's much worse, we could use some sort of memory
124 private ConcurrentHashMap
<NonceKey
, OperationContext
> nonces
= new ConcurrentHashMap
<>();
126 private int deleteNonceGracePeriod
;
128 public ServerNonceManager(Configuration conf
) {
129 // Default - 30 minutes.
130 deleteNonceGracePeriod
= conf
.getInt(HASH_NONCE_GRACE_PERIOD_KEY
, 30 * 60 * 1000);
131 if (deleteNonceGracePeriod
< 60 * 1000) {
132 LOG
.warn("Nonce grace period " + deleteNonceGracePeriod
133 + " is less than a minute; might be too small to be useful");
138 public void setConflictWaitIterationMs(int conflictWaitIterationMs
) {
139 this.conflictWaitIterationMs
= conflictWaitIterationMs
;
143 * Starts the operation if operation with such nonce has not already succeeded. If the
144 * operation is in progress, waits for it to end and checks whether it has succeeded.
145 * @param group Nonce group.
146 * @param nonce Nonce.
147 * @param stoppable Stoppable that terminates waiting (if any) when the server is stopped.
148 * @return true if the operation has not already succeeded and can proceed; false otherwise.
150 public boolean startOperation(long group
, long nonce
, Stoppable stoppable
)
151 throws InterruptedException
{
152 if (nonce
== HConstants
.NO_NONCE
) return true;
153 NonceKey nk
= new NonceKey(group
, nonce
);
154 OperationContext ctx
= new OperationContext();
156 OperationContext oldResult
= nonces
.putIfAbsent(nk
, ctx
);
157 if (oldResult
== null) return true;
159 // Collision with some operation - should be extremely rare.
160 synchronized (oldResult
) {
161 int oldState
= oldResult
.getState();
162 LOG
.debug("Conflict detected by nonce: " + nk
+ ", " + oldResult
);
163 if (oldState
!= OperationContext
.WAIT
) {
164 return oldState
== OperationContext
.PROCEED
; // operation ended
166 oldResult
.setHasWait();
167 oldResult
.wait(this.conflictWaitIterationMs
); // operation is still active... wait and loop
168 if (stoppable
.isStopped()) {
169 throw new InterruptedException("Server stopped");
176 * Ends the operation started by startOperation.
177 * @param group Nonce group.
178 * @param nonce Nonce.
179 * @param success Whether the operation has succeeded.
181 public void endOperation(long group
, long nonce
, boolean success
) {
182 if (nonce
== HConstants
.NO_NONCE
) return;
183 NonceKey nk
= new NonceKey(group
, nonce
);
184 OperationContext newResult
= nonces
.get(nk
);
185 assert newResult
!= null;
186 synchronized (newResult
) {
187 assert newResult
.getState() == OperationContext
.WAIT
;
188 // If we failed, other retries can proceed.
189 newResult
.setState(success ? OperationContext
.DONT_PROCEED
: OperationContext
.PROCEED
);
191 newResult
.reportActivity(); // Set time to use for cleanup.
193 OperationContext val
= nonces
.remove(nk
);
194 assert val
== newResult
;
196 if (newResult
.hasWait()) {
197 LOG
.debug("Conflict with running op ended: " + nk
+ ", " + newResult
);
198 newResult
.notifyAll();
204 * Store the write point in OperationContext when the operation succeed.
205 * @param group Nonce group.
206 * @param nonce Nonce.
207 * @param mvcc Write point of the succeed operation.
209 public void addMvccToOperationContext(long group
, long nonce
, long mvcc
) {
210 if (nonce
== HConstants
.NO_NONCE
) {
213 NonceKey nk
= new NonceKey(group
, nonce
);
214 OperationContext result
= nonces
.get(nk
);
215 assert result
!= null;
216 synchronized (result
) {
217 result
.setMvcc(mvcc
);
222 * Return the write point of the previous succeed operation.
223 * @param group Nonce group.
224 * @param nonce Nonce.
225 * @return write point of the previous succeed operation.
227 public long getMvccFromOperationContext(long group
, long nonce
) {
228 if (nonce
== HConstants
.NO_NONCE
) {
229 return Long
.MAX_VALUE
;
231 NonceKey nk
= new NonceKey(group
, nonce
);
232 OperationContext result
= nonces
.get(nk
);
233 return result
== null ? Long
.MAX_VALUE
: result
.getMvcc();
237 * Reports the operation from WAL during replay.
238 * @param group Nonce group.
239 * @param nonce Nonce.
240 * @param writeTime Entry write time, used to ignore entries that are too old.
242 public void reportOperationFromWal(long group
, long nonce
, long writeTime
) {
243 if (nonce
== HConstants
.NO_NONCE
) return;
244 // Give the write time some slack in case the clocks are not synchronized.
245 long now
= EnvironmentEdgeManager
.currentTime();
246 if (now
> writeTime
+ (deleteNonceGracePeriod
* 1.5)) return;
247 OperationContext newResult
= new OperationContext();
248 newResult
.setState(OperationContext
.DONT_PROCEED
);
249 NonceKey nk
= new NonceKey(group
, nonce
);
250 OperationContext oldResult
= nonces
.putIfAbsent(nk
, newResult
);
251 if (oldResult
!= null) {
252 // Some schemes can have collisions (for example, expiring hashes), so just log it.
253 // We have no idea about the semantics here, so this is the least of many evils.
254 LOG
.warn("Nonce collision during WAL recovery: " + nk
255 + ", " + oldResult
+ " with " + newResult
);
260 * Creates a scheduled chore that is used to clean up old nonces.
261 * @param stoppable Stoppable for the chore.
262 * @return ScheduledChore; the scheduled chore is not started.
264 public ScheduledChore
createCleanupScheduledChore(Stoppable stoppable
) {
265 // By default, it will run every 6 minutes (30 / 5).
266 return new ScheduledChore("nonceCleaner", stoppable
, deleteNonceGracePeriod
/ 5) {
268 protected void chore() {
274 private void cleanUpOldNonces() {
275 long cutoff
= EnvironmentEdgeManager
.currentTime() - deleteNonceGracePeriod
;
276 for (Map
.Entry
<NonceKey
, OperationContext
> entry
: nonces
.entrySet()) {
277 OperationContext oc
= entry
.getValue();
278 if (!oc
.isExpired(cutoff
)) continue;
280 if (oc
.getState() == OperationContext
.WAIT
|| !oc
.isExpired(cutoff
)) continue;
281 nonces
.remove(entry
.getKey());