// Commit: HBASE-26412 Handle sink failure in RegionReplicationSink (#3815)
// Path: hbase.git/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ServerNonceManager.java
// Blob: e0ea974b194523bdade77a73a545600e33d336e3
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
19 package org.apache.hadoop.hbase.regionserver;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
36 /**
37 * Implementation of nonce manager that stores nonces in a hash map and cleans them up after
38 * some time; if nonce group/client ID is supplied, nonces are stored by client ID.
40 @InterfaceAudience.Private
41 public class ServerNonceManager {
42 public static final String HASH_NONCE_GRACE_PERIOD_KEY = "hbase.server.hashNonce.gracePeriod";
43 private static final Logger LOG = LoggerFactory.getLogger(ServerNonceManager.class);
45 /** The time to wait in an extremely unlikely case of a conflict with a running op.
46 * Only here so that tests could override it and not wait. */
47 private int conflictWaitIterationMs = 30000;
49 private static final SimpleDateFormat tsFormat = new SimpleDateFormat("HH:mm:ss.SSS");
51 // This object is used to synchronize on in case of collisions, and for cleanup.
52 private static class OperationContext {
53 static final int DONT_PROCEED = 0;
54 static final int PROCEED = 1;
55 static final int WAIT = 2;
57 // 0..1 - state, 2..2 - whether anyone is waiting, 3.. - ts of last activity
58 private long data = 0;
59 private static final long STATE_BITS = 3;
60 private static final long WAITING_BIT = 4;
61 private static final long ALL_FLAG_BITS = WAITING_BIT | STATE_BITS;
63 private volatile long mvcc;
65 @Override
66 public String toString() {
67 return "[state " + getState() + ", hasWait " + hasWait() + ", activity "
68 + tsFormat.format(new Date(getActivityTime())) + "]";
71 public OperationContext() {
72 setState(WAIT);
73 reportActivity();
76 public void setState(int state) {
77 this.data = (this.data & ~STATE_BITS) | state;
80 public int getState() {
81 return (int)(this.data & STATE_BITS);
84 public void setHasWait() {
85 this.data = this.data | WAITING_BIT;
88 public boolean hasWait() {
89 return (this.data & WAITING_BIT) == WAITING_BIT;
92 public void reportActivity() {
93 long now = EnvironmentEdgeManager.currentTime();
94 this.data = (this.data & ALL_FLAG_BITS) | (now << 3);
97 public boolean isExpired(long minRelevantTime) {
98 return getActivityTime() < (minRelevantTime & (~0L >>> 3));
101 public void setMvcc(long mvcc) {
102 this.mvcc = mvcc;
105 public long getMvcc() {
106 return this.mvcc;
109 private long getActivityTime() {
110 return this.data >>> 3;
115 * Nonces.
116 * Approximate overhead per nonce: 64 bytes from hashmap, 32 from two objects (k/v),
117 * NK: 16 bytes (2 longs), OC: 8 bytes (1 long) - so, 120 bytes.
118 * With 30min expiration time, 5k increments/appends per sec., we'd use approximately 1Gb,
119 * which is a realistic worst case. If it's much worse, we could use some sort of memory
120 * limit and cleanup.
122 private ConcurrentHashMap<NonceKey, OperationContext> nonces = new ConcurrentHashMap<>();
124 private int deleteNonceGracePeriod;
126 public ServerNonceManager(Configuration conf) {
127 // Default - 30 minutes.
128 deleteNonceGracePeriod = conf.getInt(HASH_NONCE_GRACE_PERIOD_KEY, 30 * 60 * 1000);
129 if (deleteNonceGracePeriod < 60 * 1000) {
130 LOG.warn("Nonce grace period " + deleteNonceGracePeriod
131 + " is less than a minute; might be too small to be useful");
135 public void setConflictWaitIterationMs(int conflictWaitIterationMs) {
136 this.conflictWaitIterationMs = conflictWaitIterationMs;
140 * Starts the operation if operation with such nonce has not already succeeded. If the
141 * operation is in progress, waits for it to end and checks whether it has succeeded.
142 * @param group Nonce group.
143 * @param nonce Nonce.
144 * @param stoppable Stoppable that terminates waiting (if any) when the server is stopped.
145 * @return true if the operation has not already succeeded and can proceed; false otherwise.
147 public boolean startOperation(long group, long nonce, Stoppable stoppable)
148 throws InterruptedException {
149 if (nonce == HConstants.NO_NONCE) return true;
150 NonceKey nk = new NonceKey(group, nonce);
151 OperationContext ctx = new OperationContext();
152 while (true) {
153 OperationContext oldResult = nonces.putIfAbsent(nk, ctx);
154 if (oldResult == null) return true;
156 // Collision with some operation - should be extremely rare.
157 synchronized (oldResult) {
158 int oldState = oldResult.getState();
159 LOG.debug("Conflict detected by nonce: " + nk + ", " + oldResult);
160 if (oldState != OperationContext.WAIT) {
161 return oldState == OperationContext.PROCEED; // operation ended
163 oldResult.setHasWait();
164 oldResult.wait(this.conflictWaitIterationMs); // operation is still active... wait and loop
165 if (stoppable.isStopped()) {
166 throw new InterruptedException("Server stopped");
173 * Ends the operation started by startOperation.
174 * @param group Nonce group.
175 * @param nonce Nonce.
176 * @param success Whether the operation has succeeded.
178 public void endOperation(long group, long nonce, boolean success) {
179 if (nonce == HConstants.NO_NONCE) return;
180 NonceKey nk = new NonceKey(group, nonce);
181 OperationContext newResult = nonces.get(nk);
182 assert newResult != null;
183 synchronized (newResult) {
184 assert newResult.getState() == OperationContext.WAIT;
185 // If we failed, other retries can proceed.
186 newResult.setState(success ? OperationContext.DONT_PROCEED : OperationContext.PROCEED);
187 if (success) {
188 newResult.reportActivity(); // Set time to use for cleanup.
189 } else {
190 OperationContext val = nonces.remove(nk);
191 assert val == newResult;
193 if (newResult.hasWait()) {
194 LOG.debug("Conflict with running op ended: " + nk + ", " + newResult);
195 newResult.notifyAll();
201 * Store the write point in OperationContext when the operation succeed.
202 * @param group Nonce group.
203 * @param nonce Nonce.
204 * @param mvcc Write point of the succeed operation.
206 public void addMvccToOperationContext(long group, long nonce, long mvcc) {
207 if (nonce == HConstants.NO_NONCE) {
208 return;
210 NonceKey nk = new NonceKey(group, nonce);
211 OperationContext result = nonces.get(nk);
212 assert result != null;
213 synchronized (result) {
214 result.setMvcc(mvcc);
219 * Return the write point of the previous succeed operation.
220 * @param group Nonce group.
221 * @param nonce Nonce.
222 * @return write point of the previous succeed operation.
224 public long getMvccFromOperationContext(long group, long nonce) {
225 if (nonce == HConstants.NO_NONCE) {
226 return Long.MAX_VALUE;
228 NonceKey nk = new NonceKey(group, nonce);
229 OperationContext result = nonces.get(nk);
230 return result == null ? Long.MAX_VALUE : result.getMvcc();
234 * Reports the operation from WAL during replay.
235 * @param group Nonce group.
236 * @param nonce Nonce.
237 * @param writeTime Entry write time, used to ignore entries that are too old.
239 public void reportOperationFromWal(long group, long nonce, long writeTime) {
240 if (nonce == HConstants.NO_NONCE) return;
241 // Give the write time some slack in case the clocks are not synchronized.
242 long now = EnvironmentEdgeManager.currentTime();
243 if (now > writeTime + (deleteNonceGracePeriod * 1.5)) return;
244 OperationContext newResult = new OperationContext();
245 newResult.setState(OperationContext.DONT_PROCEED);
246 NonceKey nk = new NonceKey(group, nonce);
247 OperationContext oldResult = nonces.putIfAbsent(nk, newResult);
248 if (oldResult != null) {
249 // Some schemes can have collisions (for example, expiring hashes), so just log it.
250 // We have no idea about the semantics here, so this is the least of many evils.
251 LOG.warn("Nonce collision during WAL recovery: " + nk
252 + ", " + oldResult + " with " + newResult);
257 * Creates a scheduled chore that is used to clean up old nonces.
258 * @param stoppable Stoppable for the chore.
259 * @return ScheduledChore; the scheduled chore is not started.
261 public ScheduledChore createCleanupScheduledChore(Stoppable stoppable) {
262 // By default, it will run every 6 minutes (30 / 5).
263 return new ScheduledChore("nonceCleaner", stoppable, deleteNonceGracePeriod / 5) {
264 @Override
265 protected void chore() {
266 cleanUpOldNonces();
271 private void cleanUpOldNonces() {
272 long cutoff = EnvironmentEdgeManager.currentTime() - deleteNonceGracePeriod;
273 for (Map.Entry<NonceKey, OperationContext> entry : nonces.entrySet()) {
274 OperationContext oc = entry.getValue();
275 if (!oc.isExpired(cutoff)) continue;
276 synchronized (oc) {
277 if (oc.getState() == OperationContext.WAIT || !oc.isExpired(cutoff)) continue;
278 nonces.remove(entry.getKey());