/*
 * Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
 * Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
 *
 * This copyrighted material is made available to anyone wishing to use,
 * modify, copy, or redistribute it subject to the terms and conditions
 * of the GNU General Public License version 2.
 */

#include "lock_dlm.h"
static char junk_lvb[GDLM_LVB_SIZE];
/* convert dlm lock-mode to gfs lock-state */

static s16 gdlm_make_lmstate(s16 dlmmode)
{
	switch (dlmmode) {
	case DLM_LOCK_IV:
	case DLM_LOCK_NL:
		return LM_ST_UNLOCKED;
	case DLM_LOCK_EX:
		return LM_ST_EXCLUSIVE;
	case DLM_LOCK_CW:
		return LM_ST_DEFERRED;
	case DLM_LOCK_PR:
		return LM_ST_SHARED;
	}
	gdlm_assert(0, "unknown DLM mode %d", dlmmode);
	return -1;
}
/* A lock placed on this queue is re-submitted to DLM as soon as the lock_dlm
   thread gets to it. */

static void queue_submit(struct gdlm_lock *lp)
{
	struct gdlm_ls *ls = lp->ls;

	spin_lock(&ls->async_lock);
	list_add_tail(&lp->delay_list, &ls->submit);
	spin_unlock(&ls->async_lock);
	wake_up(&ls->thread_wait);
}
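
/* Clear LFL_AST_WAIT and wake any waiter sleeping in wait_on_bit() on that
   flag (see hold_null_lock()); the barrier orders the bit clear before the
   wakeup. */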
static void wake_up_ast(struct gdlm_lock *lp)
{
	clear_bit(LFL_AST_WAIT, &lp->flags);
	smp_mb__after_clear_bit();
	wake_up_bit(&lp->flags, LFL_AST_WAIT);
}
static void gdlm_delete_lp(struct gdlm_lock *lp)
{
	struct gdlm_ls *ls = lp->ls;

	spin_lock(&ls->async_lock);
	if (!list_empty(&lp->delay_list))
		list_del_init(&lp->delay_list);
	ls->all_locks_count--;
	spin_unlock(&ls->async_lock);

	kfree(lp);
}
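
/* Locks on the delayed list sit out recovery; gdlm_submit_delayed() moves
   them to the submit list for the lock_dlm thread to resubmit. */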
static void gdlm_queue_delayed(struct gdlm_lock *lp)
{
	struct gdlm_ls *ls = lp->ls;

	spin_lock(&ls->async_lock);
	list_add_tail(&lp->delay_list, &ls->delayed);
	spin_unlock(&ls->async_lock);
}
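
/* Completion AST handler: interpret the dlm status block for a finished
   request, fix the lock up for resubmission where needed, or report the
   result to GFS through the async callback. */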
static void process_complete(struct gdlm_lock *lp)
{
	struct gdlm_ls *ls = lp->ls;
	struct lm_async_cb acb;

	memset(&acb, 0, sizeof(acb));
	if (lp->lksb.sb_status == -DLM_ECANCEL) {
		log_info("complete dlm cancel %x,%llx flags %lx",
			 lp->lockname.ln_type,
			 (unsigned long long)lp->lockname.ln_number,
			 lp->flags);

		lp->req = lp->cur;
		acb.lc_ret |= LM_OUT_CANCELED;
		if (lp->cur == DLM_LOCK_IV)
			lp->lksb.sb_lkid = 0;
		goto out;
	}
	if (test_and_clear_bit(LFL_DLM_UNLOCK, &lp->flags)) {
		if (lp->lksb.sb_status != -DLM_EUNLOCK) {
			log_info("unlock sb_status %d %x,%llx flags %lx",
				 lp->lksb.sb_status, lp->lockname.ln_type,
				 (unsigned long long)lp->lockname.ln_number,
				 lp->flags);
			return;
		}

		lp->cur = DLM_LOCK_IV;
		lp->req = DLM_LOCK_IV;
		lp->lksb.sb_lkid = 0;

		if (test_and_clear_bit(LFL_UNLOCK_DELETE, &lp->flags)) {
			gdlm_delete_lp(lp);
			return;
		}
		goto out;
	}
	if (lp->lksb.sb_flags & DLM_SBF_VALNOTVALID)
		memset(lp->lksb.sb_lvbptr, 0, GDLM_LVB_SIZE);

	if (lp->lksb.sb_flags & DLM_SBF_ALTMODE) {
		if (lp->req == DLM_LOCK_PR)
			lp->req = DLM_LOCK_CW;
		else if (lp->req == DLM_LOCK_CW)
			lp->req = DLM_LOCK_PR;
	}
	/*
	 * A canceled lock request.  The lock was just taken off the delayed
	 * list and was never even submitted to dlm.
	 */

	if (test_and_clear_bit(LFL_CANCEL, &lp->flags)) {
		log_info("complete internal cancel %x,%llx",
			 lp->lockname.ln_type,
			 (unsigned long long)lp->lockname.ln_number);
		lp->req = lp->cur;
		acb.lc_ret |= LM_OUT_CANCELED;
		goto out;
	}
	/*
	 * An error occurred.
	 */

	if (lp->lksb.sb_status) {
		/* a "normal" error */
		if ((lp->lksb.sb_status == -EAGAIN) &&
		    (lp->lkf & DLM_LKF_NOQUEUE)) {
			lp->req = lp->cur;
			if (lp->cur == DLM_LOCK_IV)
				lp->lksb.sb_lkid = 0;
			goto out;
		}

		/* this could only happen with cancels I think */
		log_info("ast sb_status %d %x,%llx flags %lx",
			 lp->lksb.sb_status, lp->lockname.ln_type,
			 (unsigned long long)lp->lockname.ln_number,
			 lp->flags);
		return;
	}
	/*
	 * This is an AST for an EX->EX conversion for sync_lvb from GFS.
	 */

	if (test_and_clear_bit(LFL_SYNC_LVB, &lp->flags)) {
		wake_up_ast(lp);
		return;
	}
	/*
	 * A lock has been demoted to NL because it initially completed during
	 * BLOCK_LOCKS.  Now it must be requested in the originally requested
	 * mode.
	 */

	if (test_and_clear_bit(LFL_REREQUEST, &lp->flags)) {
		gdlm_assert(lp->req == DLM_LOCK_NL, "%x,%llx",
			    lp->lockname.ln_type,
			    (unsigned long long)lp->lockname.ln_number);
		gdlm_assert(lp->prev_req > DLM_LOCK_NL, "%x,%llx",
			    lp->lockname.ln_type,
			    (unsigned long long)lp->lockname.ln_number);

		lp->cur = DLM_LOCK_NL;
		lp->req = lp->prev_req;
		lp->prev_req = DLM_LOCK_IV;
		lp->lkf &= ~DLM_LKF_CONVDEADLK;

		set_bit(LFL_NOCACHE, &lp->flags);

		if (test_bit(DFL_BLOCK_LOCKS, &ls->flags) &&
		    !test_bit(LFL_NOBLOCK, &lp->flags))
			gdlm_queue_delayed(lp);
		else
			queue_submit(lp);
		return;
	}
	/*
	 * A request is granted during dlm recovery.  It may be granted
	 * because the locks of a failed node were cleared.  In that case,
	 * there may be inconsistent data beneath this lock and we must wait
	 * for recovery to complete to use it.  When gfs recovery is done this
	 * granted lock will be converted to NL and then reacquired in this
	 * granted state.
	 */

	if (test_bit(DFL_BLOCK_LOCKS, &ls->flags) &&
	    !test_bit(LFL_NOBLOCK, &lp->flags) &&
	    lp->req != DLM_LOCK_NL) {

		lp->cur = lp->req;
		lp->prev_req = lp->req;
		lp->req = DLM_LOCK_NL;
		lp->lkf |= DLM_LKF_CONVERT;
		lp->lkf &= ~DLM_LKF_CONVDEADLK;

		log_debug("rereq %x,%llx id %x %d,%d",
			  lp->lockname.ln_type,
			  (unsigned long long)lp->lockname.ln_number,
			  lp->lksb.sb_lkid, lp->cur, lp->req);

		set_bit(LFL_REREQUEST, &lp->flags);
		queue_submit(lp);
		return;
	}
	/*
	 * DLM demoted the lock to NL before it was granted so GFS must be
	 * told it cannot cache data for this lock.
	 */

	if (lp->lksb.sb_flags & DLM_SBF_DEMOTED)
		set_bit(LFL_NOCACHE, &lp->flags);
out:
	/*
	 * This is an internal lock_dlm lock
	 */

	if (test_bit(LFL_INLOCK, &lp->flags)) {
		clear_bit(LFL_NOBLOCK, &lp->flags);
		lp->cur = lp->req;
		wake_up_ast(lp);
		return;
	}
	/*
	 * Normal completion of a lock request.  Tell GFS it now has the lock.
	 */

	clear_bit(LFL_NOBLOCK, &lp->flags);
	lp->cur = lp->req;

	acb.lc_name = lp->lockname;
	acb.lc_ret |= gdlm_make_lmstate(lp->cur);

	ls->fscb(ls->sdp, LM_CB_ASYNC, &acb);
}
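
/* dlm calls this when a lock or unlock request completes */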
static void gdlm_ast(void *astarg)
{
	struct gdlm_lock *lp = astarg;
	clear_bit(LFL_ACTIVE, &lp->flags);
	process_complete(lp);
}
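
/* Blocking AST: map the mode a competing request is waiting for onto the
   LM_CB_NEED_* callback that tells GFS to demote or release this lock. */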
static void process_blocking(struct gdlm_lock *lp, int bast_mode)
{
	struct gdlm_ls *ls = lp->ls;
	unsigned int cb = 0;

	switch (gdlm_make_lmstate(bast_mode)) {
	case LM_ST_EXCLUSIVE:
		cb = LM_CB_NEED_E;
		break;
	case LM_ST_DEFERRED:
		cb = LM_CB_NEED_D;
		break;
	case LM_ST_SHARED:
		cb = LM_CB_NEED_S;
		break;
	default:
		gdlm_assert(0, "unknown bast mode %u", bast_mode);
	}

	ls->fscb(ls->sdp, cb, &lp->lockname);
}
static void gdlm_bast(void *astarg, int mode)
{
	struct gdlm_lock *lp = astarg;

	if (!mode) {
		printk(KERN_INFO "lock_dlm: bast mode zero %x,%llx\n",
			lp->lockname.ln_type,
			(unsigned long long)lp->lockname.ln_number);
		return;
	}

	process_blocking(lp, mode);
}
/* convert gfs lock-state to dlm lock-mode */

static s16 make_mode(s16 lmstate)
{
	switch (lmstate) {
	case LM_ST_UNLOCKED:
		return DLM_LOCK_NL;
	case LM_ST_EXCLUSIVE:
		return DLM_LOCK_EX;
	case LM_ST_DEFERRED:
		return DLM_LOCK_CW;
	case LM_ST_SHARED:
		return DLM_LOCK_PR;
	}
	gdlm_assert(0, "unknown LM state %d", lmstate);
	return -1;
}
/* verify agreement with GFS on the current lock state, NB: DLM_LOCK_NL and
   DLM_LOCK_IV are both considered LM_ST_UNLOCKED by GFS. */

static void check_cur_state(struct gdlm_lock *lp, unsigned int cur_state)
{
	s16 cur = make_mode(cur_state);
	if (lp->cur != DLM_LOCK_IV)
		gdlm_assert(lp->cur == cur, "%d, %d", lp->cur, cur);
}
static inline unsigned int make_flags(struct gdlm_lock *lp,
				      unsigned int gfs_flags,
				      s16 cur, s16 req)
{
	unsigned int lkf = 0;

	if (gfs_flags & LM_FLAG_TRY)
		lkf |= DLM_LKF_NOQUEUE;

	if (gfs_flags & LM_FLAG_TRY_1CB) {
		lkf |= DLM_LKF_NOQUEUE;
		lkf |= DLM_LKF_NOQUEUEBAST;
	}

	if (gfs_flags & LM_FLAG_PRIORITY) {
		lkf |= DLM_LKF_NOORDER;
		lkf |= DLM_LKF_HEADQUE;
	}

	if (gfs_flags & LM_FLAG_ANY) {
		if (req == DLM_LOCK_PR)
			lkf |= DLM_LKF_ALTCW;
		else if (req == DLM_LOCK_CW)
			lkf |= DLM_LKF_ALTPR;
	}

	if (lp->lksb.sb_lkid != 0) {
		lkf |= DLM_LKF_CONVERT;
	}

	if (lp->lvb)
		lkf |= DLM_LKF_VALBLK;

	return lkf;
}
/* make_strname - convert GFS lock numbers to a string */

static inline void make_strname(const struct lm_lockname *lockname,
				struct gdlm_strname *str)
{
	sprintf(str->name, "%8x%16llx", lockname->ln_type,
		(unsigned long long)lockname->ln_number);
	str->namelen = GDLM_STRNAME_BYTES;
}
static int gdlm_create_lp(struct gdlm_ls *ls, struct lm_lockname *name,
			  struct gdlm_lock **lpp)
{
	struct gdlm_lock *lp;

	lp = kzalloc(sizeof(struct gdlm_lock), GFP_NOFS);
	if (!lp)
		return -ENOMEM;

	lp->lockname = *name;
	make_strname(name, &lp->strname);
	lp->ls = ls;
	lp->cur = DLM_LOCK_IV;
	INIT_LIST_HEAD(&lp->delay_list);

	spin_lock(&ls->async_lock);
	ls->all_locks_count++;
	spin_unlock(&ls->async_lock);

	*lpp = lp;
	return 0;
}
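
/* lock-module interface: allocate a lock for GFS */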
int gdlm_get_lock(void *lockspace, struct lm_lockname *name,
		  void **lockp)
{
	struct gdlm_lock *lp;
	int error;

	error = gdlm_create_lp(lockspace, name, &lp);
	if (!error)
		*lockp = lp;
	return error;
}
void gdlm_put_lock(void *lock)
{
	gdlm_delete_lp(lock);
}
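
/* Submit lp to dlm, or park it on the delayed list while recovery has locks
   blocked; completion is reported asynchronously through gdlm_ast(). */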
unsigned int gdlm_do_lock(struct gdlm_lock *lp)
{
	struct gdlm_ls *ls = lp->ls;
	int error, bast = 1;

	/*
	 * When recovery is in progress, delay lock requests for submission
	 * once recovery is done.  Requests for recovery (NOEXP) and unlocks
	 * can pass.
	 */

	if (test_bit(DFL_BLOCK_LOCKS, &ls->flags) &&
	    !test_bit(LFL_NOBLOCK, &lp->flags) && lp->req != DLM_LOCK_NL) {
		gdlm_queue_delayed(lp);
		return LM_OUT_ASYNC;
	}

	/*
	 * Submit the actual lock request.
	 */

	if (test_bit(LFL_NOBAST, &lp->flags))
		bast = 0;

	set_bit(LFL_ACTIVE, &lp->flags);

	log_debug("lk %x,%llx id %x %d,%d %x", lp->lockname.ln_type,
		  (unsigned long long)lp->lockname.ln_number, lp->lksb.sb_lkid,
		  lp->cur, lp->req, lp->lkf);

	error = dlm_lock(ls->dlm_lockspace, lp->req, &lp->lksb, lp->lkf,
			 lp->strname.name, lp->strname.namelen, 0, gdlm_ast,
			 lp, bast ? gdlm_bast : NULL);

	if ((error == -EAGAIN) && (lp->lkf & DLM_LKF_NOQUEUE)) {
		lp->lksb.sb_status = -EAGAIN;
		gdlm_ast(lp);
		error = 0;
	}

	if (error) {
		log_error("%s: gdlm_lock %x,%llx err=%d cur=%d req=%d lkf=%x "
			  "flags=%lx", ls->fsname, lp->lockname.ln_type,
			  (unsigned long long)lp->lockname.ln_number, error,
			  lp->cur, lp->req, lp->lkf, lp->flags);
		return LM_OUT_ERROR;
	}
	return LM_OUT_ASYNC;
}
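
/* Issue the dlm unlock; the completion arrives in gdlm_ast() with status
   -DLM_EUNLOCK and is handled by the LFL_DLM_UNLOCK case in
   process_complete(). */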
static unsigned int gdlm_do_unlock(struct gdlm_lock *lp)
{
	struct gdlm_ls *ls = lp->ls;
	unsigned int lkf = 0;
	int error;

	set_bit(LFL_DLM_UNLOCK, &lp->flags);
	set_bit(LFL_ACTIVE, &lp->flags);

	if (lp->lvb)
		lkf = DLM_LKF_VALBLK;

	log_debug("un %x,%llx %x %d %x", lp->lockname.ln_type,
		  (unsigned long long)lp->lockname.ln_number,
		  lp->lksb.sb_lkid, lp->cur, lkf);

	error = dlm_unlock(ls->dlm_lockspace, lp->lksb.sb_lkid, lkf, NULL, lp);

	if (error) {
		log_error("%s: gdlm_unlock %x,%llx err=%d cur=%d req=%d lkf=%x "
			  "flags=%lx", ls->fsname, lp->lockname.ln_type,
			  (unsigned long long)lp->lockname.ln_number, error,
			  lp->cur, lp->req, lp->lkf, lp->flags);
		return LM_OUT_ERROR;
	}
	return LM_OUT_ASYNC;
}
unsigned int gdlm_lock(void *lock, unsigned int cur_state,
		       unsigned int req_state, unsigned int flags)
{
	struct gdlm_lock *lp = lock;
	if (req_state == LM_ST_UNLOCKED)
		return gdlm_unlock(lock, cur_state);
	clear_bit(LFL_DLM_CANCEL, &lp->flags);
	if (flags & LM_FLAG_NOEXP)
		set_bit(LFL_NOBLOCK, &lp->flags);

	check_cur_state(lp, cur_state);
	lp->req = make_mode(req_state);
	lp->lkf = make_flags(lp, flags, lp->cur, lp->req);

	return gdlm_do_lock(lp);
}
unsigned int gdlm_unlock(void *lock, unsigned int cur_state)
{
	struct gdlm_lock *lp = lock;

	clear_bit(LFL_DLM_CANCEL, &lp->flags);
	if (lp->cur == DLM_LOCK_IV)
		return 0;
	return gdlm_do_unlock(lp);
}
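
/* Cancel a blocked request: a lock still on our delayed list is completed
   as canceled locally, otherwise dlm is asked to cancel it with
   DLM_LKF_CANCEL. */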
void gdlm_cancel(void *lock)
{
	struct gdlm_lock *lp = lock;
	struct gdlm_ls *ls = lp->ls;
	int error, delay_list = 0;

	if (test_bit(LFL_DLM_CANCEL, &lp->flags))
		return;

	log_info("gdlm_cancel %x,%llx flags %lx", lp->lockname.ln_type,
		 (unsigned long long)lp->lockname.ln_number, lp->flags);

	spin_lock(&ls->async_lock);
	if (!list_empty(&lp->delay_list)) {
		list_del_init(&lp->delay_list);
		delay_list = 1;
	}
	spin_unlock(&ls->async_lock);

	if (delay_list) {
		set_bit(LFL_CANCEL, &lp->flags);
		set_bit(LFL_ACTIVE, &lp->flags);
		gdlm_ast(lp);
		return;
	}

	if (!test_bit(LFL_ACTIVE, &lp->flags) ||
	    test_bit(LFL_DLM_UNLOCK, &lp->flags)) {
		log_info("gdlm_cancel skip %x,%llx flags %lx",
			 lp->lockname.ln_type,
			 (unsigned long long)lp->lockname.ln_number, lp->flags);
		return;
	}

	/* the lock is blocked in the dlm */

	set_bit(LFL_DLM_CANCEL, &lp->flags);
	set_bit(LFL_ACTIVE, &lp->flags);

	error = dlm_unlock(ls->dlm_lockspace, lp->lksb.sb_lkid, DLM_LKF_CANCEL,
			   NULL, lp);

	log_info("gdlm_cancel rv %d %x,%llx flags %lx", error,
		 lp->lockname.ln_type,
		 (unsigned long long)lp->lockname.ln_number, lp->flags);

	if (error == -EBUSY)
		clear_bit(LFL_DLM_CANCEL, &lp->flags);
}
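
/* Attach a lock value block to the lock so requests carry DLM_LKF_VALBLK */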
static int gdlm_add_lvb(struct gdlm_lock *lp)
{
	char *lvb;

	lvb = kzalloc(GDLM_LVB_SIZE, GFP_NOFS);
	if (!lvb)
		return -ENOMEM;

	lp->lvb = lvb;
	lp->lksb.sb_lvbptr = lvb;

	return 0;
}
static void gdlm_del_lvb(struct gdlm_lock *lp)
{
	kfree(lp->lvb);
	lp->lvb = NULL;
	lp->lksb.sb_lvbptr = NULL;
}
static int gdlm_ast_wait(void *word)
{
	schedule();
	return 0;
}
/* This can do a synchronous dlm request (requiring a lock_dlm thread to get
   the completion) because gfs won't call hold_lvb() during a callback (from
   the context of a lock_dlm thread). */

static int hold_null_lock(struct gdlm_lock *lp)
{
	struct gdlm_lock *lpn = NULL;
	int error;

	if (lp->hold_null) {
		printk(KERN_INFO "lock_dlm: lvb already held\n");
		return 0;
	}

	error = gdlm_create_lp(lp->ls, &lp->lockname, &lpn);
	if (error)
		goto out;

	lpn->lksb.sb_lvbptr = junk_lvb;
	lpn->lvb = junk_lvb;

	lpn->req = DLM_LOCK_NL;
	lpn->lkf = DLM_LKF_VALBLK | DLM_LKF_EXPEDITE;
	set_bit(LFL_NOBAST, &lpn->flags);
	set_bit(LFL_INLOCK, &lpn->flags);
	set_bit(LFL_AST_WAIT, &lpn->flags);

	gdlm_do_lock(lpn);
	wait_on_bit(&lpn->flags, LFL_AST_WAIT, gdlm_ast_wait,
		    TASK_UNINTERRUPTIBLE);
	error = lpn->lksb.sb_status;
	if (error) {
		printk(KERN_INFO "lock_dlm: hold_null_lock dlm error %d\n",
		       error);
		gdlm_delete_lp(lpn);
		lpn = NULL;
	}
out:
	lp->hold_null = lpn;
	return error;
}
/* This cannot do a synchronous dlm request (requiring a lock_dlm thread to get
   the completion) because gfs may call unhold_lvb() during a callback (from
   the context of a lock_dlm thread) which could cause a deadlock since the
   other lock_dlm thread could be engaged in recovery. */

static void unhold_null_lock(struct gdlm_lock *lp)
{
	struct gdlm_lock *lpn = lp->hold_null;

	gdlm_assert(lpn, "%x,%llx", lp->lockname.ln_type,
		    (unsigned long long)lp->lockname.ln_number);
	lpn->lksb.sb_lvbptr = NULL;
	lpn->lvb = NULL;
	set_bit(LFL_UNLOCK_DELETE, &lpn->flags);
	gdlm_do_unlock(lpn);
	lp->hold_null = NULL;
}
/* Acquire a NL lock because gfs requires the value block to remain
   intact on the resource while the lvb is "held" even if it's holding no locks
   on the resource. */

int gdlm_hold_lvb(void *lock, char **lvbp)
{
	struct gdlm_lock *lp = lock;
	int error;

	error = gdlm_add_lvb(lp);
	if (error)
		return error;

	*lvbp = lp->lvb;

	error = hold_null_lock(lp);
	if (error)
		gdlm_del_lvb(lp);

	return error;
}
void gdlm_unhold_lvb(void *lock, char *lvb)
{
	struct gdlm_lock *lp = lock;

	unhold_null_lock(lp);
	gdlm_del_lvb(lp);
}
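
/* Move all delayed locks to the submit list and wake the lock_dlm thread */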
void gdlm_submit_delayed(struct gdlm_ls *ls)
{
	struct gdlm_lock *lp, *safe;

	spin_lock(&ls->async_lock);
	list_for_each_entry_safe(lp, safe, &ls->delayed, delay_list) {
		list_del_init(&lp->delay_list);
		list_add_tail(&lp->delay_list, &ls->submit);
	}
	spin_unlock(&ls->async_lock);
	wake_up(&ls->thread_wait);
}