1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
6 ** Copyright (C) 2004-2010 Red Hat, Inc. All rights reserved.
9 *******************************************************************************
10 ******************************************************************************/
12 #include "dlm_internal.h"
17 static uint64_t dlm_cb_seq
;
18 static DEFINE_SPINLOCK(dlm_cb_seq_spin
);
20 static void dlm_dump_lkb_callbacks(struct dlm_lkb
*lkb
)
24 log_print("last_bast %x %llu flags %x mode %d sb %d %x",
26 (unsigned long long)lkb
->lkb_last_bast
.seq
,
27 lkb
->lkb_last_bast
.flags
,
28 lkb
->lkb_last_bast
.mode
,
29 lkb
->lkb_last_bast
.sb_status
,
30 lkb
->lkb_last_bast
.sb_flags
);
32 log_print("last_cast %x %llu flags %x mode %d sb %d %x",
34 (unsigned long long)lkb
->lkb_last_cast
.seq
,
35 lkb
->lkb_last_cast
.flags
,
36 lkb
->lkb_last_cast
.mode
,
37 lkb
->lkb_last_cast
.sb_status
,
38 lkb
->lkb_last_cast
.sb_flags
);
40 for (i
= 0; i
< DLM_CALLBACKS_SIZE
; i
++) {
41 log_print("cb %x %llu flags %x mode %d sb %d %x",
43 (unsigned long long)lkb
->lkb_callbacks
[i
].seq
,
44 lkb
->lkb_callbacks
[i
].flags
,
45 lkb
->lkb_callbacks
[i
].mode
,
46 lkb
->lkb_callbacks
[i
].sb_status
,
47 lkb
->lkb_callbacks
[i
].sb_flags
);
51 int dlm_add_lkb_callback(struct dlm_lkb
*lkb
, uint32_t flags
, int mode
,
52 int status
, uint32_t sbflags
, uint64_t seq
)
54 struct dlm_ls
*ls
= lkb
->lkb_resource
->res_ls
;
59 for (i
= 0; i
< DLM_CALLBACKS_SIZE
; i
++) {
60 if (lkb
->lkb_callbacks
[i
].seq
)
64 * Suppress some redundant basts here, do more on removal.
65 * Don't even add a bast if the callback just before it
66 * is a bast for the same mode or a more restrictive mode.
67 * (the addional > PR check is needed for PR/CW inversion)
70 if ((i
> 0) && (flags
& DLM_CB_BAST
) &&
71 (lkb
->lkb_callbacks
[i
-1].flags
& DLM_CB_BAST
)) {
73 prev_seq
= lkb
->lkb_callbacks
[i
-1].seq
;
74 prev_mode
= lkb
->lkb_callbacks
[i
-1].mode
;
76 if ((prev_mode
== mode
) ||
77 (prev_mode
> mode
&& prev_mode
> DLM_LOCK_PR
)) {
79 log_debug(ls
, "skip %x add bast %llu mode %d "
80 "for bast %llu mode %d",
82 (unsigned long long)seq
,
84 (unsigned long long)prev_seq
,
91 lkb
->lkb_callbacks
[i
].seq
= seq
;
92 lkb
->lkb_callbacks
[i
].flags
= flags
;
93 lkb
->lkb_callbacks
[i
].mode
= mode
;
94 lkb
->lkb_callbacks
[i
].sb_status
= status
;
95 lkb
->lkb_callbacks
[i
].sb_flags
= (sbflags
& 0x000000FF);
100 if (i
== DLM_CALLBACKS_SIZE
) {
101 log_error(ls
, "no callbacks %x %llu flags %x mode %d sb %d %x",
102 lkb
->lkb_id
, (unsigned long long)seq
,
103 flags
, mode
, status
, sbflags
);
104 dlm_dump_lkb_callbacks(lkb
);
112 int dlm_rem_lkb_callback(struct dlm_ls
*ls
, struct dlm_lkb
*lkb
,
113 struct dlm_callback
*cb
, int *resid
)
119 if (!lkb
->lkb_callbacks
[0].seq
) {
124 /* oldest undelivered cb is callbacks[0] */
126 memcpy(cb
, &lkb
->lkb_callbacks
[0], sizeof(struct dlm_callback
));
127 memset(&lkb
->lkb_callbacks
[0], 0, sizeof(struct dlm_callback
));
129 /* shift others down */
131 for (i
= 1; i
< DLM_CALLBACKS_SIZE
; i
++) {
132 if (!lkb
->lkb_callbacks
[i
].seq
)
134 memcpy(&lkb
->lkb_callbacks
[i
-1], &lkb
->lkb_callbacks
[i
],
135 sizeof(struct dlm_callback
));
136 memset(&lkb
->lkb_callbacks
[i
], 0, sizeof(struct dlm_callback
));
140 /* if cb is a bast, it should be skipped if the blocking mode is
141 compatible with the last granted mode */
143 if ((cb
->flags
& DLM_CB_BAST
) && lkb
->lkb_last_cast
.seq
) {
144 if (dlm_modes_compat(cb
->mode
, lkb
->lkb_last_cast
.mode
)) {
145 cb
->flags
|= DLM_CB_SKIP
;
147 log_debug(ls
, "skip %x bast %llu mode %d "
148 "for cast %llu mode %d",
150 (unsigned long long)cb
->seq
,
152 (unsigned long long)lkb
->lkb_last_cast
.seq
,
153 lkb
->lkb_last_cast
.mode
);
159 if (cb
->flags
& DLM_CB_CAST
) {
160 memcpy(&lkb
->lkb_last_cast
, cb
, sizeof(struct dlm_callback
));
161 lkb
->lkb_last_cast_time
= ktime_get();
164 if (cb
->flags
& DLM_CB_BAST
) {
165 memcpy(&lkb
->lkb_last_bast
, cb
, sizeof(struct dlm_callback
));
166 lkb
->lkb_last_bast_time
= ktime_get();
173 void dlm_add_cb(struct dlm_lkb
*lkb
, uint32_t flags
, int mode
, int status
,
176 struct dlm_ls
*ls
= lkb
->lkb_resource
->res_ls
;
177 uint64_t new_seq
, prev_seq
;
180 spin_lock(&dlm_cb_seq_spin
);
181 new_seq
= ++dlm_cb_seq
;
183 new_seq
= ++dlm_cb_seq
;
184 spin_unlock(&dlm_cb_seq_spin
);
186 if (lkb
->lkb_flags
& DLM_IFL_USER
) {
187 dlm_user_add_ast(lkb
, flags
, mode
, status
, sbflags
, new_seq
);
191 mutex_lock(&lkb
->lkb_cb_mutex
);
192 prev_seq
= lkb
->lkb_callbacks
[0].seq
;
194 rv
= dlm_add_lkb_callback(lkb
, flags
, mode
, status
, sbflags
, new_seq
);
199 kref_get(&lkb
->lkb_ref
);
201 if (test_bit(LSFL_CB_DELAY
, &ls
->ls_flags
)) {
202 mutex_lock(&ls
->ls_cb_mutex
);
203 list_add(&lkb
->lkb_cb_list
, &ls
->ls_cb_delay
);
204 mutex_unlock(&ls
->ls_cb_mutex
);
206 queue_work(ls
->ls_callback_wq
, &lkb
->lkb_cb_work
);
210 mutex_unlock(&lkb
->lkb_cb_mutex
);
213 void dlm_callback_work(struct work_struct
*work
)
215 struct dlm_lkb
*lkb
= container_of(work
, struct dlm_lkb
, lkb_cb_work
);
216 struct dlm_ls
*ls
= lkb
->lkb_resource
->res_ls
;
217 void (*castfn
) (void *astparam
);
218 void (*bastfn
) (void *astparam
, int mode
);
219 struct dlm_callback callbacks
[DLM_CALLBACKS_SIZE
];
222 memset(&callbacks
, 0, sizeof(callbacks
));
224 mutex_lock(&lkb
->lkb_cb_mutex
);
225 if (!lkb
->lkb_callbacks
[0].seq
) {
226 /* no callback work exists, shouldn't happen */
227 log_error(ls
, "dlm_callback_work %x no work", lkb
->lkb_id
);
229 dlm_dump_lkb_callbacks(lkb
);
232 for (i
= 0; i
< DLM_CALLBACKS_SIZE
; i
++) {
233 rv
= dlm_rem_lkb_callback(ls
, lkb
, &callbacks
[i
], &resid
);
239 /* cbs remain, loop should have removed all, shouldn't happen */
240 log_error(ls
, "dlm_callback_work %x resid %d", lkb
->lkb_id
,
243 dlm_dump_lkb_callbacks(lkb
);
245 mutex_unlock(&lkb
->lkb_cb_mutex
);
247 castfn
= lkb
->lkb_astfn
;
248 bastfn
= lkb
->lkb_bastfn
;
250 for (i
= 0; i
< DLM_CALLBACKS_SIZE
; i
++) {
251 if (!callbacks
[i
].seq
)
253 if (callbacks
[i
].flags
& DLM_CB_SKIP
) {
255 } else if (callbacks
[i
].flags
& DLM_CB_BAST
) {
256 bastfn(lkb
->lkb_astparam
, callbacks
[i
].mode
);
257 } else if (callbacks
[i
].flags
& DLM_CB_CAST
) {
258 lkb
->lkb_lksb
->sb_status
= callbacks
[i
].sb_status
;
259 lkb
->lkb_lksb
->sb_flags
= callbacks
[i
].sb_flags
;
260 castfn(lkb
->lkb_astparam
);
264 /* undo kref_get from dlm_add_callback, may cause lkb to be freed */
268 int dlm_callback_start(struct dlm_ls
*ls
)
270 ls
->ls_callback_wq
= alloc_workqueue("dlm_callback",
271 WQ_HIGHPRI
| WQ_MEM_RECLAIM
, 0);
272 if (!ls
->ls_callback_wq
) {
273 log_print("can't start dlm_callback workqueue");
279 void dlm_callback_stop(struct dlm_ls
*ls
)
281 if (ls
->ls_callback_wq
)
282 destroy_workqueue(ls
->ls_callback_wq
);
285 void dlm_callback_suspend(struct dlm_ls
*ls
)
287 set_bit(LSFL_CB_DELAY
, &ls
->ls_flags
);
289 if (ls
->ls_callback_wq
)
290 flush_workqueue(ls
->ls_callback_wq
);
293 #define MAX_CB_QUEUE 25
295 void dlm_callback_resume(struct dlm_ls
*ls
)
297 struct dlm_lkb
*lkb
, *safe
;
300 clear_bit(LSFL_CB_DELAY
, &ls
->ls_flags
);
302 if (!ls
->ls_callback_wq
)
306 mutex_lock(&ls
->ls_cb_mutex
);
307 list_for_each_entry_safe(lkb
, safe
, &ls
->ls_cb_delay
, lkb_cb_list
) {
308 list_del_init(&lkb
->lkb_cb_list
);
309 queue_work(ls
->ls_callback_wq
, &lkb
->lkb_cb_work
);
311 if (count
== MAX_CB_QUEUE
)
314 mutex_unlock(&ls
->ls_cb_mutex
);
317 log_rinfo(ls
, "dlm_callback_resume %d", count
);
318 if (count
== MAX_CB_QUEUE
) {