/*
 * Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
 * Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
 *
 * This copyrighted material is made available to anyone wishing to use,
 * modify, copy, or redistribute it subject to the terms and conditions
 * of the GNU General Public License version 2.
 */
12 /* A lock placed on this queue is re-submitted to DLM as soon as the lock_dlm
15 static void queue_submit(struct gdlm_lock
*lp
)
17 struct gdlm_ls
*ls
= lp
->ls
;
19 spin_lock(&ls
->async_lock
);
20 list_add_tail(&lp
->delay_list
, &ls
->submit
);
21 spin_unlock(&ls
->async_lock
);
22 wake_up(&ls
->thread_wait
);
/*
 * process_blocking - deliver a blocking callback for @lp to the filesystem.
 *
 * Visible behaviour: switches on gdlm_make_lmstate(bast_mode), asserts on an
 * unrecognised mode, then calls ls->fscb(ls->sdp, cb, &lp->lockname).
 *
 * NOTE(review): this listing is a lossy extraction -- statements carry their
 * original line numbers, and the switch arms as well as the declaration and
 * assignment of 'cb' are not visible here.  Restore them from a pristine copy.
 */
25 static void process_blocking(struct gdlm_lock
*lp
, int bast_mode
)
27 struct gdlm_ls
*ls
= lp
->ls
;
30 switch (gdlm_make_lmstate(bast_mode
)) {
/*
 * NOTE(review): the message prints lp->bast_mode, not the bast_mode argument
 * that the switch actually tested -- confirm the two are always equal at this
 * point, otherwise the logged value can be misleading.
 */
41 gdlm_assert(0, "unknown bast mode %u", lp
->bast_mode
);
/* Hand the callback type and the lock's name to the filesystem callback. */
44 ls
->fscb(ls
->sdp
, cb
, &lp
->lockname
);
47 static void wake_up_ast(struct gdlm_lock
*lp
)
49 clear_bit(LFL_AST_WAIT
, &lp
->flags
);
50 smp_mb__after_clear_bit();
51 wake_up_bit(&lp
->flags
, LFL_AST_WAIT
);
/*
 * process_complete - handle the completion of a DLM request on @lp.
 *
 * From what is visible: the function inspects lp->lksb.sb_status and
 * lp->lksb.sb_flags together with the LFL_* bits in lp->flags, updates
 * lp->cur / lp->req / lp->prev_req, and finally reports the result through
 * ls->fscb(ls->sdp, LM_CB_ASYNC, &acb).
 *
 * NOTE(review): this listing is a lossy extraction -- each statement still
 * carries its original line number, tokens are split across lines, and
 * several original lines (braces, returns/gotos, some statements) are
 * absent.  Restore the function from a pristine copy before relying on the
 * control flow; the notes below describe only the statements that survive.
 */
54 static void process_complete(struct gdlm_lock
*lp
)
56 struct gdlm_ls
*ls
= lp
->ls
;
57 struct lm_async_cb acb
;
/* Mode held on entry; compared with the final mode for the cacheable check. */
58 s16 prev_mode
= lp
->cur
;
/* Start from an all-zero callback result; lc_ret bits are OR-ed in below. */
60 memset(&acb
, 0, sizeof(acb
));
/* DLM reports the request as cancelled: log it and mark the result canceled. */
62 if (lp
->lksb
.sb_status
== -DLM_ECANCEL
) {
63 log_info("complete dlm cancel %x,%llx flags %lx",
65 (unsigned long long)lp
->lockname
.ln_number
,
69 acb
.lc_ret
|= LM_OUT_CANCELED
;
70 if (lp
->cur
== DLM_LOCK_IV
)
/*
 * Completion of an unlock (LFL_DLM_UNLOCK is consumed here).  A status other
 * than -DLM_EUNLOCK is logged; cur and req are reset to DLM_LOCK_IV.
 * NOTE(review): the branching between these steps is not visible here.
 */
75 if (test_and_clear_bit(LFL_DLM_UNLOCK
, &lp
->flags
)) {
76 if (lp
->lksb
.sb_status
!= -DLM_EUNLOCK
) {
77 log_info("unlock sb_status %d %x,%llx flags %lx",
78 lp
->lksb
.sb_status
, lp
->lockname
.ln_type
,
79 (unsigned long long)lp
->lockname
.ln_number
,
84 lp
->cur
= DLM_LOCK_IV
;
85 lp
->req
= DLM_LOCK_IV
;
/* NOTE(review): the body taken when LFL_UNLOCK_DELETE was set is not visible. */
88 if (test_and_clear_bit(LFL_UNLOCK_DELETE
, &lp
->flags
)) {
/* DLM flagged the value block as not valid: zero GDLM_LVB_SIZE bytes of it. */
95 if (lp
->lksb
.sb_flags
& DLM_SBF_VALNOTVALID
)
96 memset(lp
->lksb
.sb_lvbptr
, 0, GDLM_LVB_SIZE
);
/* DLM_SBF_ALTMODE set: swap the recorded request mode between PR and CW. */
98 if (lp
->lksb
.sb_flags
& DLM_SBF_ALTMODE
) {
99 if (lp
->req
== DLM_LOCK_PR
)
100 lp
->req
= DLM_LOCK_CW
;
101 else if (lp
->req
== DLM_LOCK_CW
)
102 lp
->req
= DLM_LOCK_PR
;
106 * A canceled lock request. The lock was just taken off the delayed
107 * list and was never even submitted to dlm.
/* LFL_CANCEL is consumed here; the result is marked canceled. */
110 if (test_and_clear_bit(LFL_CANCEL
, &lp
->flags
)) {
111 log_info("complete internal cancel %x,%llx",
112 lp
->lockname
.ln_type
,
113 (unsigned long long)lp
->lockname
.ln_number
);
115 acb
.lc_ret
|= LM_OUT_CANCELED
;
/* Any non-zero sb_status from here on is treated as an error completion. */
123 if (lp
->lksb
.sb_status
) {
124 /* a "normal" error */
/* -EAGAIN on a DLM_LKF_NOQUEUE request; the lock id is cleared if no mode was ever held. */
125 if ((lp
->lksb
.sb_status
== -EAGAIN
) &&
126 (lp
->lkf
& DLM_LKF_NOQUEUE
)) {
128 if (lp
->cur
== DLM_LOCK_IV
)
129 lp
->lksb
.sb_lkid
= 0;
133 /* this could only happen with cancels I think */
134 log_info("ast sb_status %d %x,%llx flags %lx",
135 lp
->lksb
.sb_status
, lp
->lockname
.ln_type
,
136 (unsigned long long)lp
->lockname
.ln_number
,
142 * This is an AST for an EX->EX conversion for sync_lvb from GFS.
/* NOTE(review): the body taken when LFL_SYNC_LVB was set is not visible. */
145 if (test_and_clear_bit(LFL_SYNC_LVB
, &lp
->flags
)) {
151 * A lock has been demoted to NL because it initially completed during
152 * BLOCK_LOCKS. Now it must be requested in the originally requested
/*
 * Re-request path (LFL_REREQUEST is consumed here): sanity-check that the
 * lock was requested at NL with a saved prev_req above NL, then restore the
 * saved mode into req, invalidate prev_req and drop DLM_LKF_CONVDEADLK.
 */
156 if (test_and_clear_bit(LFL_REREQUEST
, &lp
->flags
)) {
157 gdlm_assert(lp
->req
== DLM_LOCK_NL
, "%x,%llx",
158 lp
->lockname
.ln_type
,
159 (unsigned long long)lp
->lockname
.ln_number
);
160 gdlm_assert(lp
->prev_req
> DLM_LOCK_NL
, "%x,%llx",
161 lp
->lockname
.ln_type
,
162 (unsigned long long)lp
->lockname
.ln_number
);
164 lp
->cur
= DLM_LOCK_NL
;
165 lp
->req
= lp
->prev_req
;
166 lp
->prev_req
= DLM_LOCK_IV
;
167 lp
->lkf
&= ~DLM_LKF_CONVDEADLK
;
/* LFL_NOCACHE later suppresses LM_OUT_CACHEABLE in the reported result. */
169 set_bit(LFL_NOCACHE
, &lp
->flags
);
/*
 * While DFL_BLOCK_LOCKS is set, and unless the lock is LFL_NOBLOCK, the
 * request goes to gdlm_queue_delayed().
 * NOTE(review): the alternative branch is not visible here.
 */
171 if (test_bit(DFL_BLOCK_LOCKS
, &ls
->flags
) &&
172 !test_bit(LFL_NOBLOCK
, &lp
->flags
))
173 gdlm_queue_delayed(lp
);
180 * A request is granted during dlm recovery. It may be granted
181 * because the locks of a failed node were cleared. In that case,
182 * there may be inconsistent data beneath this lock and we must wait
183 * for recovery to complete to use it. When gfs recovery is done this
184 * granted lock will be converted to NL and then reacquired in this
/*
 * Granted while DFL_BLOCK_LOCKS is set (lock not LFL_NOBLOCK, request not NL):
 * save the requested mode in prev_req, turn the request into a conversion to
 * NL, and set LFL_REREQUEST so the completion of that conversion takes the
 * re-request path above.
 */
188 if (test_bit(DFL_BLOCK_LOCKS
, &ls
->flags
) &&
189 !test_bit(LFL_NOBLOCK
, &lp
->flags
) &&
190 lp
->req
!= DLM_LOCK_NL
) {
193 lp
->prev_req
= lp
->req
;
194 lp
->req
= DLM_LOCK_NL
;
195 lp
->lkf
|= DLM_LKF_CONVERT
;
196 lp
->lkf
&= ~DLM_LKF_CONVDEADLK
;
198 log_debug("rereq %x,%llx id %x %d,%d",
199 lp
->lockname
.ln_type
,
200 (unsigned long long)lp
->lockname
.ln_number
,
201 lp
->lksb
.sb_lkid
, lp
->cur
, lp
->req
);
203 set_bit(LFL_REREQUEST
, &lp
->flags
);
209 * DLM demoted the lock to NL before it was granted so GFS must be
210 * told it cannot cache data for this lock.
213 if (lp
->lksb
.sb_flags
& DLM_SBF_DEMOTED
)
214 set_bit(LFL_NOCACHE
, &lp
->flags
);
218 * This is an internal lock_dlm lock
/* NOTE(review): what follows clear_bit() for LFL_INLOCK locks is not visible. */
221 if (test_bit(LFL_INLOCK
, &lp
->flags
)) {
222 clear_bit(LFL_NOBLOCK
, &lp
->flags
);
229 * Normal completion of a lock request. Tell GFS it now has the lock.
232 clear_bit(LFL_NOBLOCK
, &lp
->flags
);
/* Fill in the callback result: lock name plus the lm state derived from cur. */
235 acb
.lc_name
= lp
->lockname
;
236 acb
.lc_ret
|= gdlm_make_lmstate(lp
->cur
);
/*
 * LM_OUT_CACHEABLE is reported only when LFL_NOCACHE was clear (the bit is
 * consumed by this test) and both the current and the previous mode are
 * above NL.
 */
238 if (!test_and_clear_bit(LFL_NOCACHE
, &lp
->flags
) &&
239 (lp
->cur
> DLM_LOCK_NL
) && (prev_mode
> DLM_LOCK_NL
))
240 acb
.lc_ret
|= LM_OUT_CACHEABLE
;
/* Deliver the asynchronous completion to the filesystem callback. */
242 ls
->fscb(ls
->sdp
, LM_CB_ASYNC
, &acb
);
/*
 * no_work - test, under ls->async_lock, whether the work lists are empty.
 *
 * Visible behaviour: 'ret' is first set to "complete and submit lists are
 * both empty", and is later assigned the emptiness of the blocking list.
 *
 * NOTE(review): lossy extraction -- the declaration of 'ret', whatever
 * condition guards the second assignment (presumably involving @blocking;
 * confirm against a pristine copy) and the return statement are not visible.
 */
245 static inline int no_work(struct gdlm_ls
*ls
, int blocking
)
249 spin_lock(&ls
->async_lock
);
250 ret
= list_empty(&ls
->complete
) && list_empty(&ls
->submit
);
252 ret
= list_empty(&ls
->blocking
);
253 spin_unlock(&ls
->async_lock
);
/*
 * check_drop - periodic check of the lock count against the drop threshold.
 *
 * Visible behaviour: tests whether ls->drop_locks_count is zero; once more
 * than drop_locks_period seconds (period * HZ jiffies) have passed since
 * ls->drop_time, the timestamp is reset to the current jiffies and
 * all_locks_count is compared against drop_locks_count.
 *
 * NOTE(review): lossy extraction -- the statements selected by each test
 * (the return values) are not visible here.
 */
258 static inline int check_drop(struct gdlm_ls
*ls
)
260 if (!ls
->drop_locks_count
)
/* time_after() keeps the comparison correct across jiffies wrap-around. */
263 if (time_after(jiffies
, ls
->drop_time
+ ls
->drop_locks_period
* HZ
)) {
264 ls
->drop_time
= jiffies
;
265 if (ls
->all_locks_count
>= ls
->drop_locks_count
)
/*
 * gdlm_thread - main loop of a lock_dlm worker kthread.
 * @data: the struct gdlm_ls this thread services
 *
 * Visible behaviour: until kthread_should_stop(), the thread waits on
 * ls->thread_wait, then under ls->async_lock removes at most one lock from
 * the blocking, complete or submit list (in that priority order) and
 * evaluates check_drop().  After unlocking, the lock is handed to
 * process_complete() / process_blocking(), and LM_CB_DROPLOCKS may be sent
 * to the filesystem callback.
 *
 * NOTE(review): lossy extraction -- the declaration and assignment of
 * 'blist', the action taken when no_work() is true, the handling of entries
 * taken from the submit list, and the conditions guarding the dispatch calls
 * at the bottom are not visible here.
 */
271 static int gdlm_thread(void *data
)
273 struct gdlm_ls
*ls
= (struct gdlm_ls
*) data
;
274 struct gdlm_lock
*lp
= NULL
;
276 uint8_t complete
, blocking
, submit
, drop
;
277 DECLARE_WAITQUEUE(wait
, current
);
279 /* Only thread1 is allowed to do blocking callbacks since gfs
may wait for a completion callback within a blocking cb. */
282 if (current
== ls
->thread1
)
285 while (!kthread_should_stop()) {
/* Mark ourselves sleeping before checking for work so a wakeup is not lost. */
286 set_current_state(TASK_INTERRUPTIBLE
);
287 add_wait_queue(&ls
->thread_wait
, &wait
);
288 if (no_work(ls
, blist
))
290 remove_wait_queue(&ls
->thread_wait
, &wait
);
291 set_current_state(TASK_RUNNING
);
/* Reset the per-iteration work indicators. */
293 complete
= blocking
= submit
= drop
= 0;
295 spin_lock(&ls
->async_lock
);
/* Blocking callbacks are only considered when 'blist' is set. */
297 if (blist
&& !list_empty(&ls
->blocking
)) {
298 lp
= list_entry(ls
->blocking
.next
, struct gdlm_lock
,
300 list_del_init(&lp
->blist
);
/* Capture the mode while still holding async_lock. */
301 blocking
= lp
->bast_mode
;
303 } else if (!list_empty(&ls
->complete
)) {
304 lp
= list_entry(ls
->complete
.next
, struct gdlm_lock
,
306 list_del_init(&lp
->clist
);
308 } else if (!list_empty(&ls
->submit
)) {
309 lp
= list_entry(ls
->submit
.next
, struct gdlm_lock
,
311 list_del_init(&lp
->delay_list
);
/* check_drop() is evaluated while async_lock is still held. */
315 drop
= check_drop(ls
);
316 spin_unlock(&ls
->async_lock
);
/* Dispatch happens after the spinlock has been released. */
319 process_complete(lp
);
322 process_blocking(lp
, blocking
);
328 ls
->fscb(ls
->sdp
, LM_CB_DROPLOCKS
, NULL
);
/*
 * gdlm_init_threads - start the two lock_dlm worker threads for @ls.
 *
 * Visible behaviour: two kthreads running gdlm_thread() are created with the
 * names "lock_dlm1" and "lock_dlm2"; a failure to start either is logged,
 * and ls->thread1 is stopped after the "lock_dlm2" error message.
 *
 * NOTE(review): lossy extraction -- the declaration of 'error', the checks
 * on kthread_run()'s result, the assignments to ls->thread1 / ls->thread2
 * and the return statements are not visible here.
 */
336 int gdlm_init_threads(struct gdlm_ls
*ls
)
338 struct task_struct
*p
;
341 p
= kthread_run(gdlm_thread
, ls
, "lock_dlm1");
344 log_error("can't start lock_dlm1 thread %d", error
);
349 p
= kthread_run(gdlm_thread
, ls
, "lock_dlm2");
352 log_error("can't start lock_dlm2 thread %d", error
);
/* Presumably the unwind for a failed second thread: stop the first one. */
353 kthread_stop(ls
->thread1
);
361 void gdlm_release_threads(struct gdlm_ls
*ls
)
363 kthread_stop(ls
->thread1
);
364 kthread_stop(ls
->thread2
);