// SPDX-License-Identifier: GPL-2.0-or-later
/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * Code which implements the kernel side of a minimal userspace
 * interface to our DLM.
 *
 * Many of the functions here are pared down versions of dlmglue.c
 * functions.
 *
 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
 */

#include <linux/signal.h>
#include <linux/sched/signal.h>

#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/crc32.h>

#include "../ocfs2_lockingver.h"
#include "../stackglue.h"
#include "userdlm.h"

#define MLOG_MASK_PREFIX ML_DLMFS
#include "../cluster/masklog.h"
static inline struct user_lock_res *user_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
{
        return container_of(lksb, struct user_lock_res, l_lksb);
}
static inline int user_check_wait_flag(struct user_lock_res *lockres,
                                        int flag)
{
        int ret;

        spin_lock(&lockres->l_lock);
        ret = lockres->l_flags & flag;
        spin_unlock(&lockres->l_lock);

        return ret;
}
static inline void user_wait_on_busy_lock(struct user_lock_res *lockres)
{
        wait_event(lockres->l_event,
                   !user_check_wait_flag(lockres, USER_LOCK_BUSY));
}
static inline void user_wait_on_blocked_lock(struct user_lock_res *lockres)
{
        wait_event(lockres->l_event,
                   !user_check_wait_flag(lockres, USER_LOCK_BLOCKED));
}
/* I heart container_of... */
static inline struct ocfs2_cluster_connection *
cluster_connection_from_user_lockres(struct user_lock_res *lockres)
{
        struct dlmfs_inode_private *ip;

        ip = container_of(lockres,
                          struct dlmfs_inode_private,
                          ip_lockres);
        return ip->ip_conn;
}
static inline struct inode *
user_dlm_inode_from_user_lockres(struct user_lock_res *lockres)
{
        struct dlmfs_inode_private *ip;

        ip = container_of(lockres,
                          struct dlmfs_inode_private,
                          ip_lockres);
        return &ip->ip_vfs_inode;
}
static inline void user_recover_from_dlm_error(struct user_lock_res *lockres)
{
        spin_lock(&lockres->l_lock);
        lockres->l_flags &= ~USER_LOCK_BUSY;
        spin_unlock(&lockres->l_lock);
}
#define user_log_dlm_error(_func, _stat, _lockres) do {                \
        mlog(ML_ERROR, "Dlm error %d while calling %s on "             \
                "resource %.*s\n", _stat, _func,                       \
                _lockres->l_namelen, _lockres->l_name);                \
} while (0)
/* WARNING: This function lives in a world where the only three lock
 * levels are EX, PR, and NL. It *will* have to be adjusted when more
 * lock types are added. */
static inline int user_highest_compat_lock_level(int level)
{
        int new_level = DLM_LOCK_EX;

        if (level == DLM_LOCK_EX)
                new_level = DLM_LOCK_NL;
        else if (level == DLM_LOCK_PR)
                new_level = DLM_LOCK_PR;
        return new_level;
}
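
/*
 * Worked example of the mapping above (illustration only, restating the
 * function, not adding behaviour): given the level another node is
 * blocking on, the highest level we may still hold is
 *
 *      blocking == DLM_LOCK_EX  ->  DLM_LOCK_NL
 *      blocking == DLM_LOCK_PR  ->  DLM_LOCK_PR
 *      anything else (e.g. NL)  ->  DLM_LOCK_EX
 *
 * so a downconvert always targets the most permissive level that does
 * not conflict with the blocking request.
 */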
static void user_ast(struct ocfs2_dlm_lksb *lksb)
{
        struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
        int status;

        mlog(ML_BASTS, "AST fired for lockres %.*s, level %d => %d\n",
             lockres->l_namelen, lockres->l_name, lockres->l_level,
             lockres->l_requested);

        spin_lock(&lockres->l_lock);

        status = ocfs2_dlm_lock_status(&lockres->l_lksb);
        if (status) {
                mlog(ML_ERROR, "lksb status value of %u on lockres %.*s\n",
                     status, lockres->l_namelen, lockres->l_name);
                spin_unlock(&lockres->l_lock);
                return;
        }

        mlog_bug_on_msg(lockres->l_requested == DLM_LOCK_IV,
                        "Lockres %.*s, requested ivmode. flags 0x%x\n",
                        lockres->l_namelen, lockres->l_name, lockres->l_flags);

        /* we're downconverting. */
        if (lockres->l_requested < lockres->l_level) {
                if (lockres->l_requested <=
                    user_highest_compat_lock_level(lockres->l_blocking)) {
                        lockres->l_blocking = DLM_LOCK_NL;
                        lockres->l_flags &= ~USER_LOCK_BLOCKED;
                }
        }

        lockres->l_level = lockres->l_requested;
        lockres->l_requested = DLM_LOCK_IV;
        lockres->l_flags |= USER_LOCK_ATTACHED;
        lockres->l_flags &= ~USER_LOCK_BUSY;

        spin_unlock(&lockres->l_lock);

        wake_up(&lockres->l_event);
}
static inline void user_dlm_grab_inode_ref(struct user_lock_res *lockres)
{
        struct inode *inode;

        inode = user_dlm_inode_from_user_lockres(lockres);
        if (!igrab(inode))
                BUG();
}
static void user_dlm_unblock_lock(struct work_struct *work);
static void __user_dlm_queue_lockres(struct user_lock_res *lockres)
{
        if (!(lockres->l_flags & USER_LOCK_QUEUED)) {
                user_dlm_grab_inode_ref(lockres);

                INIT_WORK(&lockres->l_work, user_dlm_unblock_lock);

                queue_work(user_dlm_worker, &lockres->l_work);
                lockres->l_flags |= USER_LOCK_QUEUED;
        }
}
static void __user_dlm_cond_queue_lockres(struct user_lock_res *lockres)
{
        int queue = 0;

        if (!(lockres->l_flags & USER_LOCK_BLOCKED))
                return;

        switch (lockres->l_blocking) {
        case DLM_LOCK_EX:
                if (!lockres->l_ex_holders && !lockres->l_ro_holders)
                        queue = 1;
                break;
        case DLM_LOCK_PR:
                if (!lockres->l_ex_holders)
                        queue = 1;
                break;
        default:
                BUG();
        }

        if (queue)
                __user_dlm_queue_lockres(lockres);
}
static void user_bast(struct ocfs2_dlm_lksb *lksb, int level)
{
        struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);

        mlog(ML_BASTS, "BAST fired for lockres %.*s, blocking %d, level %d\n",
             lockres->l_namelen, lockres->l_name, level, lockres->l_level);

        spin_lock(&lockres->l_lock);
        lockres->l_flags |= USER_LOCK_BLOCKED;
        if (level > lockres->l_blocking)
                lockres->l_blocking = level;

        __user_dlm_queue_lockres(lockres);
        spin_unlock(&lockres->l_lock);

        wake_up(&lockres->l_event);
}
static void user_unlock_ast(struct ocfs2_dlm_lksb *lksb, int status)
{
        struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);

        mlog(ML_BASTS, "UNLOCK AST fired for lockres %.*s, flags 0x%x\n",
             lockres->l_namelen, lockres->l_name, lockres->l_flags);

        if (status)
                mlog(ML_ERROR, "dlm returns status %d\n", status);

        spin_lock(&lockres->l_lock);
        /* The teardown flag gets set early during the unlock process,
         * so test the cancel flag to make sure that this ast isn't
         * for a concurrent cancel. */
        if (lockres->l_flags & USER_LOCK_IN_TEARDOWN
            && !(lockres->l_flags & USER_LOCK_IN_CANCEL)) {
                lockres->l_level = DLM_LOCK_IV;
        } else if (status == DLM_CANCELGRANT) {
                /* We tried to cancel a convert request, but it was
                 * already granted. Don't clear the busy flag - the
                 * ast should've done this already. */
                BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
                lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
                goto out_noclear;
        } else {
                BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
                /* Cancel succeeded, we want to re-queue */
                lockres->l_requested = DLM_LOCK_IV; /* cancel an
                                                     * upconvert
                                                     * request. */
                lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
                /* we want the unblock thread to look at it again
                 * now. */
                if (lockres->l_flags & USER_LOCK_BLOCKED)
                        __user_dlm_queue_lockres(lockres);
        }

        lockres->l_flags &= ~USER_LOCK_BUSY;
out_noclear:
        spin_unlock(&lockres->l_lock);

        wake_up(&lockres->l_event);
}
/*
 * This is the userdlmfs locking protocol version.
 *
 * See fs/ocfs2/dlmglue.c for more details on locking versions.
 */
static struct ocfs2_locking_protocol user_dlm_lproto = {
        .lp_max_version = {
                .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
                .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
        },
        .lp_lock_ast            = user_ast,
        .lp_blocking_ast        = user_bast,
        .lp_unlock_ast          = user_unlock_ast,
};
static inline void user_dlm_drop_inode_ref(struct user_lock_res *lockres)
{
        struct inode *inode;

        inode = user_dlm_inode_from_user_lockres(lockres);
        iput(inode);
}
static void user_dlm_unblock_lock(struct work_struct *work)
{
        int new_level, status;
        struct user_lock_res *lockres =
                container_of(work, struct user_lock_res, l_work);
        struct ocfs2_cluster_connection *conn =
                cluster_connection_from_user_lockres(lockres);

        mlog(0, "lockres %.*s\n", lockres->l_namelen, lockres->l_name);

        spin_lock(&lockres->l_lock);

        mlog_bug_on_msg(!(lockres->l_flags & USER_LOCK_QUEUED),
                        "Lockres %.*s, flags 0x%x\n",
                        lockres->l_namelen, lockres->l_name, lockres->l_flags);

        /* notice that we don't clear USER_LOCK_BLOCKED here. If it's
         * set, we want user_ast to clear it. */
        lockres->l_flags &= ~USER_LOCK_QUEUED;

        /* It's valid to get here and no longer be blocked - if we get
         * several basts in a row, we might be queued by the first
         * one, the unblock thread might run and clear the queued
         * flag, and finally we might get another bast which re-queues
         * us before our ast for the downconvert is called. */
        if (!(lockres->l_flags & USER_LOCK_BLOCKED)) {
                mlog(ML_BASTS, "lockres %.*s USER_LOCK_BLOCKED\n",
                     lockres->l_namelen, lockres->l_name);
                spin_unlock(&lockres->l_lock);
                goto drop_ref;
        }

        if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
                mlog(ML_BASTS, "lockres %.*s USER_LOCK_IN_TEARDOWN\n",
                     lockres->l_namelen, lockres->l_name);
                spin_unlock(&lockres->l_lock);
                goto drop_ref;
        }

        if (lockres->l_flags & USER_LOCK_BUSY) {
                if (lockres->l_flags & USER_LOCK_IN_CANCEL) {
                        mlog(ML_BASTS, "lockres %.*s USER_LOCK_IN_CANCEL\n",
                             lockres->l_namelen, lockres->l_name);
                        spin_unlock(&lockres->l_lock);
                        goto drop_ref;
                }

                lockres->l_flags |= USER_LOCK_IN_CANCEL;
                spin_unlock(&lockres->l_lock);

                status = ocfs2_dlm_unlock(conn, &lockres->l_lksb,
                                          DLM_LKF_CANCEL);
                if (status)
                        user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
                goto drop_ref;
        }

        /* If there are still incompat holders, we can exit safely
         * without worrying about re-queueing this lock as that will
         * happen on the last call to user_cluster_unlock. */
        if ((lockres->l_blocking == DLM_LOCK_EX)
            && (lockres->l_ex_holders || lockres->l_ro_holders)) {
                spin_unlock(&lockres->l_lock);
                mlog(ML_BASTS, "lockres %.*s, EX/PR Holders %u,%u\n",
                     lockres->l_namelen, lockres->l_name,
                     lockres->l_ex_holders, lockres->l_ro_holders);
                goto drop_ref;
        }

        if ((lockres->l_blocking == DLM_LOCK_PR)
            && lockres->l_ex_holders) {
                spin_unlock(&lockres->l_lock);
                mlog(ML_BASTS, "lockres %.*s, EX Holders %u\n",
                     lockres->l_namelen, lockres->l_name,
                     lockres->l_ex_holders);
                goto drop_ref;
        }

        /* yay, we can downconvert now. */
        new_level = user_highest_compat_lock_level(lockres->l_blocking);
        lockres->l_requested = new_level;
        lockres->l_flags |= USER_LOCK_BUSY;
        mlog(ML_BASTS, "lockres %.*s, downconvert %d => %d\n",
             lockres->l_namelen, lockres->l_name, lockres->l_level, new_level);
        spin_unlock(&lockres->l_lock);

        /* need lock downconvert request now... */
        status = ocfs2_dlm_lock(conn, new_level, &lockres->l_lksb,
                                DLM_LKF_CONVERT|DLM_LKF_VALBLK,
                                lockres->l_name,
                                lockres->l_namelen);
        if (status) {
                user_log_dlm_error("ocfs2_dlm_lock", status, lockres);
                user_recover_from_dlm_error(lockres);
        }

drop_ref:
        user_dlm_drop_inode_ref(lockres);
}
static inline void user_dlm_inc_holders(struct user_lock_res *lockres,
                                        int level)
{
        switch (level) {
        case DLM_LOCK_EX:
                lockres->l_ex_holders++;
                break;
        case DLM_LOCK_PR:
                lockres->l_ro_holders++;
                break;
        default:
                BUG();
        }
}
/* predict what lock level we'll be dropping down to on behalf
 * of another node, and return true if the currently wanted
 * level will be compatible with it. */
static inline int
user_may_continue_on_blocked_lock(struct user_lock_res *lockres,
                                  int wanted)
{
        BUG_ON(!(lockres->l_flags & USER_LOCK_BLOCKED));

        return wanted <= user_highest_compat_lock_level(lockres->l_blocking);
}
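
/*
 * Worked example of the predicate above (illustration only): if a remote
 * node is blocking us with DLM_LOCK_EX, the compatible level is
 * DLM_LOCK_NL, so
 *
 *      user_may_continue_on_blocked_lock(lockres, DLM_LOCK_PR)
 *
 * returns false and user_dlm_cluster_lock() will wait on the blocked
 * lock; with a blocking DLM_LOCK_PR request, a wanted DLM_LOCK_PR is
 * still compatible and the caller may proceed without waiting.
 */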
int user_dlm_cluster_lock(struct user_lock_res *lockres,
                          int level,
                          int lkm_flags)
{
        int status, local_flags;
        struct ocfs2_cluster_connection *conn =
                cluster_connection_from_user_lockres(lockres);

        if (level != DLM_LOCK_EX &&
            level != DLM_LOCK_PR) {
                mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
                     lockres->l_namelen, lockres->l_name);
                status = -EINVAL;
                goto bail;
        }

        mlog(ML_BASTS, "lockres %.*s, level %d, flags = 0x%x\n",
             lockres->l_namelen, lockres->l_name, level, lkm_flags);

again:
        if (signal_pending(current)) {
                status = -ERESTARTSYS;
                goto bail;
        }

        spin_lock(&lockres->l_lock);

        /* We only compare against the currently granted level
         * here. If the lock is blocked waiting on a downconvert,
         * we'll get caught below. */
        if ((lockres->l_flags & USER_LOCK_BUSY) &&
            (level > lockres->l_level)) {
                /* is someone sitting in dlm_lock? If so, wait on
                 * them. */
                spin_unlock(&lockres->l_lock);

                user_wait_on_busy_lock(lockres);
                goto again;
        }

        if ((lockres->l_flags & USER_LOCK_BLOCKED) &&
            (!user_may_continue_on_blocked_lock(lockres, level))) {
                /* is the lock currently blocked on behalf of
                 * another node */
                spin_unlock(&lockres->l_lock);

                user_wait_on_blocked_lock(lockres);
                goto again;
        }

        if (level > lockres->l_level) {
                local_flags = lkm_flags | DLM_LKF_VALBLK;
                if (lockres->l_level != DLM_LOCK_IV)
                        local_flags |= DLM_LKF_CONVERT;

                lockres->l_requested = level;
                lockres->l_flags |= USER_LOCK_BUSY;
                spin_unlock(&lockres->l_lock);

                BUG_ON(level == DLM_LOCK_IV);
                BUG_ON(level == DLM_LOCK_NL);

                /* call dlm_lock to upgrade lock now */
                status = ocfs2_dlm_lock(conn, level, &lockres->l_lksb,
                                        local_flags, lockres->l_name,
                                        lockres->l_namelen);
                if (status) {
                        if ((lkm_flags & DLM_LKF_NOQUEUE) &&
                            (status != -EAGAIN))
                                user_log_dlm_error("ocfs2_dlm_lock",
                                                   status, lockres);
                        user_recover_from_dlm_error(lockres);
                        goto bail;
                }

                user_wait_on_busy_lock(lockres);
                goto again;
        }

        user_dlm_inc_holders(lockres, level);
        spin_unlock(&lockres->l_lock);

        status = 0;
bail:
        return status;
}
static inline void user_dlm_dec_holders(struct user_lock_res *lockres,
                                        int level)
{
        switch (level) {
        case DLM_LOCK_EX:
                BUG_ON(!lockres->l_ex_holders);
                lockres->l_ex_holders--;
                break;
        case DLM_LOCK_PR:
                BUG_ON(!lockres->l_ro_holders);
                lockres->l_ro_holders--;
                break;
        default:
                BUG();
        }
}
void user_dlm_cluster_unlock(struct user_lock_res *lockres,
                             int level)
{
        if (level != DLM_LOCK_EX &&
            level != DLM_LOCK_PR) {
                mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
                     lockres->l_namelen, lockres->l_name);
                return;
        }

        spin_lock(&lockres->l_lock);
        user_dlm_dec_holders(lockres, level);
        __user_dlm_cond_queue_lockres(lockres);
        spin_unlock(&lockres->l_lock);
}
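
/*
 * Caller-side sketch (illustration only, not part of this file): a
 * dlmfs-style user brackets its critical section with the two calls
 * above.  Error handling is elided and "lockres" is assumed to have
 * been set up with user_dlm_lock_res_init():
 *
 *      status = user_dlm_cluster_lock(lockres, DLM_LOCK_EX, 0);
 *      if (status < 0)
 *              return status;
 *      ...  touch data protected by this lock  ...
 *      user_dlm_cluster_unlock(lockres, DLM_LOCK_EX);
 */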
void user_dlm_write_lvb(struct inode *inode,
                        const char *val,
                        unsigned int len)
{
        struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
        char *lvb;

        BUG_ON(len > DLM_LVB_LEN);

        spin_lock(&lockres->l_lock);

        BUG_ON(lockres->l_level < DLM_LOCK_EX);
        lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
        memcpy(lvb, val, len);

        spin_unlock(&lockres->l_lock);
}
ssize_t user_dlm_read_lvb(struct inode *inode,
                          char *val,
                          unsigned int len)
{
        struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
        char *lvb;
        ssize_t ret = len;

        BUG_ON(len > DLM_LVB_LEN);

        spin_lock(&lockres->l_lock);

        BUG_ON(lockres->l_level < DLM_LOCK_PR);
        if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)) {
                lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
                memcpy(val, lvb, len);
        } else
                ret = 0;

        spin_unlock(&lockres->l_lock);
        return ret;
}
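
/*
 * LVB usage sketch (illustration only): both helpers above assume the
 * cluster lock is already held at a sufficient level, which the
 * BUG_ON()s enforce, e.g.
 *
 *      user_dlm_cluster_lock(lockres, DLM_LOCK_EX, 0);
 *      user_dlm_write_lvb(inode, buf, len);    (needs >= DLM_LOCK_EX)
 *      user_dlm_cluster_unlock(lockres, DLM_LOCK_EX);
 *
 * while user_dlm_read_lvb() only needs DLM_LOCK_PR or better.  "buf"
 * and "len" are hypothetical caller-provided values, len <= DLM_LVB_LEN.
 */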
void user_dlm_lock_res_init(struct user_lock_res *lockres,
                            struct dentry *dentry)
{
        memset(lockres, 0, sizeof(*lockres));

        spin_lock_init(&lockres->l_lock);
        init_waitqueue_head(&lockres->l_event);
        lockres->l_level = DLM_LOCK_IV;
        lockres->l_requested = DLM_LOCK_IV;
        lockres->l_blocking = DLM_LOCK_IV;

        /* should have been checked before getting here. */
        BUG_ON(dentry->d_name.len >= USER_DLM_LOCK_ID_MAX_LEN);

        memcpy(lockres->l_name,
               dentry->d_name.name,
               dentry->d_name.len);
        lockres->l_namelen = dentry->d_name.len;
}
int user_dlm_destroy_lock(struct user_lock_res *lockres)
{
        int status = -EBUSY;
        struct ocfs2_cluster_connection *conn =
                cluster_connection_from_user_lockres(lockres);

        mlog(ML_BASTS, "lockres %.*s\n", lockres->l_namelen, lockres->l_name);

        spin_lock(&lockres->l_lock);
        if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
                spin_unlock(&lockres->l_lock);
                goto bail;
        }

        lockres->l_flags |= USER_LOCK_IN_TEARDOWN;

        while (lockres->l_flags & USER_LOCK_BUSY) {
                spin_unlock(&lockres->l_lock);

                user_wait_on_busy_lock(lockres);

                spin_lock(&lockres->l_lock);
        }

        if (lockres->l_ro_holders || lockres->l_ex_holders) {
                spin_unlock(&lockres->l_lock);
                goto bail;
        }

        status = 0;
        if (!(lockres->l_flags & USER_LOCK_ATTACHED)) {
                spin_unlock(&lockres->l_lock);
                goto bail;
        }

        lockres->l_flags &= ~USER_LOCK_ATTACHED;
        lockres->l_flags |= USER_LOCK_BUSY;
        spin_unlock(&lockres->l_lock);

        status = ocfs2_dlm_unlock(conn, &lockres->l_lksb, DLM_LKF_VALBLK);
        if (status) {
                user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
                goto bail;
        }

        user_wait_on_busy_lock(lockres);

        status = 0;
bail:
        return status;
}
static void user_dlm_recovery_handler_noop(int node_num,
                                           void *recovery_data)
{
        /* We ignore recovery events */
        return;
}
void user_dlm_set_locking_protocol(void)
{
        ocfs2_stack_glue_set_max_proto_version(&user_dlm_lproto.lp_max_version);
}
struct ocfs2_cluster_connection *user_dlm_register(const struct qstr *name)
{
        int rc;
        struct ocfs2_cluster_connection *conn;

        rc = ocfs2_cluster_connect_agnostic(name->name, name->len,
                                            &user_dlm_lproto,
                                            user_dlm_recovery_handler_noop,
                                            NULL, &conn);
        if (rc)
                mlog_errno(rc);

        return rc ? ERR_PTR(rc) : conn;
}
void user_dlm_unregister(struct ocfs2_cluster_connection *conn)
{
        ocfs2_cluster_disconnect(conn, 0);
}
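
/*
 * Lifecycle sketch (illustration only): a consumer such as dlmfs is
 * expected to pair the connection helpers roughly as
 *
 *      user_dlm_set_locking_protocol();
 *      conn = user_dlm_register(&domain_name);
 *      if (IS_ERR(conn))
 *              return PTR_ERR(conn);
 *      ...
 *      user_dlm_unregister(conn);
 *
 * where "domain_name" stands for a hypothetical struct qstr naming the
 * lock domain; it is not defined in this file.
 */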