/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2010 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
#include "dlm_internal.h"
#include "lock.h"
#include "user.h"
static uint64_t dlm_cb_seq;
static DEFINE_SPINLOCK(dlm_cb_seq_spin);
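
/* Dump the last delivered cast/bast and every pending slot in the lkb's
   callback array; used below when the array is in an unexpected state. */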
static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb)
{
	int i;

	log_print("last_bast %x %llu flags %x mode %d sb %d %x",
		  lkb->lkb_id,
		  (unsigned long long)lkb->lkb_last_bast.seq,
		  lkb->lkb_last_bast.flags,
		  lkb->lkb_last_bast.mode,
		  lkb->lkb_last_bast.sb_status,
		  lkb->lkb_last_bast.sb_flags);

	log_print("last_cast %x %llu flags %x mode %d sb %d %x",
		  lkb->lkb_id,
		  (unsigned long long)lkb->lkb_last_cast.seq,
		  lkb->lkb_last_cast.flags,
		  lkb->lkb_last_cast.mode,
		  lkb->lkb_last_cast.sb_status,
		  lkb->lkb_last_cast.sb_flags);

	for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
		log_print("cb %x %llu flags %x mode %d sb %d %x",
			  lkb->lkb_id,
			  (unsigned long long)lkb->lkb_callbacks[i].seq,
			  lkb->lkb_callbacks[i].flags,
			  lkb->lkb_callbacks[i].mode,
			  lkb->lkb_callbacks[i].sb_status,
			  lkb->lkb_callbacks[i].sb_flags);
	}
}
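
/* Append a callback to the first free slot of the lkb's fixed-size array.
   Redundant basts are suppressed at add time when the entry just before
   is already a bast for the same or a more restrictive mode.  Returns 0
   on success, -1 if all DLM_CALLBACKS_SIZE slots are in use. */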
int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
			 int status, uint32_t sbflags, uint64_t seq)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	uint64_t prev_seq;
	int prev_mode;
	int i, rv;

	for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
		if (lkb->lkb_callbacks[i].seq)
			continue;

		/*
		 * Suppress some redundant basts here, do more on removal.
		 * Don't even add a bast if the callback just before it
		 * is a bast for the same mode or a more restrictive mode.
		 * (the additional > PR check is needed for PR/CW inversion)
		 */

		if ((i > 0) && (flags & DLM_CB_BAST) &&
		    (lkb->lkb_callbacks[i-1].flags & DLM_CB_BAST)) {

			prev_seq = lkb->lkb_callbacks[i-1].seq;
			prev_mode = lkb->lkb_callbacks[i-1].mode;

			if ((prev_mode == mode) ||
			    (prev_mode > mode && prev_mode > DLM_LOCK_PR)) {

				log_debug(ls, "skip %x add bast %llu mode %d "
					  "for bast %llu mode %d",
					  lkb->lkb_id,
					  (unsigned long long)seq,
					  mode,
					  (unsigned long long)prev_seq,
					  prev_mode);
				rv = 0;
				goto out;
			}
		}

		lkb->lkb_callbacks[i].seq = seq;
		lkb->lkb_callbacks[i].flags = flags;
		lkb->lkb_callbacks[i].mode = mode;
		lkb->lkb_callbacks[i].sb_status = status;
		lkb->lkb_callbacks[i].sb_flags = (sbflags & 0x000000FF);
		rv = 0;
		break;
	}

	if (i == DLM_CALLBACKS_SIZE) {
		log_error(ls, "no callbacks %x %llu flags %x mode %d sb %d %x",
			  lkb->lkb_id, (unsigned long long)seq,
			  flags, mode, status, sbflags);
		dlm_dump_lkb_callbacks(lkb);
		rv = -1;
		goto out;
	}
 out:
	return rv;
}
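
/* Remove the oldest callback (callbacks[0]) into *cb and shift the rest
   down one slot.  *resid returns the number of callbacks still queued.
   A bast whose blocking mode is compatible with the last granted mode is
   marked DLM_CB_SKIP instead of being delivered. */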
int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
			 struct dlm_callback *cb, int *resid)
{
	int i, rv;

	*resid = 0;

	if (!lkb->lkb_callbacks[0].seq) {
		rv = -ENOENT;
		goto out;
	}

	/* oldest undelivered cb is callbacks[0] */

	memcpy(cb, &lkb->lkb_callbacks[0], sizeof(struct dlm_callback));
	memset(&lkb->lkb_callbacks[0], 0, sizeof(struct dlm_callback));

	/* shift others down */

	for (i = 1; i < DLM_CALLBACKS_SIZE; i++) {
		if (!lkb->lkb_callbacks[i].seq)
			break;
		memcpy(&lkb->lkb_callbacks[i-1], &lkb->lkb_callbacks[i],
		       sizeof(struct dlm_callback));
		memset(&lkb->lkb_callbacks[i], 0, sizeof(struct dlm_callback));
		(*resid)++;
	}

	/* if cb is a bast, it should be skipped if the blocking mode is
	   compatible with the last granted mode */

	if ((cb->flags & DLM_CB_BAST) && lkb->lkb_last_cast.seq) {
		if (dlm_modes_compat(cb->mode, lkb->lkb_last_cast.mode)) {
			cb->flags |= DLM_CB_SKIP;

			log_debug(ls, "skip %x bast %llu mode %d "
				  "for cast %llu mode %d",
				  lkb->lkb_id,
				  (unsigned long long)cb->seq,
				  cb->mode,
				  (unsigned long long)lkb->lkb_last_cast.seq,
				  lkb->lkb_last_cast.mode);
			rv = 0;
			goto out;
		}
	}

	if (cb->flags & DLM_CB_CAST) {
		memcpy(&lkb->lkb_last_cast, cb, sizeof(struct dlm_callback));
		lkb->lkb_last_cast_time = ktime_get();
	}

	if (cb->flags & DLM_CB_BAST) {
		memcpy(&lkb->lkb_last_bast, cb, sizeof(struct dlm_callback));
		lkb->lkb_last_bast_time = ktime_get();
	}
	rv = 0;
 out:
	return rv;
}
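
/* Entry point for queueing an ast/bast on an lkb.  Assigns a global
   sequence number, hands user-space locks off to dlm_user_add_ast(),
   and for kernel locks queues the callback work (or parks it on
   ls_cb_delay while LSFL_CB_DELAY is set during recovery). */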
void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
		uint32_t sbflags)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	uint64_t new_seq, prev_seq;
	int rv;

	spin_lock(&dlm_cb_seq_spin);
	new_seq = ++dlm_cb_seq;
	/* a seq of zero marks an empty callback slot, so skip it on wrap */
	if (!dlm_cb_seq)
		new_seq = ++dlm_cb_seq;
	spin_unlock(&dlm_cb_seq_spin);

	if (lkb->lkb_flags & DLM_IFL_USER) {
		dlm_user_add_ast(lkb, flags, mode, status, sbflags, new_seq);
		return;
	}

	mutex_lock(&lkb->lkb_cb_mutex);
	prev_seq = lkb->lkb_callbacks[0].seq;

	rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, new_seq);
	if (rv < 0)
		goto out;

	if (!prev_seq) {
		kref_get(&lkb->lkb_ref);

		if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
			mutex_lock(&ls->ls_cb_mutex);
			list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay);
			mutex_unlock(&ls->ls_cb_mutex);
		} else {
			queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
		}
	}
 out:
	mutex_unlock(&lkb->lkb_cb_mutex);
}
void dlm_callback_work(struct work_struct *work)
{
	struct dlm_lkb *lkb = container_of(work, struct dlm_lkb, lkb_cb_work);
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	void (*castfn) (void *astparam);
	void (*bastfn) (void *astparam, int mode);
	struct dlm_callback callbacks[DLM_CALLBACKS_SIZE];
	int i, rv, resid;

	memset(&callbacks, 0, sizeof(callbacks));

	mutex_lock(&lkb->lkb_cb_mutex);
	if (!lkb->lkb_callbacks[0].seq) {
		/* no callback work exists, shouldn't happen */
		log_error(ls, "dlm_callback_work %x no work", lkb->lkb_id);
		dlm_print_lkb(lkb);
		dlm_dump_lkb_callbacks(lkb);
	}

	for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
		rv = dlm_rem_lkb_callback(ls, lkb, &callbacks[i], &resid);
		if (rv < 0)
			break;
	}

	if (resid) {
		/* cbs remain, loop should have removed all, shouldn't happen */
		log_error(ls, "dlm_callback_work %x resid %d", lkb->lkb_id,
			  resid);
		dlm_print_lkb(lkb);
		dlm_dump_lkb_callbacks(lkb);
	}
	mutex_unlock(&lkb->lkb_cb_mutex);

	castfn = lkb->lkb_astfn;
	bastfn = lkb->lkb_bastfn;

	for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
		if (!callbacks[i].seq)
			break;
		if (callbacks[i].flags & DLM_CB_SKIP) {
			continue;
		} else if (callbacks[i].flags & DLM_CB_BAST) {
			bastfn(lkb->lkb_astparam, callbacks[i].mode);
		} else if (callbacks[i].flags & DLM_CB_CAST) {
			lkb->lkb_lksb->sb_status = callbacks[i].sb_status;
			lkb->lkb_lksb->sb_flags = callbacks[i].sb_flags;
			castfn(lkb->lkb_astparam);
		}
	}

	/* undo kref_get from dlm_add_cb, may cause lkb to be freed */
	dlm_put_lkb(lkb);
}
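
/* Lifecycle of the per-lockspace callback workqueue: created when the
   lockspace is set up (dlm_callback_start), flushed and new work delayed
   around recovery (dlm_callback_suspend/resume), destroyed when the
   lockspace is released (dlm_callback_stop). */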
int dlm_callback_start(struct dlm_ls *ls)
{
	ls->ls_callback_wq = alloc_workqueue("dlm_callback",
					     WQ_HIGHPRI | WQ_MEM_RECLAIM, 0);
	if (!ls->ls_callback_wq) {
		log_print("can't start dlm_callback workqueue");
		return -ENOMEM;
	}
	return 0;
}
void dlm_callback_stop(struct dlm_ls *ls)
{
	if (ls->ls_callback_wq)
		destroy_workqueue(ls->ls_callback_wq);
}
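
/* Called before recovery: set LSFL_CB_DELAY so new callbacks queue on
   ls_cb_delay, then flush any work already submitted to the workqueue. */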
void dlm_callback_suspend(struct dlm_ls *ls)
{
	set_bit(LSFL_CB_DELAY, &ls->ls_flags);

	if (ls->ls_callback_wq)
		flush_workqueue(ls->ls_callback_wq);
}
#define MAX_CB_QUEUE 25
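
/* Requeue delayed callback work in batches of MAX_CB_QUEUE, rescheduling
   between batches so a large post-recovery backlog doesn't hold
   ls_cb_mutex or the CPU for too long. */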
void dlm_callback_resume(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb, *safe;
	int count = 0;

	clear_bit(LSFL_CB_DELAY, &ls->ls_flags);

	if (!ls->ls_callback_wq)
		return;

 more:
	mutex_lock(&ls->ls_cb_mutex);
	list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) {
		list_del_init(&lkb->lkb_cb_list);
		queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
		count++;
		if (count == MAX_CB_QUEUE)
			break;
	}
	mutex_unlock(&ls->ls_cb_mutex);

	log_rinfo(ls, "dlm_callback_resume %d", count);
	if (count == MAX_CB_QUEUE) {
		count = 0;
		cond_resched();
		goto more;
	}
}