/*
 * Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
 * Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
 *
 * This copyrighted material is made available to anyone wishing to use,
 * modify, copy, or redistribute it subject to the terms and conditions
 * of the GNU General Public License version 2.
 */
12 /* A lock placed on this queue is re-submitted to DLM as soon as the lock_dlm
15 static void queue_submit(struct gdlm_lock
*lp
)
17 struct gdlm_ls
*ls
= lp
->ls
;
19 spin_lock(&ls
->async_lock
);
20 list_add_tail(&lp
->delay_list
, &ls
->submit
);
21 spin_unlock(&ls
->async_lock
);
22 wake_up(&ls
->thread_wait
);
/*
 * process_blocking - pass a blocking request for @lp to the lockspace's
 * fscb callback.
 * @lp: lock the blocking request refers to.
 * @bast_mode: mode to translate with gdlm_make_lmstate().
 *
 * NOTE(review): this listing has dropped lines (the embedded source line
 * numbers jump 27 -> 30 and 30 -> 41), so the declaration of cb and the
 * switch arms that assign it are not visible here.
 */
25 static void process_blocking(struct gdlm_lock
*lp
, int bast_mode
)
27 struct gdlm_ls
*ls
= lp
->ls
;
/* Select on the lock-manager state that corresponds to bast_mode. */
30 switch (gdlm_make_lmstate(bast_mode
)) {
/*
 * NOTE(review): the switch tests the bast_mode argument, but the message
 * prints lp->bast_mode; confirm the two cannot differ at this point,
 * otherwise the logged value is misleading.
 */
41 gdlm_assert(0, "unknown bast mode %u", lp
->bast_mode
);
/* Deliver callback code cb for this lock's name to the filesystem. */
44 ls
->fscb(ls
->sdp
, cb
, &lp
->lockname
);
47 static void wake_up_ast(struct gdlm_lock
*lp
)
49 clear_bit(LFL_AST_WAIT
, &lp
->flags
);
50 smp_mb__after_clear_bit();
51 wake_up_bit(&lp
->flags
, LFL_AST_WAIT
);
/*
 * process_complete - handle the completion of a DLM request for @lp and,
 * on the path that reaches the end, report the result to the filesystem
 * through ls->fscb() with LM_CB_ASYNC.
 * @lp: lock whose status block (lp->lksb) and flags are examined.
 *
 * NOTE(review): this listing is an extraction with dropped lines (the
 * embedded source line numbers skip, e.g. 63 -> 65 and 70 -> 75), so
 * braces, some statements and any early exits are not visible here. The
 * comments below describe only the lines that are present.
 */
54 static void process_complete(struct gdlm_lock
*lp
)
56 struct gdlm_ls
*ls
= lp
->ls
;
57 struct lm_async_cb acb
;
/* Mode held on entry; consulted for the LM_OUT_CACHEABLE decision below. */
58 s16 prev_mode
= lp
->cur
;
/* Start from an all-zero callback record. */
60 memset(&acb
, 0, sizeof(acb
));
/* DLM reported the request as cancelled: log it and flag the result. */
62 if (lp
->lksb
.sb_status
== -DLM_ECANCEL
) {
63 log_info("complete dlm cancel %x,%llx flags %lx",
65 (unsigned long long)lp
->lockname
.ln_number
,
69 acb
.lc_ret
|= LM_OUT_CANCELED
;
70 if (lp
->cur
== DLM_LOCK_IV
)
/*
 * LFL_DLM_UNLOCK was set (and is cleared by this test). A status other
 * than -DLM_EUNLOCK is logged.
 */
75 if (test_and_clear_bit(LFL_DLM_UNLOCK
, &lp
->flags
)) {
76 if (lp
->lksb
.sb_status
!= -DLM_EUNLOCK
) {
77 log_info("unlock sb_status %d %x,%llx flags %lx",
78 lp
->lksb
.sb_status
, lp
->lockname
.ln_type
,
79 (unsigned long long)lp
->lockname
.ln_number
,
/* Both the current and the requested mode are reset to DLM_LOCK_IV. */
84 lp
->cur
= DLM_LOCK_IV
;
85 lp
->req
= DLM_LOCK_IV
;
/* NOTE(review): the body of this branch is not visible in this listing. */
88 if (test_and_clear_bit(LFL_UNLOCK_DELETE
, &lp
->flags
)) {
/* DLM_SBF_VALNOTVALID: zero GDLM_LVB_SIZE bytes at sb_lvbptr. */
95 if (lp
->lksb
.sb_flags
& DLM_SBF_VALNOTVALID
)
96 memset(lp
->lksb
.sb_lvbptr
, 0, GDLM_LVB_SIZE
);
/* DLM_SBF_ALTMODE: swap the recorded request between PR and CW. */
98 if (lp
->lksb
.sb_flags
& DLM_SBF_ALTMODE
) {
99 if (lp
->req
== DLM_LOCK_PR
)
100 lp
->req
= DLM_LOCK_CW
;
101 else if (lp
->req
== DLM_LOCK_CW
)
102 lp
->req
= DLM_LOCK_PR
;
106 * A canceled lock request. The lock was just taken off the delayed
107 * list and was never even submitted to dlm.
110 if (test_and_clear_bit(LFL_CANCEL
, &lp
->flags
)) {
111 log_info("complete internal cancel %x,%llx",
112 lp
->lockname
.ln_type
,
113 (unsigned long long)lp
->lockname
.ln_number
);
115 acb
.lc_ret
|= LM_OUT_CANCELED
;
/* Any non-zero status from DLM is handled in this branch. */
123 if (lp
->lksb
.sb_status
) {
124 /* a "normal" error */
125 if ((lp
->lksb
.sb_status
== -EAGAIN
) &&
126 (lp
->lkf
& DLM_LKF_NOQUEUE
)) {
/* With no mode held (DLM_LOCK_IV), the stored lock id is reset to 0. */
128 if (lp
->cur
== DLM_LOCK_IV
)
129 lp
->lksb
.sb_lkid
= 0;
133 /* this could only happen with cancels I think */
134 log_info("ast sb_status %d %x,%llx flags %lx",
135 lp
->lksb
.sb_status
, lp
->lockname
.ln_type
,
136 (unsigned long long)lp
->lockname
.ln_number
,
/*
 * -EDEADLOCK while the lockspace has LM_MFLAG_CONV_NODROP set in fsflags:
 * report LM_OUT_CONV_DEADLK in the result.
 */
138 if (lp
->lksb
.sb_status
== -EDEADLOCK
&&
139 lp
->ls
->fsflags
& LM_MFLAG_CONV_NODROP
) {
141 acb
.lc_ret
|= LM_OUT_CONV_DEADLK
;
142 if (lp
->cur
== DLM_LOCK_IV
)
143 lp
->lksb
.sb_lkid
= 0;
150 * This is an AST for an EX->EX conversion for sync_lvb from GFS.
/* NOTE(review): the body of this branch is not visible in this listing. */
153 if (test_and_clear_bit(LFL_SYNC_LVB
, &lp
->flags
)) {
159 * A lock has been demoted to NL because it initially completed during
160 * BLOCK_LOCKS. Now it must be requested in the originally requested
164 if (test_and_clear_bit(LFL_REREQUEST
, &lp
->flags
)) {
/* Sanity checks: the request just completed was NL and a real mode was saved. */
165 gdlm_assert(lp
->req
== DLM_LOCK_NL
, "%x,%llx",
166 lp
->lockname
.ln_type
,
167 (unsigned long long)lp
->lockname
.ln_number
);
168 gdlm_assert(lp
->prev_req
> DLM_LOCK_NL
, "%x,%llx",
169 lp
->lockname
.ln_type
,
170 (unsigned long long)lp
->lockname
.ln_number
);
/* Restore the saved request mode and mark the lock as not cacheable. */
172 lp
->cur
= DLM_LOCK_NL
;
173 lp
->req
= lp
->prev_req
;
174 lp
->prev_req
= DLM_LOCK_IV
;
175 lp
->lkf
&= ~DLM_LKF_CONVDEADLK
;
177 set_bit(LFL_NOCACHE
, &lp
->flags
);
/*
 * While DFL_BLOCK_LOCKS is set on the lockspace and the lock is not
 * LFL_NOBLOCK, the lock is handed to gdlm_queue_delayed().
 * NOTE(review): the alternative branch is not visible in this listing.
 */
179 if (test_bit(DFL_BLOCK_LOCKS
, &ls
->flags
) &&
180 !test_bit(LFL_NOBLOCK
, &lp
->flags
))
181 gdlm_queue_delayed(lp
);
188 * A request is granted during dlm recovery. It may be granted
189 * because the locks of a failed node were cleared. In that case,
190 * there may be inconsistent data beneath this lock and we must wait
191 * for recovery to complete to use it. When gfs recovery is done this
192 * granted lock will be converted to NL and then reacquired in this
196 if (test_bit(DFL_BLOCK_LOCKS
, &ls
->flags
) &&
197 !test_bit(LFL_NOBLOCK
, &lp
->flags
) &&
198 lp
->req
!= DLM_LOCK_NL
) {
/* Save the wanted mode in prev_req and turn the request into a conversion to NL. */
201 lp
->prev_req
= lp
->req
;
202 lp
->req
= DLM_LOCK_NL
;
203 lp
->lkf
|= DLM_LKF_CONVERT
;
204 lp
->lkf
&= ~DLM_LKF_CONVDEADLK
;
206 log_debug("rereq %x,%llx id %x %d,%d",
207 lp
->lockname
.ln_type
,
208 (unsigned long long)lp
->lockname
.ln_number
,
209 lp
->lksb
.sb_lkid
, lp
->cur
, lp
->req
);
/* Mark the lock so the LFL_REREQUEST branch above runs on its next completion. */
211 set_bit(LFL_REREQUEST
, &lp
->flags
);
217 * DLM demoted the lock to NL before it was granted so GFS must be
218 * told it cannot cache data for this lock.
221 if (lp
->lksb
.sb_flags
& DLM_SBF_DEMOTED
)
222 set_bit(LFL_NOCACHE
, &lp
->flags
);
226 * This is an internal lock_dlm lock
229 if (test_bit(LFL_INLOCK
, &lp
->flags
)) {
230 clear_bit(LFL_NOBLOCK
, &lp
->flags
);
237 * Normal completion of a lock request. Tell GFS it now has the lock.
240 clear_bit(LFL_NOBLOCK
, &lp
->flags
);
/* Fill in the callback record: lock name plus the lock-manager state for lp->cur. */
243 acb
.lc_name
= lp
->lockname
;
244 acb
.lc_ret
|= gdlm_make_lmstate(lp
->cur
);
/*
 * LM_OUT_CACHEABLE is added only if LFL_NOCACHE was not set (the test also
 * clears the bit) and both the current and the previous mode are above NL.
 */
246 if (!test_and_clear_bit(LFL_NOCACHE
, &lp
->flags
) &&
247 (lp
->cur
> DLM_LOCK_NL
) && (prev_mode
> DLM_LOCK_NL
))
248 acb
.lc_ret
|= LM_OUT_CACHEABLE
;
/* Hand the completed record to the filesystem. */
250 ls
->fscb(ls
->sdp
, LM_CB_ASYNC
, &acb
);
/*
 * no_work - check, under ls->async_lock, whether the lockspace's work
 * lists are empty.
 * @ls: lockspace whose complete, submit and blocking lists are examined.
 * @blocking: not referenced on the lines visible here; presumably it gates
 *            the blocking-list check -- TODO confirm against the full source.
 *
 * NOTE(review): this listing has dropped lines (the embedded source line
 * numbers jump 253 -> 257, 258 -> 260 and stop at 261), so the declaration
 * of ret, whatever guards the second assignment, and the return statement
 * are not visible.
 */
253 static inline int no_work(struct gdlm_ls
*ls
, int blocking
)
257 spin_lock(&ls
->async_lock
);
/* True when both the complete and the submit list are empty. */
258 ret
= list_empty(&ls
->complete
) && list_empty(&ls
->submit
);
/* Overwrites ret with the state of the blocking list. */
260 ret
= list_empty(&ls
->blocking
);
261 spin_unlock(&ls
->async_lock
);
/*
 * check_drop - periodic check of the lockspace's lock count against its
 * drop threshold.
 * @ls: lockspace providing drop_locks_count, drop_locks_period, drop_time
 *      and all_locks_count.
 *
 * NOTE(review): this listing has dropped lines (the embedded source line
 * numbers jump 268 -> 271 and stop at 273), so the return statements are
 * not visible; the return values are not documented here for that reason.
 */
266 static inline int check_drop(struct gdlm_ls
*ls
)
/* A drop_locks_count of zero is tested first (the action taken is not visible). */
268 if (!ls
->drop_locks_count
)
/* Once drop_locks_period seconds (period * HZ jiffies) have passed since drop_time... */
271 if (time_after(jiffies
, ls
->drop_time
+ ls
->drop_locks_period
* HZ
)) {
/* ...restart the period and compare the lock count with the threshold. */
272 ls
->drop_time
= jiffies
;
273 if (ls
->all_locks_count
>= ls
->drop_locks_count
)
/*
 * gdlm_thread - main loop shared by the lock_dlm kernel threads.
 * @data: the lockspace (struct gdlm_ls *) this thread serves.
 * @blist: non-zero if this thread may take entries from ls->blocking.
 *
 * Each iteration sleeps on ls->thread_wait until no_work() reports work or
 * the thread is asked to stop, then, under ls->async_lock, removes at most
 * one lock from the first non-empty list in the order blocking (only when
 * @blist is set), complete, submit.
 *
 * NOTE(review): this listing has dropped lines (e.g. the embedded source
 * line numbers jump 300 -> 302, 310 -> 314 and 315 -> 318), so the
 * assignments to complete/submit, the conditions guarding the calls after
 * the unlock, and the return statement are not visible.
 */
279 static int gdlm_thread(void *data
, int blist
)
281 struct gdlm_ls
*ls
= (struct gdlm_ls
*) data
;
282 struct gdlm_lock
*lp
= NULL
;
283 uint8_t complete
, blocking
, submit
, drop
;
285 /* Only thread1 is allowed to do blocking callbacks since gfs
286 may wait for a completion callback within a blocking cb. */
288 while (!kthread_should_stop()) {
/* Sleep until there is work for this thread or a stop was requested. */
289 wait_event_interruptible(ls
->thread_wait
,
290 !no_work(ls
, blist
) || kthread_should_stop());
/* Reset the per-iteration work indicators. */
292 complete
= blocking
= submit
= drop
= 0;
294 spin_lock(&ls
->async_lock
);
/* Blocking requests are taken only when blist is set; the mode is copied out. */
296 if (blist
&& !list_empty(&ls
->blocking
)) {
297 lp
= list_entry(ls
->blocking
.next
, struct gdlm_lock
,
299 list_del_init(&lp
->blist
);
300 blocking
= lp
->bast_mode
;
/* Otherwise take the first completed lock, if any. */
302 } else if (!list_empty(&ls
->complete
)) {
303 lp
= list_entry(ls
->complete
.next
, struct gdlm_lock
,
305 list_del_init(&lp
->clist
);
/* Otherwise take the first lock waiting on the submit list, if any. */
307 } else if (!list_empty(&ls
->submit
)) {
308 lp
= list_entry(ls
->submit
.next
, struct gdlm_lock
,
310 list_del_init(&lp
->delay_list
);
/* check_drop() is called while async_lock is still held. */
314 drop
= check_drop(ls
);
315 spin_unlock(&ls
->async_lock
);
/* The work itself is done after the spinlock has been released. */
318 process_complete(lp
);
321 process_blocking(lp
, blocking
);
/* Ask the filesystem to drop locks (LM_CB_DROPLOCKS carries no lock name). */
327 ls
->fscb(ls
->sdp
, LM_CB_DROPLOCKS
, NULL
);
/*
 * gdlm_thread1 - kthread entry point that runs gdlm_thread() with blist
 * set to 1, so this thread also services the lockspace's blocking list.
 * @data: lockspace pointer, forwarded unchanged.
 *
 * Returns whatever gdlm_thread() returns.
 */
static int gdlm_thread1(void *data)
{
	return gdlm_thread(data, 1);
}
/*
 * gdlm_thread2 - kthread entry point that runs gdlm_thread() with blist
 * set to 0, so this thread never takes entries from the blocking list.
 * @data: lockspace pointer, forwarded unchanged.
 *
 * Returns whatever gdlm_thread() returns.
 */
static int gdlm_thread2(void *data)
{
	return gdlm_thread(data, 0);
}
/*
 * gdlm_init_threads - start the two lock_dlm kernel threads for @ls.
 * @ls: lockspace passed to both threads as their data argument.
 *
 * NOTE(review): this listing has dropped lines (the embedded source line
 * numbers jump 347 -> 350, 350 -> 353 and 358 -> 361), so the declaration
 * of error, the failure checks on kthread_run(), the stores to
 * ls->thread1/ls->thread2 and the return statements are not visible.
 */
345 int gdlm_init_threads(struct gdlm_ls
*ls
)
347 struct task_struct
*p
;
/* First thread: runs gdlm_thread1 under the name "lock_dlm1". */
350 p
= kthread_run(gdlm_thread1
, ls
, "lock_dlm1");
353 log_error("can't start lock_dlm1 thread %d", error
);
/* Second thread: runs gdlm_thread2 under the name "lock_dlm2". */
358 p
= kthread_run(gdlm_thread2
, ls
, "lock_dlm2");
361 log_error("can't start lock_dlm2 thread %d", error
);
/* Presumably part of the lock_dlm2 failure path: stop the first thread again -- TODO confirm. */
362 kthread_stop(ls
->thread1
);
370 void gdlm_release_threads(struct gdlm_ls
*ls
)
372 kthread_stop(ls
->thread1
);
373 kthread_stop(ls
->thread2
);