// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include <linux/module.h>

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "midcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"

static int		ls_count;
static struct mutex	ls_lock;
static struct list_head	lslist;
static spinlock_t	lslist_lock;

static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ssize_t ret = len;
	int n;
	int rc = kstrtoint(buf, 0, &n);

	if (rc)
		return rc;
	ls = dlm_find_lockspace_local(ls);
	if (!ls)
		return -EINVAL;

	switch (n) {
	case 0:
		dlm_ls_stop(ls);
		break;
	case 1:
		dlm_ls_start(ls);
		break;
	default:
		ret = -EINVAL;
	}
	dlm_put_lockspace(ls);
	return ret;
}

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);

	if (rc)
		return rc;
	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
	wake_up(&ls->ls_uevent_wait);
	return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtouint(buf, 0, &ls->ls_global_id);

	if (rc)
		return rc;
	return len;
}

static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}

static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int val;
	int rc = kstrtoint(buf, 0, &val);

	if (rc)
		return rc;
	if (val == 1)
		set_bit(LSFL_NODIR, &ls->ls_flags);
	return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
	uint32_t status = dlm_recover_status(ls);
	return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
	.attr  = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
	.attr  = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_id_show,
	.store = dlm_id_store
};

static struct dlm_attr dlm_attr_nodir = {
	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_nodir_show,
	.store = dlm_nodir_store
};

static struct dlm_attr dlm_attr_recover_status = {
	.attr  = {.name = "recover_status", .mode = S_IRUGO},
	.show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
	.show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_nodir.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};
ATTRIBUTE_GROUPS(dlm);

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
			     char *buf)
{
	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
			      const char *buf, size_t len)
{
	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->store ? a->store(ls, buf, len) : len;
}

static const struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
	.default_groups = dlm_groups,
	.sysfs_ops      = &dlm_attr_ops,
};

static struct kset *dlm_kset;

static int do_uevent(struct dlm_ls *ls, int in)
{
	if (in)
		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
	else
		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");

	/* dlm_controld will see the uevent, do the necessary group management
	   and then write to sysfs to wake us */

	wait_event(ls->ls_uevent_wait,
		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);

	return ls->ls_uevent_result;
}

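/* A rough sketch of the join/leave handshake around do_uevent() (descriptive
 * only): do_uevent(ls, 1) emits KOBJ_ONLINE, dlm_controld performs the group
 * join and writes the result to the "event_done" sysfs file
 * (dlm_event_store), which stores ls_uevent_result, sets LSFL_UEVENT_WAIT and
 * wakes ls_uevent_wait; do_uevent() then returns ls_uevent_result.
 */
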
static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env)
{
	const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);

	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
	return 0;
}

static const struct kset_uevent_ops dlm_uevent_ops = {
	.uevent = dlm_uevent,
};

int __init dlm_lockspace_init(void)
{
	ls_count = 0;
	mutex_init(&ls_lock);
	INIT_LIST_HEAD(&lslist);
	spin_lock_init(&lslist_lock);

	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
	if (!dlm_kset) {
		printk(KERN_WARNING "%s: can not create kset\n", __func__);
		return -ENOMEM;
	}
	return 0;
}

void dlm_lockspace_exit(void)
{
	kset_unregister(dlm_kset);
}

struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
	struct dlm_ls *ls;

	spin_lock_bh(&lslist_lock);

	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_global_id == id) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock_bh(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
	struct dlm_ls *ls = lockspace;

	atomic_inc(&ls->ls_count);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
	struct dlm_ls *ls;

	spin_lock_bh(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_device.minor == minor) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock_bh(&lslist_lock);
	return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
	if (atomic_dec_and_test(&ls->ls_count))
		wake_up(&ls->ls_count_wait);
}

static void remove_lockspace(struct dlm_ls *ls)
{
retry:
	wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);

	spin_lock_bh(&lslist_lock);
	if (atomic_read(&ls->ls_count) != 0) {
		spin_unlock_bh(&lslist_lock);
		goto retry;
	}

	WARN_ON(ls->ls_create_count != 0);
	list_del(&ls->ls_list);
	spin_unlock_bh(&lslist_lock);
}

static int threads_start(void)
{
	int error;

	/* Thread for sending/receiving messages for all lockspaces */
	error = dlm_midcomms_start();
	if (error)
		log_print("cannot start dlm midcomms %d", error);

	return error;
}

static int lkb_idr_free(struct dlm_lkb *lkb)
{
	if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
		dlm_free_lvb(lkb->lkb_lvbptr);

	dlm_free_lkb(lkb);
	return 0;
}

static void rhash_free_rsb(void *ptr, void *arg)
{
	struct dlm_rsb *rsb = ptr;

	dlm_free_rsb(rsb);
}

static void free_lockspace(struct work_struct *work)
{
	struct dlm_ls *ls = container_of(work, struct dlm_ls, ls_free_work);
	struct dlm_lkb *lkb;
	unsigned long id;

	/*
	 * Free all lkb's in xa
	 */
	xa_for_each(&ls->ls_lkbxa, id, lkb) {
		lkb_idr_free(lkb);
	}
	xa_destroy(&ls->ls_lkbxa);

	/*
	 * Free all rsb's on rsbtbl
	 */
	rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);

	kfree(ls);
}

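/* free_lockspace() runs from the dlm_wq workqueue (queued at the end of
 * release_lockspace()), so the final teardown of ls_lkbxa, ls_rsbtbl and the
 * lockspace structure itself happens after release_lockspace() has returned.
 */
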
static int new_lockspace(const char *name, const char *cluster,
			 uint32_t flags, int lvblen,
			 const struct dlm_lockspace_ops *ops, void *ops_arg,
			 int *ops_result, dlm_lockspace_t **lockspace)
{
	struct dlm_ls *ls;
	int namelen = strlen(name);
	int error;

	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
		return -EINVAL;

	if (lvblen % 8)
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	if (!dlm_user_daemon_available()) {
		log_print("dlm user daemon not available");
		error = -EUNATCH;
		goto out;
	}

	if (ops && ops_result) {
		if (!dlm_config.ci_recover_callbacks)
			*ops_result = -EOPNOTSUPP;
		else
			*ops_result = 0;
	}

	if (!cluster)
		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
			  dlm_config.ci_cluster_name);

	if (dlm_config.ci_recover_callbacks && cluster &&
	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
		log_print("dlm cluster name '%s' does not match "
			  "the application cluster name '%s'",
			  dlm_config.ci_cluster_name, cluster);
		error = -EBADR;
		goto out;
	}

	error = 0;

	spin_lock_bh(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		WARN_ON(ls->ls_create_count <= 0);
		if (ls->ls_namelen != namelen)
			continue;
		if (memcmp(ls->ls_name, name, namelen))
			continue;
		if (flags & DLM_LSFL_NEWEXCL) {
			error = -EEXIST;
			break;
		}
		ls->ls_create_count++;
		*lockspace = ls;
		error = 1;
		break;
	}
	spin_unlock_bh(&lslist_lock);

	if (error)
		goto out;

	error = -ENOMEM;

	ls = kzalloc(sizeof(*ls), GFP_NOFS);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	atomic_set(&ls->ls_count, 0);
	init_waitqueue_head(&ls->ls_count_wait);
	ls->ls_flags = 0;

	if (ops && dlm_config.ci_recover_callbacks) {
		ls->ls_ops = ops;
		ls->ls_ops_arg = ops_arg;
	}

	if (flags & DLM_LSFL_SOFTIRQ)
		set_bit(LSFL_SOFTIRQ, &ls->ls_flags);

	/* ls_exflags are forced to match among nodes, and we don't
	 * need to require all nodes to have some flags set
	 */
	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL |
				    DLM_LSFL_SOFTIRQ));

	INIT_LIST_HEAD(&ls->ls_slow_inactive);
	INIT_LIST_HEAD(&ls->ls_slow_active);
	rwlock_init(&ls->ls_rsbtbl_lock);

	error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);
	if (error)
		goto out_lsfree;

	xa_init_flags(&ls->ls_lkbxa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
	rwlock_init(&ls->ls_lkbxa_lock);

	INIT_LIST_HEAD(&ls->ls_waiters);
	spin_lock_init(&ls->ls_waiters_lock);
	INIT_LIST_HEAD(&ls->ls_orphans);
	spin_lock_init(&ls->ls_orphans_lock);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_local_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_recovery_done);
	ls->ls_recovery_result = -1;

	spin_lock_init(&ls->ls_cb_lock);
	INIT_LIST_HEAD(&ls->ls_cb_delay);

	INIT_WORK(&ls->ls_free_work, free_lockspace);

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = get_random_u64();
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	rwlock_init(&ls->ls_recv_active);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	rwlock_init(&ls->ls_requestqueue_lock);
	spin_lock_init(&ls->ls_clear_proc_locks);

	/* Due backwards compatibility with 3.1 we need to use maximum
	 * possible dlm message size to be sure the message will fit and
	 * not having out of bounds issues. However on sending side 3.2
	 * might send less.
	 */
	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
	if (!ls->ls_recover_buf) {
		error = -ENOMEM;
		goto out_lkbxa;
	}

	ls->ls_slot = 0;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_slots = NULL;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	xa_init_flags(&ls->ls_recover_xa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
	spin_lock_init(&ls->ls_recover_xa_lock);
	ls->ls_recover_list_count = 0;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_masters_list);
	rwlock_init(&ls->ls_masters_lock);
	INIT_LIST_HEAD(&ls->ls_dir_dump_list);
	rwlock_init(&ls->ls_dir_dump_lock);

	INIT_LIST_HEAD(&ls->ls_scan_list);
	spin_lock_init(&ls->ls_scan_lock);
	timer_setup(&ls->ls_scan_timer, dlm_rsb_scan, TIMER_DEFERRABLE);

	spin_lock_bh(&lslist_lock);
	ls->ls_create_count = 1;
	list_add(&ls->ls_list, &lslist);
	spin_unlock_bh(&lslist_lock);

	if (flags & DLM_LSFL_FS)
		set_bit(LSFL_FS, &ls->ls_flags);

	error = dlm_callback_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_callback %d", error);
		goto out_delist;
	}

	init_waitqueue_head(&ls->ls_recover_lock_wait);

	/*
	 * Once started, dlm_recoverd first looks for ls in lslist, then
	 * initializes ls_in_recovery as locked in "down" mode.  We need
	 * to wait for the wakeup from dlm_recoverd because in_recovery
	 * has to start out in down mode.
	 */

	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_callback;
	}

	wait_event(ls->ls_recover_lock_wait,
		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

	ls->ls_kobj.kset = dlm_kset;
	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
				     "%s", ls->ls_name);
	if (error)
		goto out_recoverd;
	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1);
	if (error)
		goto out_recoverd;

	/* wait until recovery is successful or failed */
	wait_for_completion(&ls->ls_recovery_done);
	error = ls->ls_recovery_result;
	if (error)
		goto out_members;

	dlm_create_debug_file(ls);

	log_rinfo(ls, "join complete");
	*lockspace = ls;
	return 0;

 out_members:
	do_uevent(ls, 0);
	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
 out_recoverd:
	dlm_recoverd_stop(ls);
 out_callback:
	dlm_callback_stop(ls);
 out_delist:
	spin_lock_bh(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock_bh(&lslist_lock);
	xa_destroy(&ls->ls_recover_xa);
	kfree(ls->ls_recover_buf);
 out_lkbxa:
	xa_destroy(&ls->ls_lkbxa);
	rhashtable_destroy(&ls->ls_rsbtbl);
 out_lsfree:
	kobject_put(&ls->ls_kobj);
	kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}

static int __dlm_new_lockspace(const char *name, const char *cluster,
			       uint32_t flags, int lvblen,
			       const struct dlm_lockspace_ops *ops,
			       void *ops_arg, int *ops_result,
			       dlm_lockspace_t **lockspace)
{
	int error = 0;

	mutex_lock(&ls_lock);
	if (!ls_count)
		error = threads_start();
	if (error)
		goto out;

	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
			      ops_result, lockspace);
	if (!error)
		ls_count++;
	if (error > 0)
		error = 0;
	if (!ls_count) {
		dlm_midcomms_shutdown();
		dlm_midcomms_stop();
	}
 out:
	mutex_unlock(&ls_lock);
	return error;
}

int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
		      int lvblen, const struct dlm_lockspace_ops *ops,
		      void *ops_arg, int *ops_result,
		      dlm_lockspace_t **lockspace)
{
	return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
				   ops, ops_arg, ops_result, lockspace);
}

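/* Illustrative call only; the lockspace/cluster names, ops structure and
 * error handling below are hypothetical and not part of this file:
 *
 *	dlm_lockspace_t *ls;
 *	int ops_result, error;
 *
 *	error = dlm_new_lockspace("example", "examplecluster", DLM_LSFL_NEWEXCL,
 *				  64, &example_ops, example_arg, &ops_result,
 *				  &ls);
 *	if (error)
 *		return error;
 */
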
int dlm_new_user_lockspace(const char *name, const char *cluster,
			   uint32_t flags, int lvblen,
			   const struct dlm_lockspace_ops *ops,
			   void *ops_arg, int *ops_result,
			   dlm_lockspace_t **lockspace)
{
	if (flags & DLM_LSFL_SOFTIRQ)
		return -EINVAL;

	return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
				   ops_arg, ops_result, lockspace);
}

/* NOTE: We check the lkbxa here rather than the resource table.
   This is because there may be LKBs queued as ASTs that have been unlinked
   from their RSBs and are pending deletion once the AST has been delivered */

static int lockspace_busy(struct dlm_ls *ls, int force)
{
	struct dlm_lkb *lkb;
	unsigned long id;
	int rv = 0;

	read_lock_bh(&ls->ls_lkbxa_lock);
	if (force == 0) {
		xa_for_each(&ls->ls_lkbxa, id, lkb) {
			rv = 1;
			break;
		}
	} else if (force == 1) {
		xa_for_each(&ls->ls_lkbxa, id, lkb) {
			if (lkb->lkb_nodeid == 0 &&
			    lkb->lkb_grmode != DLM_LOCK_IV) {
				rv = 1;
				break;
			}
		}
	} else {
		rv = 0;
	}
	read_unlock_bh(&ls->ls_lkbxa_lock);
	return rv;
}

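/* lockspace_busy() mirrors the dlm_release_lockspace() force levels described
 * below: force 0 reports busy if any lkb exists, force 1 only counts locally
 * granted lkbs (nodeid 0 with a granted mode), and higher force values never
 * report busy.
 */
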
static int release_lockspace(struct dlm_ls *ls, int force)
{
	int busy, rv;

	busy = lockspace_busy(ls, force);

	spin_lock_bh(&lslist_lock);
	if (ls->ls_create_count == 1) {
		if (busy) {
			rv = -EBUSY;
		} else {
			/* remove_lockspace takes ls off lslist */
			ls->ls_create_count = 0;
			rv = 0;
		}
	} else if (ls->ls_create_count > 1) {
		rv = --ls->ls_create_count;
	} else {
		rv = -EINVAL;
	}
	spin_unlock_bh(&lslist_lock);

	if (rv) {
		log_debug(ls, "release_lockspace no remove %d", rv);
		return rv;
	}

	if (ls_count == 1)
		dlm_midcomms_version_wait();

	dlm_device_deregister(ls);

	if (force < 3 && dlm_user_daemon_available())
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	/* clear the LSFL_RUNNING flag to speed up
	 * timer_shutdown_sync(); we don't care anymore
	 */
	clear_bit(LSFL_RUNNING, &ls->ls_flags);
	timer_shutdown_sync(&ls->ls_scan_timer);

	if (ls_count == 1) {
		dlm_clear_members(ls);
		dlm_midcomms_shutdown();
	}

	dlm_callback_stop(ls);

	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	kobject_put(&ls->ls_kobj);

	xa_destroy(&ls->ls_recover_xa);
	kfree(ls->ls_recover_buf);

	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);

	log_rinfo(ls, "%s final free", __func__);

	/* delayed free of data structures see free_lockspace() */
	queue_work(dlm_wq, &ls->ls_free_work);
	module_put(THIS_MODULE);
	return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
	struct dlm_ls *ls;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;
	dlm_put_lockspace(ls);

	mutex_lock(&ls_lock);
	error = release_lockspace(ls, force);
	if (!error)
		ls_count--;
	if (!ls_count)
		dlm_midcomms_stop();
	mutex_unlock(&ls_lock);

	return error;
}

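/* Illustrative use of the force levels above (hypothetical caller, not part
 * of this file): a normal unmount path would try dlm_release_lockspace(ls, 0)
 * first and only fall back to force 2 or 3 when it no longer cares about
 * remaining LKBs, e.g. during a forced shutdown.
 */
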
void dlm_stop_lockspaces(void)
{
	struct dlm_ls *ls;
	int count;

 restart:
	count = 0;
	spin_lock_bh(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
			count++;
			continue;
		}
		spin_unlock_bh(&lslist_lock);
		log_error(ls, "no userland control daemon, stopping lockspace");
		dlm_ls_stop(ls);
		goto restart;
	}
	spin_unlock_bh(&lslist_lock);

	if (count)
		log_print("dlm user daemon left %d lockspaces", count);
}