mm-only debug patch...
[mmotm.git] / fs / dlm / lockspace.c
blobd489fcc86713de1d0484902d6e7f911b57377cd0
1 /******************************************************************************
2 *******************************************************************************
3 **
4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5 ** Copyright (C) 2004-2008 Red Hat, Inc. All rights reserved.
6 **
7 ** This copyrighted material is made available to anyone wishing to use,
8 ** modify, copy, or redistribute it subject to the terms and conditions
9 ** of the GNU General Public License v.2.
11 *******************************************************************************
12 ******************************************************************************/
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "ast.h"
19 #include "dir.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25 #include "requestqueue.h"
26 #include "user.h"
28 static int ls_count;
29 static struct mutex ls_lock;
30 static struct list_head lslist;
31 static spinlock_t lslist_lock;
32 static struct task_struct * scand_task;
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
37 ssize_t ret = len;
38 int n = simple_strtol(buf, NULL, 0);
40 ls = dlm_find_lockspace_local(ls->ls_local_handle);
41 if (!ls)
42 return -EINVAL;
44 switch (n) {
45 case 0:
46 dlm_ls_stop(ls);
47 break;
48 case 1:
49 dlm_ls_start(ls);
50 break;
51 default:
52 ret = -EINVAL;
54 dlm_put_lockspace(ls);
55 return ret;
58 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
60 ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
61 set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
62 wake_up(&ls->ls_uevent_wait);
63 return len;
66 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
68 return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
71 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
73 ls->ls_global_id = simple_strtoul(buf, NULL, 0);
74 return len;
77 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
79 uint32_t status = dlm_recover_status(ls);
80 return snprintf(buf, PAGE_SIZE, "%x\n", status);
83 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
85 return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
88 struct dlm_attr {
89 struct attribute attr;
90 ssize_t (*show)(struct dlm_ls *, char *);
91 ssize_t (*store)(struct dlm_ls *, const char *, size_t);
94 static struct dlm_attr dlm_attr_control = {
95 .attr = {.name = "control", .mode = S_IWUSR},
96 .store = dlm_control_store
99 static struct dlm_attr dlm_attr_event = {
100 .attr = {.name = "event_done", .mode = S_IWUSR},
101 .store = dlm_event_store
104 static struct dlm_attr dlm_attr_id = {
105 .attr = {.name = "id", .mode = S_IRUGO | S_IWUSR},
106 .show = dlm_id_show,
107 .store = dlm_id_store
110 static struct dlm_attr dlm_attr_recover_status = {
111 .attr = {.name = "recover_status", .mode = S_IRUGO},
112 .show = dlm_recover_status_show
115 static struct dlm_attr dlm_attr_recover_nodeid = {
116 .attr = {.name = "recover_nodeid", .mode = S_IRUGO},
117 .show = dlm_recover_nodeid_show
120 static struct attribute *dlm_attrs[] = {
121 &dlm_attr_control.attr,
122 &dlm_attr_event.attr,
123 &dlm_attr_id.attr,
124 &dlm_attr_recover_status.attr,
125 &dlm_attr_recover_nodeid.attr,
126 NULL,
129 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
130 char *buf)
132 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
133 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
134 return a->show ? a->show(ls, buf) : 0;
137 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
138 const char *buf, size_t len)
140 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
141 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
142 return a->store ? a->store(ls, buf, len) : len;
145 static void lockspace_kobj_release(struct kobject *k)
147 struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj);
148 kfree(ls);
151 static struct sysfs_ops dlm_attr_ops = {
152 .show = dlm_attr_show,
153 .store = dlm_attr_store,
156 static struct kobj_type dlm_ktype = {
157 .default_attrs = dlm_attrs,
158 .sysfs_ops = &dlm_attr_ops,
159 .release = lockspace_kobj_release,
162 static struct kset *dlm_kset;
164 static int do_uevent(struct dlm_ls *ls, int in)
166 int error;
168 if (in)
169 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
170 else
171 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
173 log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
175 /* dlm_controld will see the uevent, do the necessary group management
176 and then write to sysfs to wake us */
178 error = wait_event_interruptible(ls->ls_uevent_wait,
179 test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
181 log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
183 if (error)
184 goto out;
186 error = ls->ls_uevent_result;
187 out:
188 if (error)
189 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
190 error, ls->ls_uevent_result);
191 return error;
195 int __init dlm_lockspace_init(void)
197 ls_count = 0;
198 mutex_init(&ls_lock);
199 INIT_LIST_HEAD(&lslist);
200 spin_lock_init(&lslist_lock);
202 dlm_kset = kset_create_and_add("dlm", NULL, kernel_kobj);
203 if (!dlm_kset) {
204 printk(KERN_WARNING "%s: can not create kset\n", __func__);
205 return -ENOMEM;
207 return 0;
210 void dlm_lockspace_exit(void)
212 kset_unregister(dlm_kset);
215 static struct dlm_ls *find_ls_to_scan(void)
217 struct dlm_ls *ls;
219 spin_lock(&lslist_lock);
220 list_for_each_entry(ls, &lslist, ls_list) {
221 if (time_after_eq(jiffies, ls->ls_scan_time +
222 dlm_config.ci_scan_secs * HZ)) {
223 spin_unlock(&lslist_lock);
224 return ls;
227 spin_unlock(&lslist_lock);
228 return NULL;
231 static int dlm_scand(void *data)
233 struct dlm_ls *ls;
234 int timeout_jiffies = dlm_config.ci_scan_secs * HZ;
236 while (!kthread_should_stop()) {
237 ls = find_ls_to_scan();
238 if (ls) {
239 if (dlm_lock_recovery_try(ls)) {
240 ls->ls_scan_time = jiffies;
241 dlm_scan_rsbs(ls);
242 dlm_scan_timeout(ls);
243 dlm_unlock_recovery(ls);
244 } else {
245 ls->ls_scan_time += HZ;
247 } else {
248 schedule_timeout_interruptible(timeout_jiffies);
251 return 0;
254 static int dlm_scand_start(void)
256 struct task_struct *p;
257 int error = 0;
259 p = kthread_run(dlm_scand, NULL, "dlm_scand");
260 if (IS_ERR(p))
261 error = PTR_ERR(p);
262 else
263 scand_task = p;
264 return error;
267 static void dlm_scand_stop(void)
269 kthread_stop(scand_task);
272 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
274 struct dlm_ls *ls;
276 spin_lock(&lslist_lock);
278 list_for_each_entry(ls, &lslist, ls_list) {
279 if (ls->ls_global_id == id) {
280 ls->ls_count++;
281 goto out;
284 ls = NULL;
285 out:
286 spin_unlock(&lslist_lock);
287 return ls;
290 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
292 struct dlm_ls *ls;
294 spin_lock(&lslist_lock);
295 list_for_each_entry(ls, &lslist, ls_list) {
296 if (ls->ls_local_handle == lockspace) {
297 ls->ls_count++;
298 goto out;
301 ls = NULL;
302 out:
303 spin_unlock(&lslist_lock);
304 return ls;
307 struct dlm_ls *dlm_find_lockspace_device(int minor)
309 struct dlm_ls *ls;
311 spin_lock(&lslist_lock);
312 list_for_each_entry(ls, &lslist, ls_list) {
313 if (ls->ls_device.minor == minor) {
314 ls->ls_count++;
315 goto out;
318 ls = NULL;
319 out:
320 spin_unlock(&lslist_lock);
321 return ls;
324 void dlm_put_lockspace(struct dlm_ls *ls)
326 spin_lock(&lslist_lock);
327 ls->ls_count--;
328 spin_unlock(&lslist_lock);
331 static void remove_lockspace(struct dlm_ls *ls)
333 for (;;) {
334 spin_lock(&lslist_lock);
335 if (ls->ls_count == 0) {
336 WARN_ON(ls->ls_create_count != 0);
337 list_del(&ls->ls_list);
338 spin_unlock(&lslist_lock);
339 return;
341 spin_unlock(&lslist_lock);
342 ssleep(1);
/*
 * Start the threads shared by all lockspaces: dlm_astd, dlm_scand and
 * lowcomms, in that order.  If a later step fails, the ones already
 * started are stopped again in reverse order.
 *
 * Returns 0 on success or the error from the step that failed.
 */
static int threads_start(void)
{
	int rv;

	/* Thread which process lock requests for all lockspace's */
	rv = dlm_astd_start();
	if (rv) {
		log_print("cannot start dlm_astd thread %d", rv);
		return rv;
	}

	rv = dlm_scand_start();
	if (rv) {
		log_print("cannot start dlm_scand thread %d", rv);
		dlm_astd_stop();
		return rv;
	}

	/* Thread for sending/receiving messages for all lockspace's */
	rv = dlm_lowcomms_start();
	if (rv) {
		log_print("cannot start dlm lowcomms %d", rv);
		dlm_scand_stop();
		dlm_astd_stop();
		return rv;
	}

	return 0;
}
/*
 * Stop the shared threads started by threads_start(): scand first, then
 * lowcomms, then astd.
 */
static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
	dlm_astd_stop();
}
387 static int new_lockspace(const char *name, int namelen, void **lockspace,
388 uint32_t flags, int lvblen)
390 struct dlm_ls *ls;
391 int i, size, error;
392 int do_unreg = 0;
394 if (namelen > DLM_LOCKSPACE_LEN)
395 return -EINVAL;
397 if (!lvblen || (lvblen % 8))
398 return -EINVAL;
400 if (!try_module_get(THIS_MODULE))
401 return -EINVAL;
403 if (!dlm_user_daemon_available()) {
404 module_put(THIS_MODULE);
405 return -EUNATCH;
408 error = 0;
410 spin_lock(&lslist_lock);
411 list_for_each_entry(ls, &lslist, ls_list) {
412 WARN_ON(ls->ls_create_count <= 0);
413 if (ls->ls_namelen != namelen)
414 continue;
415 if (memcmp(ls->ls_name, name, namelen))
416 continue;
417 if (flags & DLM_LSFL_NEWEXCL) {
418 error = -EEXIST;
419 break;
421 ls->ls_create_count++;
422 *lockspace = ls;
423 error = 1;
424 break;
426 spin_unlock(&lslist_lock);
428 if (error)
429 goto out;
431 error = -ENOMEM;
433 ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
434 if (!ls)
435 goto out;
436 memcpy(ls->ls_name, name, namelen);
437 ls->ls_namelen = namelen;
438 ls->ls_lvblen = lvblen;
439 ls->ls_count = 0;
440 ls->ls_flags = 0;
441 ls->ls_scan_time = jiffies;
443 if (flags & DLM_LSFL_TIMEWARN)
444 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
446 if (flags & DLM_LSFL_FS)
447 ls->ls_allocation = GFP_NOFS;
448 else
449 ls->ls_allocation = GFP_KERNEL;
451 /* ls_exflags are forced to match among nodes, and we don't
452 need to require all nodes to have some flags set */
453 ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
454 DLM_LSFL_NEWEXCL));
456 size = dlm_config.ci_rsbtbl_size;
457 ls->ls_rsbtbl_size = size;
459 ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
460 if (!ls->ls_rsbtbl)
461 goto out_lsfree;
462 for (i = 0; i < size; i++) {
463 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
464 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
465 spin_lock_init(&ls->ls_rsbtbl[i].lock);
468 size = dlm_config.ci_lkbtbl_size;
469 ls->ls_lkbtbl_size = size;
471 ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
472 if (!ls->ls_lkbtbl)
473 goto out_rsbfree;
474 for (i = 0; i < size; i++) {
475 INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
476 rwlock_init(&ls->ls_lkbtbl[i].lock);
477 ls->ls_lkbtbl[i].counter = 1;
480 size = dlm_config.ci_dirtbl_size;
481 ls->ls_dirtbl_size = size;
483 ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
484 if (!ls->ls_dirtbl)
485 goto out_lkbfree;
486 for (i = 0; i < size; i++) {
487 INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
488 spin_lock_init(&ls->ls_dirtbl[i].lock);
491 INIT_LIST_HEAD(&ls->ls_waiters);
492 mutex_init(&ls->ls_waiters_mutex);
493 INIT_LIST_HEAD(&ls->ls_orphans);
494 mutex_init(&ls->ls_orphans_mutex);
495 INIT_LIST_HEAD(&ls->ls_timeout);
496 mutex_init(&ls->ls_timeout_mutex);
498 INIT_LIST_HEAD(&ls->ls_nodes);
499 INIT_LIST_HEAD(&ls->ls_nodes_gone);
500 ls->ls_num_nodes = 0;
501 ls->ls_low_nodeid = 0;
502 ls->ls_total_weight = 0;
503 ls->ls_node_array = NULL;
505 memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
506 ls->ls_stub_rsb.res_ls = ls;
508 ls->ls_debug_rsb_dentry = NULL;
509 ls->ls_debug_waiters_dentry = NULL;
511 init_waitqueue_head(&ls->ls_uevent_wait);
512 ls->ls_uevent_result = 0;
513 init_completion(&ls->ls_members_done);
514 ls->ls_members_result = -1;
516 ls->ls_recoverd_task = NULL;
517 mutex_init(&ls->ls_recoverd_active);
518 spin_lock_init(&ls->ls_recover_lock);
519 spin_lock_init(&ls->ls_rcom_spin);
520 get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
521 ls->ls_recover_status = 0;
522 ls->ls_recover_seq = 0;
523 ls->ls_recover_args = NULL;
524 init_rwsem(&ls->ls_in_recovery);
525 init_rwsem(&ls->ls_recv_active);
526 INIT_LIST_HEAD(&ls->ls_requestqueue);
527 mutex_init(&ls->ls_requestqueue_mutex);
528 mutex_init(&ls->ls_clear_proc_locks);
530 ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
531 if (!ls->ls_recover_buf)
532 goto out_dirfree;
534 INIT_LIST_HEAD(&ls->ls_recover_list);
535 spin_lock_init(&ls->ls_recover_list_lock);
536 ls->ls_recover_list_count = 0;
537 ls->ls_local_handle = ls;
538 init_waitqueue_head(&ls->ls_wait_general);
539 INIT_LIST_HEAD(&ls->ls_root_list);
540 init_rwsem(&ls->ls_root_sem);
542 down_write(&ls->ls_in_recovery);
544 spin_lock(&lslist_lock);
545 ls->ls_create_count = 1;
546 list_add(&ls->ls_list, &lslist);
547 spin_unlock(&lslist_lock);
549 /* needs to find ls in lslist */
550 error = dlm_recoverd_start(ls);
551 if (error) {
552 log_error(ls, "can't start dlm_recoverd %d", error);
553 goto out_delist;
556 ls->ls_kobj.kset = dlm_kset;
557 error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
558 "%s", ls->ls_name);
559 if (error)
560 goto out_stop;
561 kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
563 /* let kobject handle freeing of ls if there's an error */
564 do_unreg = 1;
566 /* This uevent triggers dlm_controld in userspace to add us to the
567 group of nodes that are members of this lockspace (managed by the
568 cluster infrastructure.) Once it's done that, it tells us who the
569 current lockspace members are (via configfs) and then tells the
570 lockspace to start running (via sysfs) in dlm_ls_start(). */
572 error = do_uevent(ls, 1);
573 if (error)
574 goto out_stop;
576 wait_for_completion(&ls->ls_members_done);
577 error = ls->ls_members_result;
578 if (error)
579 goto out_members;
581 dlm_create_debug_file(ls);
583 log_debug(ls, "join complete");
584 *lockspace = ls;
585 return 0;
587 out_members:
588 do_uevent(ls, 0);
589 dlm_clear_members(ls);
590 kfree(ls->ls_node_array);
591 out_stop:
592 dlm_recoverd_stop(ls);
593 out_delist:
594 spin_lock(&lslist_lock);
595 list_del(&ls->ls_list);
596 spin_unlock(&lslist_lock);
597 kfree(ls->ls_recover_buf);
598 out_dirfree:
599 kfree(ls->ls_dirtbl);
600 out_lkbfree:
601 kfree(ls->ls_lkbtbl);
602 out_rsbfree:
603 kfree(ls->ls_rsbtbl);
604 out_lsfree:
605 if (do_unreg)
606 kobject_put(&ls->ls_kobj);
607 else
608 kfree(ls);
609 out:
610 module_put(THIS_MODULE);
611 return error;
614 int dlm_new_lockspace(const char *name, int namelen, void **lockspace,
615 uint32_t flags, int lvblen)
617 int error = 0;
619 mutex_lock(&ls_lock);
620 if (!ls_count)
621 error = threads_start();
622 if (error)
623 goto out;
625 error = new_lockspace(name, namelen, lockspace, flags, lvblen);
626 if (!error)
627 ls_count++;
628 if (error > 0)
629 error = 0;
630 if (!ls_count)
631 threads_stop();
632 out:
633 mutex_unlock(&ls_lock);
634 return error;
637 /* Return 1 if the lockspace still has active remote locks,
638 * 2 if the lockspace still has active local locks.
640 static int lockspace_busy(struct dlm_ls *ls)
642 int i, lkb_found = 0;
643 struct dlm_lkb *lkb;
645 /* NOTE: We check the lockidtbl here rather than the resource table.
646 This is because there may be LKBs queued as ASTs that have been
647 unlinked from their RSBs and are pending deletion once the AST has
648 been delivered */
650 for (i = 0; i < ls->ls_lkbtbl_size; i++) {
651 read_lock(&ls->ls_lkbtbl[i].lock);
652 if (!list_empty(&ls->ls_lkbtbl[i].list)) {
653 lkb_found = 1;
654 list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
655 lkb_idtbl_list) {
656 if (!lkb->lkb_nodeid) {
657 read_unlock(&ls->ls_lkbtbl[i].lock);
658 return 2;
662 read_unlock(&ls->ls_lkbtbl[i].lock);
664 return lkb_found;
667 static int release_lockspace(struct dlm_ls *ls, int force)
669 struct dlm_lkb *lkb;
670 struct dlm_rsb *rsb;
671 struct list_head *head;
672 int i, busy, rv;
674 busy = lockspace_busy(ls);
676 spin_lock(&lslist_lock);
677 if (ls->ls_create_count == 1) {
678 if (busy > force)
679 rv = -EBUSY;
680 else {
681 /* remove_lockspace takes ls off lslist */
682 ls->ls_create_count = 0;
683 rv = 0;
685 } else if (ls->ls_create_count > 1) {
686 rv = --ls->ls_create_count;
687 } else {
688 rv = -EINVAL;
690 spin_unlock(&lslist_lock);
692 if (rv) {
693 log_debug(ls, "release_lockspace no remove %d", rv);
694 return rv;
697 dlm_device_deregister(ls);
699 if (force < 3 && dlm_user_daemon_available())
700 do_uevent(ls, 0);
702 dlm_recoverd_stop(ls);
704 remove_lockspace(ls);
706 dlm_delete_debug_file(ls);
708 dlm_astd_suspend();
710 kfree(ls->ls_recover_buf);
713 * Free direntry structs.
716 dlm_dir_clear(ls);
717 kfree(ls->ls_dirtbl);
720 * Free all lkb's on lkbtbl[] lists.
723 for (i = 0; i < ls->ls_lkbtbl_size; i++) {
724 head = &ls->ls_lkbtbl[i].list;
725 while (!list_empty(head)) {
726 lkb = list_entry(head->next, struct dlm_lkb,
727 lkb_idtbl_list);
729 list_del(&lkb->lkb_idtbl_list);
731 dlm_del_ast(lkb);
733 if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
734 dlm_free_lvb(lkb->lkb_lvbptr);
736 dlm_free_lkb(lkb);
739 dlm_astd_resume();
741 kfree(ls->ls_lkbtbl);
744 * Free all rsb's on rsbtbl[] lists
747 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
748 head = &ls->ls_rsbtbl[i].list;
749 while (!list_empty(head)) {
750 rsb = list_entry(head->next, struct dlm_rsb,
751 res_hashchain);
753 list_del(&rsb->res_hashchain);
754 dlm_free_rsb(rsb);
757 head = &ls->ls_rsbtbl[i].toss;
758 while (!list_empty(head)) {
759 rsb = list_entry(head->next, struct dlm_rsb,
760 res_hashchain);
761 list_del(&rsb->res_hashchain);
762 dlm_free_rsb(rsb);
766 kfree(ls->ls_rsbtbl);
769 * Free structures on any other lists
772 dlm_purge_requestqueue(ls);
773 kfree(ls->ls_recover_args);
774 dlm_clear_free_entries(ls);
775 dlm_clear_members(ls);
776 dlm_clear_members_gone(ls);
777 kfree(ls->ls_node_array);
778 log_debug(ls, "release_lockspace final free");
779 kobject_put(&ls->ls_kobj);
780 /* The ls structure will be freed when the kobject is done with */
782 module_put(THIS_MODULE);
783 return 0;
787 * Called when a system has released all its locks and is not going to use the
788 * lockspace any longer. We free everything we're managing for this lockspace.
789 * Remaining nodes will go through the recovery process as if we'd died. The
790 * lockspace must continue to function as usual, participating in recoveries,
791 * until this returns.
793 * Force has 4 possible values:
794 * 0 - don't destroy locksapce if it has any LKBs
795 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
796 * 2 - destroy lockspace regardless of LKBs
797 * 3 - destroy lockspace as part of a forced shutdown
800 int dlm_release_lockspace(void *lockspace, int force)
802 struct dlm_ls *ls;
803 int error;
805 ls = dlm_find_lockspace_local(lockspace);
806 if (!ls)
807 return -EINVAL;
808 dlm_put_lockspace(ls);
810 mutex_lock(&ls_lock);
811 error = release_lockspace(ls, force);
812 if (!error)
813 ls_count--;
814 if (!ls_count)
815 threads_stop();
816 mutex_unlock(&ls_lock);
818 return error;
821 void dlm_stop_lockspaces(void)
823 struct dlm_ls *ls;
825 restart:
826 spin_lock(&lslist_lock);
827 list_for_each_entry(ls, &lslist, ls_list) {
828 if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
829 continue;
830 spin_unlock(&lslist_lock);
831 log_error(ls, "no userland control daemon, stopping lockspace");
832 dlm_ls_stop(ls);
833 goto restart;
835 spin_unlock(&lslist_lock);