1 /******************************************************************************
2 *******************************************************************************
4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5 ** Copyright (C) 2004-2010 Red Hat, Inc. All rights reserved.
7 ** This copyrighted material is made available to anyone wishing to use,
8 ** modify, copy, or redistribute it subject to the terms and conditions
9 ** of the GNU General Public License v.2.
11 *******************************************************************************
12 ******************************************************************************/
14 #include "dlm_internal.h"
/* Global, monotonically increasing sequence number handed to every
 * queued callback (see dlm_add_cb: lock, ++dlm_cb_seq, unlock).
 * A zero seq in a callback slot means "slot empty", so the counter
 * is pre-incremented and never hands out 0. */
static uint64_t dlm_cb_seq;
static DEFINE_SPINLOCK(dlm_cb_seq_spin);
22 static void dlm_dump_lkb_callbacks(struct dlm_lkb
*lkb
)
26 log_print("last_bast %x %llu flags %x mode %d sb %d %x",
28 (unsigned long long)lkb
->lkb_last_bast
.seq
,
29 lkb
->lkb_last_bast
.flags
,
30 lkb
->lkb_last_bast
.mode
,
31 lkb
->lkb_last_bast
.sb_status
,
32 lkb
->lkb_last_bast
.sb_flags
);
34 log_print("last_cast %x %llu flags %x mode %d sb %d %x",
36 (unsigned long long)lkb
->lkb_last_cast
.seq
,
37 lkb
->lkb_last_cast
.flags
,
38 lkb
->lkb_last_cast
.mode
,
39 lkb
->lkb_last_cast
.sb_status
,
40 lkb
->lkb_last_cast
.sb_flags
);
42 for (i
= 0; i
< DLM_CALLBACKS_SIZE
; i
++) {
43 log_print("cb %x %llu flags %x mode %d sb %d %x",
45 (unsigned long long)lkb
->lkb_callbacks
[i
].seq
,
46 lkb
->lkb_callbacks
[i
].flags
,
47 lkb
->lkb_callbacks
[i
].mode
,
48 lkb
->lkb_callbacks
[i
].sb_status
,
49 lkb
->lkb_callbacks
[i
].sb_flags
);
53 int dlm_add_lkb_callback(struct dlm_lkb
*lkb
, uint32_t flags
, int mode
,
54 int status
, uint32_t sbflags
, uint64_t seq
)
56 struct dlm_ls
*ls
= lkb
->lkb_resource
->res_ls
;
61 for (i
= 0; i
< DLM_CALLBACKS_SIZE
; i
++) {
62 if (lkb
->lkb_callbacks
[i
].seq
)
66 * Suppress some redundant basts here, do more on removal.
67 * Don't even add a bast if the callback just before it
68 * is a bast for the same mode or a more restrictive mode.
69 * (the addional > PR check is needed for PR/CW inversion)
72 if ((i
> 0) && (flags
& DLM_CB_BAST
) &&
73 (lkb
->lkb_callbacks
[i
-1].flags
& DLM_CB_BAST
)) {
75 prev_seq
= lkb
->lkb_callbacks
[i
-1].seq
;
76 prev_mode
= lkb
->lkb_callbacks
[i
-1].mode
;
78 if ((prev_mode
== mode
) ||
79 (prev_mode
> mode
&& prev_mode
> DLM_LOCK_PR
)) {
81 log_debug(ls
, "skip %x add bast %llu mode %d "
82 "for bast %llu mode %d",
84 (unsigned long long)seq
,
86 (unsigned long long)prev_seq
,
93 lkb
->lkb_callbacks
[i
].seq
= seq
;
94 lkb
->lkb_callbacks
[i
].flags
= flags
;
95 lkb
->lkb_callbacks
[i
].mode
= mode
;
96 lkb
->lkb_callbacks
[i
].sb_status
= status
;
97 lkb
->lkb_callbacks
[i
].sb_flags
= (sbflags
& 0x000000FF);
102 if (i
== DLM_CALLBACKS_SIZE
) {
103 log_error(ls
, "no callbacks %x %llu flags %x mode %d sb %d %x",
104 lkb
->lkb_id
, (unsigned long long)seq
,
105 flags
, mode
, status
, sbflags
);
106 dlm_dump_lkb_callbacks(lkb
);
114 int dlm_rem_lkb_callback(struct dlm_ls
*ls
, struct dlm_lkb
*lkb
,
115 struct dlm_callback
*cb
, int *resid
)
121 if (!lkb
->lkb_callbacks
[0].seq
) {
126 /* oldest undelivered cb is callbacks[0] */
128 memcpy(cb
, &lkb
->lkb_callbacks
[0], sizeof(struct dlm_callback
));
129 memset(&lkb
->lkb_callbacks
[0], 0, sizeof(struct dlm_callback
));
131 /* shift others down */
133 for (i
= 1; i
< DLM_CALLBACKS_SIZE
; i
++) {
134 if (!lkb
->lkb_callbacks
[i
].seq
)
136 memcpy(&lkb
->lkb_callbacks
[i
-1], &lkb
->lkb_callbacks
[i
],
137 sizeof(struct dlm_callback
));
138 memset(&lkb
->lkb_callbacks
[i
], 0, sizeof(struct dlm_callback
));
142 /* if cb is a bast, it should be skipped if the blocking mode is
143 compatible with the last granted mode */
145 if ((cb
->flags
& DLM_CB_BAST
) && lkb
->lkb_last_cast
.seq
) {
146 if (dlm_modes_compat(cb
->mode
, lkb
->lkb_last_cast
.mode
)) {
147 cb
->flags
|= DLM_CB_SKIP
;
149 log_debug(ls
, "skip %x bast %llu mode %d "
150 "for cast %llu mode %d",
152 (unsigned long long)cb
->seq
,
154 (unsigned long long)lkb
->lkb_last_cast
.seq
,
155 lkb
->lkb_last_cast
.mode
);
161 if (cb
->flags
& DLM_CB_CAST
) {
162 memcpy(&lkb
->lkb_last_cast
, cb
, sizeof(struct dlm_callback
));
163 lkb
->lkb_last_cast_time
= ktime_get();
166 if (cb
->flags
& DLM_CB_BAST
) {
167 memcpy(&lkb
->lkb_last_bast
, cb
, sizeof(struct dlm_callback
));
168 lkb
->lkb_last_bast_time
= ktime_get();
175 void dlm_add_cb(struct dlm_lkb
*lkb
, uint32_t flags
, int mode
, int status
,
178 struct dlm_ls
*ls
= lkb
->lkb_resource
->res_ls
;
179 uint64_t new_seq
, prev_seq
;
182 spin_lock(&dlm_cb_seq_spin
);
183 new_seq
= ++dlm_cb_seq
;
184 spin_unlock(&dlm_cb_seq_spin
);
186 if (lkb
->lkb_flags
& DLM_IFL_USER
) {
187 dlm_user_add_ast(lkb
, flags
, mode
, status
, sbflags
, new_seq
);
191 mutex_lock(&lkb
->lkb_cb_mutex
);
192 prev_seq
= lkb
->lkb_callbacks
[0].seq
;
194 rv
= dlm_add_lkb_callback(lkb
, flags
, mode
, status
, sbflags
, new_seq
);
199 kref_get(&lkb
->lkb_ref
);
201 if (test_bit(LSFL_CB_DELAY
, &ls
->ls_flags
)) {
202 mutex_lock(&ls
->ls_cb_mutex
);
203 list_add(&lkb
->lkb_cb_list
, &ls
->ls_cb_delay
);
204 mutex_unlock(&ls
->ls_cb_mutex
);
206 queue_work(ls
->ls_callback_wq
, &lkb
->lkb_cb_work
);
210 mutex_unlock(&lkb
->lkb_cb_mutex
);
213 void dlm_callback_work(struct work_struct
*work
)
215 struct dlm_lkb
*lkb
= container_of(work
, struct dlm_lkb
, lkb_cb_work
);
216 struct dlm_ls
*ls
= lkb
->lkb_resource
->res_ls
;
217 void (*castfn
) (void *astparam
);
218 void (*bastfn
) (void *astparam
, int mode
);
219 struct dlm_callback callbacks
[DLM_CALLBACKS_SIZE
];
222 memset(&callbacks
, 0, sizeof(callbacks
));
224 mutex_lock(&lkb
->lkb_cb_mutex
);
225 if (!lkb
->lkb_callbacks
[0].seq
) {
226 /* no callback work exists, shouldn't happen */
227 log_error(ls
, "dlm_callback_work %x no work", lkb
->lkb_id
);
229 dlm_dump_lkb_callbacks(lkb
);
232 for (i
= 0; i
< DLM_CALLBACKS_SIZE
; i
++) {
233 rv
= dlm_rem_lkb_callback(ls
, lkb
, &callbacks
[i
], &resid
);
239 /* cbs remain, loop should have removed all, shouldn't happen */
240 log_error(ls
, "dlm_callback_work %x resid %d", lkb
->lkb_id
,
243 dlm_dump_lkb_callbacks(lkb
);
245 mutex_unlock(&lkb
->lkb_cb_mutex
);
247 castfn
= lkb
->lkb_astfn
;
248 bastfn
= lkb
->lkb_bastfn
;
250 for (i
= 0; i
< DLM_CALLBACKS_SIZE
; i
++) {
251 if (!callbacks
[i
].seq
)
253 if (callbacks
[i
].flags
& DLM_CB_SKIP
) {
255 } else if (callbacks
[i
].flags
& DLM_CB_BAST
) {
256 bastfn(lkb
->lkb_astparam
, callbacks
[i
].mode
);
257 } else if (callbacks
[i
].flags
& DLM_CB_CAST
) {
258 lkb
->lkb_lksb
->sb_status
= callbacks
[i
].sb_status
;
259 lkb
->lkb_lksb
->sb_flags
= callbacks
[i
].sb_flags
;
260 castfn(lkb
->lkb_astparam
);
264 /* undo kref_get from dlm_add_callback, may cause lkb to be freed */
268 int dlm_callback_start(struct dlm_ls
*ls
)
270 ls
->ls_callback_wq
= alloc_workqueue("dlm_callback",
271 WQ_UNBOUND
| WQ_MEM_RECLAIM
, 0);
272 if (!ls
->ls_callback_wq
) {
273 log_print("can't start dlm_callback workqueue");
279 void dlm_callback_stop(struct dlm_ls
*ls
)
281 if (ls
->ls_callback_wq
)
282 destroy_workqueue(ls
->ls_callback_wq
);
285 void dlm_callback_suspend(struct dlm_ls
*ls
)
287 set_bit(LSFL_CB_DELAY
, &ls
->ls_flags
);
289 if (ls
->ls_callback_wq
)
290 flush_workqueue(ls
->ls_callback_wq
);
293 #define MAX_CB_QUEUE 25
295 void dlm_callback_resume(struct dlm_ls
*ls
)
297 struct dlm_lkb
*lkb
, *safe
;
300 clear_bit(LSFL_CB_DELAY
, &ls
->ls_flags
);
302 if (!ls
->ls_callback_wq
)
306 mutex_lock(&ls
->ls_cb_mutex
);
307 list_for_each_entry_safe(lkb
, safe
, &ls
->ls_cb_delay
, lkb_cb_list
) {
308 list_del_init(&lkb
->lkb_cb_list
);
309 queue_work(ls
->ls_callback_wq
, &lkb
->lkb_cb_work
);
311 if (count
== MAX_CB_QUEUE
)
314 mutex_unlock(&ls
->ls_cb_mutex
);
317 log_rinfo(ls
, "dlm_callback_resume %d", count
);
318 if (count
== MAX_CB_QUEUE
) {