Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/viro/vfs-2.6
[linux/fpc-iii.git] / fs / gfs2 / locking / dlm / lock.c
blobcf7ea8abec876c8a8cf02f8324160b4fd8ca3b6d
/*
 * Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
 * Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
 *
 * This copyrighted material is made available to anyone wishing to use,
 * modify, copy, or redistribute it subject to the terms and conditions
 * of the GNU General Public License version 2.
 */
10 #include "lock_dlm.h"
12 static char junk_lvb[GDLM_LVB_SIZE];
14 static void queue_complete(struct gdlm_lock *lp)
16 struct gdlm_ls *ls = lp->ls;
18 clear_bit(LFL_ACTIVE, &lp->flags);
20 spin_lock(&ls->async_lock);
21 list_add_tail(&lp->clist, &ls->complete);
22 spin_unlock(&ls->async_lock);
23 wake_up(&ls->thread_wait);
/*
 * Completion callback handed to dlm_lock() by gdlm_do_lock().  astarg is the
 * gdlm_lock that was passed as the callback argument; it is simply queued
 * for completion processing.
 */
static inline void gdlm_ast(void *astarg)
{
	struct gdlm_lock *lp = astarg;

	queue_complete(lp);
}
31 static inline void gdlm_bast(void *astarg, int mode)
33 struct gdlm_lock *lp = astarg;
34 struct gdlm_ls *ls = lp->ls;
36 if (!mode) {
37 printk(KERN_INFO "lock_dlm: bast mode zero %x,%llx\n",
38 lp->lockname.ln_type,
39 (unsigned long long)lp->lockname.ln_number);
40 return;
43 spin_lock(&ls->async_lock);
44 if (!lp->bast_mode) {
45 list_add_tail(&lp->blist, &ls->blocking);
46 lp->bast_mode = mode;
47 } else if (lp->bast_mode < mode)
48 lp->bast_mode = mode;
49 spin_unlock(&ls->async_lock);
50 wake_up(&ls->thread_wait);
53 void gdlm_queue_delayed(struct gdlm_lock *lp)
55 struct gdlm_ls *ls = lp->ls;
57 spin_lock(&ls->async_lock);
58 list_add_tail(&lp->delay_list, &ls->delayed);
59 spin_unlock(&ls->async_lock);
62 /* convert gfs lock-state to dlm lock-mode */
64 static s16 make_mode(s16 lmstate)
66 switch (lmstate) {
67 case LM_ST_UNLOCKED:
68 return DLM_LOCK_NL;
69 case LM_ST_EXCLUSIVE:
70 return DLM_LOCK_EX;
71 case LM_ST_DEFERRED:
72 return DLM_LOCK_CW;
73 case LM_ST_SHARED:
74 return DLM_LOCK_PR;
76 gdlm_assert(0, "unknown LM state %d", lmstate);
77 return -1;
80 /* convert dlm lock-mode to gfs lock-state */
82 s16 gdlm_make_lmstate(s16 dlmmode)
84 switch (dlmmode) {
85 case DLM_LOCK_IV:
86 case DLM_LOCK_NL:
87 return LM_ST_UNLOCKED;
88 case DLM_LOCK_EX:
89 return LM_ST_EXCLUSIVE;
90 case DLM_LOCK_CW:
91 return LM_ST_DEFERRED;
92 case DLM_LOCK_PR:
93 return LM_ST_SHARED;
95 gdlm_assert(0, "unknown DLM mode %d", dlmmode);
96 return -1;
99 /* verify agreement with GFS on the current lock state, NB: DLM_LOCK_NL and
100 DLM_LOCK_IV are both considered LM_ST_UNLOCKED by GFS. */
102 static void check_cur_state(struct gdlm_lock *lp, unsigned int cur_state)
104 s16 cur = make_mode(cur_state);
105 if (lp->cur != DLM_LOCK_IV)
106 gdlm_assert(lp->cur == cur, "%d, %d", lp->cur, cur);
109 static inline unsigned int make_flags(struct gdlm_lock *lp,
110 unsigned int gfs_flags,
111 s16 cur, s16 req)
113 unsigned int lkf = 0;
115 if (gfs_flags & LM_FLAG_TRY)
116 lkf |= DLM_LKF_NOQUEUE;
118 if (gfs_flags & LM_FLAG_TRY_1CB) {
119 lkf |= DLM_LKF_NOQUEUE;
120 lkf |= DLM_LKF_NOQUEUEBAST;
123 if (gfs_flags & LM_FLAG_PRIORITY) {
124 lkf |= DLM_LKF_NOORDER;
125 lkf |= DLM_LKF_HEADQUE;
128 if (gfs_flags & LM_FLAG_ANY) {
129 if (req == DLM_LOCK_PR)
130 lkf |= DLM_LKF_ALTCW;
131 else if (req == DLM_LOCK_CW)
132 lkf |= DLM_LKF_ALTPR;
135 if (lp->lksb.sb_lkid != 0) {
136 lkf |= DLM_LKF_CONVERT;
138 /* Conversion deadlock avoidance by DLM */
140 if (!(lp->ls->fsflags & LM_MFLAG_CONV_NODROP) &&
141 !test_bit(LFL_FORCE_PROMOTE, &lp->flags) &&
142 !(lkf & DLM_LKF_NOQUEUE) &&
143 cur > DLM_LOCK_NL && req > DLM_LOCK_NL && cur != req)
144 lkf |= DLM_LKF_CONVDEADLK;
147 if (lp->lvb)
148 lkf |= DLM_LKF_VALBLK;
150 return lkf;
153 /* make_strname - convert GFS lock numbers to a string */
155 static inline void make_strname(const struct lm_lockname *lockname,
156 struct gdlm_strname *str)
158 sprintf(str->name, "%8x%16llx", lockname->ln_type,
159 (unsigned long long)lockname->ln_number);
160 str->namelen = GDLM_STRNAME_BYTES;
163 static int gdlm_create_lp(struct gdlm_ls *ls, struct lm_lockname *name,
164 struct gdlm_lock **lpp)
166 struct gdlm_lock *lp;
168 lp = kzalloc(sizeof(struct gdlm_lock), GFP_NOFS);
169 if (!lp)
170 return -ENOMEM;
172 lp->lockname = *name;
173 make_strname(name, &lp->strname);
174 lp->ls = ls;
175 lp->cur = DLM_LOCK_IV;
176 lp->lvb = NULL;
177 lp->hold_null = NULL;
178 INIT_LIST_HEAD(&lp->clist);
179 INIT_LIST_HEAD(&lp->blist);
180 INIT_LIST_HEAD(&lp->delay_list);
182 spin_lock(&ls->async_lock);
183 list_add(&lp->all_list, &ls->all_locks);
184 ls->all_locks_count++;
185 spin_unlock(&ls->async_lock);
187 *lpp = lp;
188 return 0;
191 void gdlm_delete_lp(struct gdlm_lock *lp)
193 struct gdlm_ls *ls = lp->ls;
195 spin_lock(&ls->async_lock);
196 if (!list_empty(&lp->clist))
197 list_del_init(&lp->clist);
198 if (!list_empty(&lp->blist))
199 list_del_init(&lp->blist);
200 if (!list_empty(&lp->delay_list))
201 list_del_init(&lp->delay_list);
202 gdlm_assert(!list_empty(&lp->all_list), "%x,%llx", lp->lockname.ln_type,
203 (unsigned long long)lp->lockname.ln_number);
204 list_del_init(&lp->all_list);
205 ls->all_locks_count--;
206 spin_unlock(&ls->async_lock);
208 kfree(lp);
/*
 * Create a lock object for the given lock name.
 *
 * lockspace: the struct gdlm_ls the lock belongs to
 * name:      GFS lock name
 * lockp:     receives the new lock on success, NULL on failure
 *
 * Returns 0 on success or the error from gdlm_create_lp().
 */
int gdlm_get_lock(void *lockspace, struct lm_lockname *name,
		  void **lockp)
{
	/*
	 * gdlm_create_lp() does not write its output argument when it fails,
	 * so start from NULL rather than handing the caller an indeterminate
	 * pointer.
	 */
	struct gdlm_lock *lp = NULL;
	int error;

	error = gdlm_create_lp(lockspace, name, &lp);

	*lockp = lp;
	return error;
}
/* Release a lock object obtained from gdlm_get_lock(). */
void gdlm_put_lock(void *lock)
{
	struct gdlm_lock *lp = lock;

	gdlm_delete_lp(lp);
}
228 unsigned int gdlm_do_lock(struct gdlm_lock *lp)
230 struct gdlm_ls *ls = lp->ls;
231 int error, bast = 1;
234 * When recovery is in progress, delay lock requests for submission
235 * once recovery is done. Requests for recovery (NOEXP) and unlocks
236 * can pass.
239 if (test_bit(DFL_BLOCK_LOCKS, &ls->flags) &&
240 !test_bit(LFL_NOBLOCK, &lp->flags) && lp->req != DLM_LOCK_NL) {
241 gdlm_queue_delayed(lp);
242 return LM_OUT_ASYNC;
246 * Submit the actual lock request.
249 if (test_bit(LFL_NOBAST, &lp->flags))
250 bast = 0;
252 set_bit(LFL_ACTIVE, &lp->flags);
254 log_debug("lk %x,%llx id %x %d,%d %x", lp->lockname.ln_type,
255 (unsigned long long)lp->lockname.ln_number, lp->lksb.sb_lkid,
256 lp->cur, lp->req, lp->lkf);
258 error = dlm_lock(ls->dlm_lockspace, lp->req, &lp->lksb, lp->lkf,
259 lp->strname.name, lp->strname.namelen, 0, gdlm_ast,
260 lp, bast ? gdlm_bast : NULL);
262 if ((error == -EAGAIN) && (lp->lkf & DLM_LKF_NOQUEUE)) {
263 lp->lksb.sb_status = -EAGAIN;
264 queue_complete(lp);
265 error = 0;
268 if (error) {
269 log_error("%s: gdlm_lock %x,%llx err=%d cur=%d req=%d lkf=%x "
270 "flags=%lx", ls->fsname, lp->lockname.ln_type,
271 (unsigned long long)lp->lockname.ln_number, error,
272 lp->cur, lp->req, lp->lkf, lp->flags);
273 return LM_OUT_ERROR;
275 return LM_OUT_ASYNC;
278 static unsigned int gdlm_do_unlock(struct gdlm_lock *lp)
280 struct gdlm_ls *ls = lp->ls;
281 unsigned int lkf = 0;
282 int error;
284 set_bit(LFL_DLM_UNLOCK, &lp->flags);
285 set_bit(LFL_ACTIVE, &lp->flags);
287 if (lp->lvb)
288 lkf = DLM_LKF_VALBLK;
290 log_debug("un %x,%llx %x %d %x", lp->lockname.ln_type,
291 (unsigned long long)lp->lockname.ln_number,
292 lp->lksb.sb_lkid, lp->cur, lkf);
294 error = dlm_unlock(ls->dlm_lockspace, lp->lksb.sb_lkid, lkf, NULL, lp);
296 if (error) {
297 log_error("%s: gdlm_unlock %x,%llx err=%d cur=%d req=%d lkf=%x "
298 "flags=%lx", ls->fsname, lp->lockname.ln_type,
299 (unsigned long long)lp->lockname.ln_number, error,
300 lp->cur, lp->req, lp->lkf, lp->flags);
301 return LM_OUT_ERROR;
303 return LM_OUT_ASYNC;
306 unsigned int gdlm_lock(void *lock, unsigned int cur_state,
307 unsigned int req_state, unsigned int flags)
309 struct gdlm_lock *lp = lock;
311 clear_bit(LFL_DLM_CANCEL, &lp->flags);
312 if (flags & LM_FLAG_NOEXP)
313 set_bit(LFL_NOBLOCK, &lp->flags);
315 check_cur_state(lp, cur_state);
316 lp->req = make_mode(req_state);
317 lp->lkf = make_flags(lp, flags, lp->cur, lp->req);
319 return gdlm_do_lock(lp);
322 unsigned int gdlm_unlock(void *lock, unsigned int cur_state)
324 struct gdlm_lock *lp = lock;
326 clear_bit(LFL_DLM_CANCEL, &lp->flags);
327 if (lp->cur == DLM_LOCK_IV)
328 return 0;
329 return gdlm_do_unlock(lp);
332 void gdlm_cancel(void *lock)
334 struct gdlm_lock *lp = lock;
335 struct gdlm_ls *ls = lp->ls;
336 int error, delay_list = 0;
338 if (test_bit(LFL_DLM_CANCEL, &lp->flags))
339 return;
341 log_info("gdlm_cancel %x,%llx flags %lx", lp->lockname.ln_type,
342 (unsigned long long)lp->lockname.ln_number, lp->flags);
344 spin_lock(&ls->async_lock);
345 if (!list_empty(&lp->delay_list)) {
346 list_del_init(&lp->delay_list);
347 delay_list = 1;
349 spin_unlock(&ls->async_lock);
351 if (delay_list) {
352 set_bit(LFL_CANCEL, &lp->flags);
353 set_bit(LFL_ACTIVE, &lp->flags);
354 queue_complete(lp);
355 return;
358 if (!test_bit(LFL_ACTIVE, &lp->flags) ||
359 test_bit(LFL_DLM_UNLOCK, &lp->flags)) {
360 log_info("gdlm_cancel skip %x,%llx flags %lx",
361 lp->lockname.ln_type,
362 (unsigned long long)lp->lockname.ln_number, lp->flags);
363 return;
366 /* the lock is blocked in the dlm */
368 set_bit(LFL_DLM_CANCEL, &lp->flags);
369 set_bit(LFL_ACTIVE, &lp->flags);
371 error = dlm_unlock(ls->dlm_lockspace, lp->lksb.sb_lkid, DLM_LKF_CANCEL,
372 NULL, lp);
374 log_info("gdlm_cancel rv %d %x,%llx flags %lx", error,
375 lp->lockname.ln_type,
376 (unsigned long long)lp->lockname.ln_number, lp->flags);
378 if (error == -EBUSY)
379 clear_bit(LFL_DLM_CANCEL, &lp->flags);
382 static int gdlm_add_lvb(struct gdlm_lock *lp)
384 char *lvb;
386 lvb = kzalloc(GDLM_LVB_SIZE, GFP_NOFS);
387 if (!lvb)
388 return -ENOMEM;
390 lp->lksb.sb_lvbptr = lvb;
391 lp->lvb = lvb;
392 return 0;
395 static void gdlm_del_lvb(struct gdlm_lock *lp)
397 kfree(lp->lvb);
398 lp->lvb = NULL;
399 lp->lksb.sb_lvbptr = NULL;
/*
 * Wait action passed to wait_on_bit() by hold_null_lock(): calls schedule()
 * and returns 0.  The word argument is not used.
 */
static int gdlm_ast_wait(void *word)
{
	int rv = 0;

	schedule();
	return rv;
}
408 /* This can do a synchronous dlm request (requiring a lock_dlm thread to get
409 the completion) because gfs won't call hold_lvb() during a callback (from
410 the context of a lock_dlm thread). */
412 static int hold_null_lock(struct gdlm_lock *lp)
414 struct gdlm_lock *lpn = NULL;
415 int error;
417 if (lp->hold_null) {
418 printk(KERN_INFO "lock_dlm: lvb already held\n");
419 return 0;
422 error = gdlm_create_lp(lp->ls, &lp->lockname, &lpn);
423 if (error)
424 goto out;
426 lpn->lksb.sb_lvbptr = junk_lvb;
427 lpn->lvb = junk_lvb;
429 lpn->req = DLM_LOCK_NL;
430 lpn->lkf = DLM_LKF_VALBLK | DLM_LKF_EXPEDITE;
431 set_bit(LFL_NOBAST, &lpn->flags);
432 set_bit(LFL_INLOCK, &lpn->flags);
433 set_bit(LFL_AST_WAIT, &lpn->flags);
435 gdlm_do_lock(lpn);
436 wait_on_bit(&lpn->flags, LFL_AST_WAIT, gdlm_ast_wait, TASK_UNINTERRUPTIBLE);
437 error = lpn->lksb.sb_status;
438 if (error) {
439 printk(KERN_INFO "lock_dlm: hold_null_lock dlm error %d\n",
440 error);
441 gdlm_delete_lp(lpn);
442 lpn = NULL;
444 out:
445 lp->hold_null = lpn;
446 return error;
449 /* This cannot do a synchronous dlm request (requiring a lock_dlm thread to get
450 the completion) because gfs may call unhold_lvb() during a callback (from
451 the context of a lock_dlm thread) which could cause a deadlock since the
452 other lock_dlm thread could be engaged in recovery. */
454 static void unhold_null_lock(struct gdlm_lock *lp)
456 struct gdlm_lock *lpn = lp->hold_null;
458 gdlm_assert(lpn, "%x,%llx", lp->lockname.ln_type,
459 (unsigned long long)lp->lockname.ln_number);
460 lpn->lksb.sb_lvbptr = NULL;
461 lpn->lvb = NULL;
462 set_bit(LFL_UNLOCK_DELETE, &lpn->flags);
463 gdlm_do_unlock(lpn);
464 lp->hold_null = NULL;
467 /* Acquire a NL lock because gfs requires the value block to remain
468 intact on the resource while the lvb is "held" even if it's holding no locks
469 on the resource. */
471 int gdlm_hold_lvb(void *lock, char **lvbp)
473 struct gdlm_lock *lp = lock;
474 int error;
476 error = gdlm_add_lvb(lp);
477 if (error)
478 return error;
480 *lvbp = lp->lvb;
482 error = hold_null_lock(lp);
483 if (error)
484 gdlm_del_lvb(lp);
486 return error;
/*
 * Undo gdlm_hold_lvb(): release the pinning NL lock, then free the lock's
 * value-block buffer.  The lvb argument is not used; the buffer recorded in
 * the lock itself is the one freed.
 */
void gdlm_unhold_lvb(void *lock, char *lvb)
{
	struct gdlm_lock *held = lock;

	unhold_null_lock(held);
	gdlm_del_lvb(held);
}
497 void gdlm_submit_delayed(struct gdlm_ls *ls)
499 struct gdlm_lock *lp, *safe;
501 spin_lock(&ls->async_lock);
502 list_for_each_entry_safe(lp, safe, &ls->delayed, delay_list) {
503 list_del_init(&lp->delay_list);
504 list_add_tail(&lp->delay_list, &ls->submit);
506 spin_unlock(&ls->async_lock);
507 wake_up(&ls->thread_wait);
510 int gdlm_release_all_locks(struct gdlm_ls *ls)
512 struct gdlm_lock *lp, *safe;
513 int count = 0;
515 spin_lock(&ls->async_lock);
516 list_for_each_entry_safe(lp, safe, &ls->all_locks, all_list) {
517 list_del_init(&lp->all_list);
519 if (lp->lvb && lp->lvb != junk_lvb)
520 kfree(lp->lvb);
521 kfree(lp);
522 count++;
524 spin_unlock(&ls->async_lock);
526 return count;