mm: do_migrate_pages() calls migrate_to_node() even if task is already on a correct...
[linux/fpc-iii.git] / fs / dlm / lockspace.c
blobca506abbdd3b88abdc990e9df6c7dc5b369367b5
1 /******************************************************************************
2 *******************************************************************************
3 **
4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5 ** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved.
6 **
7 ** This copyrighted material is made available to anyone wishing to use,
8 ** modify, copy, or redistribute it subject to the terms and conditions
9 ** of the GNU General Public License v.2.
11 *******************************************************************************
12 ******************************************************************************/
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "lowcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
28 static int ls_count;
29 static struct mutex ls_lock;
30 static struct list_head lslist;
31 static spinlock_t lslist_lock;
32 static struct task_struct * scand_task;
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
37 ssize_t ret = len;
38 int n = simple_strtol(buf, NULL, 0);
40 ls = dlm_find_lockspace_local(ls->ls_local_handle);
41 if (!ls)
42 return -EINVAL;
44 switch (n) {
45 case 0:
46 dlm_ls_stop(ls);
47 break;
48 case 1:
49 dlm_ls_start(ls);
50 break;
51 default:
52 ret = -EINVAL;
54 dlm_put_lockspace(ls);
55 return ret;
58 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
60 ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
61 set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
62 wake_up(&ls->ls_uevent_wait);
63 return len;
66 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
68 return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
71 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
73 ls->ls_global_id = simple_strtoul(buf, NULL, 0);
74 return len;
77 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
79 return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
82 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
84 int val = simple_strtoul(buf, NULL, 0);
85 if (val == 1)
86 set_bit(LSFL_NODIR, &ls->ls_flags);
87 return len;
90 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
92 uint32_t status = dlm_recover_status(ls);
93 return snprintf(buf, PAGE_SIZE, "%x\n", status);
96 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
98 return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
101 struct dlm_attr {
102 struct attribute attr;
103 ssize_t (*show)(struct dlm_ls *, char *);
104 ssize_t (*store)(struct dlm_ls *, const char *, size_t);
107 static struct dlm_attr dlm_attr_control = {
108 .attr = {.name = "control", .mode = S_IWUSR},
109 .store = dlm_control_store
112 static struct dlm_attr dlm_attr_event = {
113 .attr = {.name = "event_done", .mode = S_IWUSR},
114 .store = dlm_event_store
117 static struct dlm_attr dlm_attr_id = {
118 .attr = {.name = "id", .mode = S_IRUGO | S_IWUSR},
119 .show = dlm_id_show,
120 .store = dlm_id_store
123 static struct dlm_attr dlm_attr_nodir = {
124 .attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
125 .show = dlm_nodir_show,
126 .store = dlm_nodir_store
129 static struct dlm_attr dlm_attr_recover_status = {
130 .attr = {.name = "recover_status", .mode = S_IRUGO},
131 .show = dlm_recover_status_show
134 static struct dlm_attr dlm_attr_recover_nodeid = {
135 .attr = {.name = "recover_nodeid", .mode = S_IRUGO},
136 .show = dlm_recover_nodeid_show
139 static struct attribute *dlm_attrs[] = {
140 &dlm_attr_control.attr,
141 &dlm_attr_event.attr,
142 &dlm_attr_id.attr,
143 &dlm_attr_nodir.attr,
144 &dlm_attr_recover_status.attr,
145 &dlm_attr_recover_nodeid.attr,
146 NULL,
149 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
150 char *buf)
152 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
153 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
154 return a->show ? a->show(ls, buf) : 0;
157 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
158 const char *buf, size_t len)
160 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
161 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
162 return a->store ? a->store(ls, buf, len) : len;
165 static void lockspace_kobj_release(struct kobject *k)
167 struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj);
168 kfree(ls);
171 static const struct sysfs_ops dlm_attr_ops = {
172 .show = dlm_attr_show,
173 .store = dlm_attr_store,
176 static struct kobj_type dlm_ktype = {
177 .default_attrs = dlm_attrs,
178 .sysfs_ops = &dlm_attr_ops,
179 .release = lockspace_kobj_release,
182 static struct kset *dlm_kset;
184 static int do_uevent(struct dlm_ls *ls, int in)
186 int error;
188 if (in)
189 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
190 else
191 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
193 log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
195 /* dlm_controld will see the uevent, do the necessary group management
196 and then write to sysfs to wake us */
198 error = wait_event_interruptible(ls->ls_uevent_wait,
199 test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
201 log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
203 if (error)
204 goto out;
206 error = ls->ls_uevent_result;
207 out:
208 if (error)
209 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
210 error, ls->ls_uevent_result);
211 return error;
214 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
215 struct kobj_uevent_env *env)
217 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
219 add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
220 return 0;
223 static struct kset_uevent_ops dlm_uevent_ops = {
224 .uevent = dlm_uevent,
227 int __init dlm_lockspace_init(void)
229 ls_count = 0;
230 mutex_init(&ls_lock);
231 INIT_LIST_HEAD(&lslist);
232 spin_lock_init(&lslist_lock);
234 dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
235 if (!dlm_kset) {
236 printk(KERN_WARNING "%s: can not create kset\n", __func__);
237 return -ENOMEM;
239 return 0;
242 void dlm_lockspace_exit(void)
244 kset_unregister(dlm_kset);
247 static struct dlm_ls *find_ls_to_scan(void)
249 struct dlm_ls *ls;
251 spin_lock(&lslist_lock);
252 list_for_each_entry(ls, &lslist, ls_list) {
253 if (time_after_eq(jiffies, ls->ls_scan_time +
254 dlm_config.ci_scan_secs * HZ)) {
255 spin_unlock(&lslist_lock);
256 return ls;
259 spin_unlock(&lslist_lock);
260 return NULL;
263 static int dlm_scand(void *data)
265 struct dlm_ls *ls;
267 while (!kthread_should_stop()) {
268 ls = find_ls_to_scan();
269 if (ls) {
270 if (dlm_lock_recovery_try(ls)) {
271 ls->ls_scan_time = jiffies;
272 dlm_scan_rsbs(ls);
273 dlm_scan_timeout(ls);
274 dlm_scan_waiters(ls);
275 dlm_unlock_recovery(ls);
276 } else {
277 ls->ls_scan_time += HZ;
279 continue;
281 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
283 return 0;
286 static int dlm_scand_start(void)
288 struct task_struct *p;
289 int error = 0;
291 p = kthread_run(dlm_scand, NULL, "dlm_scand");
292 if (IS_ERR(p))
293 error = PTR_ERR(p);
294 else
295 scand_task = p;
296 return error;
299 static void dlm_scand_stop(void)
301 kthread_stop(scand_task);
304 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
306 struct dlm_ls *ls;
308 spin_lock(&lslist_lock);
310 list_for_each_entry(ls, &lslist, ls_list) {
311 if (ls->ls_global_id == id) {
312 ls->ls_count++;
313 goto out;
316 ls = NULL;
317 out:
318 spin_unlock(&lslist_lock);
319 return ls;
322 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
324 struct dlm_ls *ls;
326 spin_lock(&lslist_lock);
327 list_for_each_entry(ls, &lslist, ls_list) {
328 if (ls->ls_local_handle == lockspace) {
329 ls->ls_count++;
330 goto out;
333 ls = NULL;
334 out:
335 spin_unlock(&lslist_lock);
336 return ls;
339 struct dlm_ls *dlm_find_lockspace_device(int minor)
341 struct dlm_ls *ls;
343 spin_lock(&lslist_lock);
344 list_for_each_entry(ls, &lslist, ls_list) {
345 if (ls->ls_device.minor == minor) {
346 ls->ls_count++;
347 goto out;
350 ls = NULL;
351 out:
352 spin_unlock(&lslist_lock);
353 return ls;
356 void dlm_put_lockspace(struct dlm_ls *ls)
358 spin_lock(&lslist_lock);
359 ls->ls_count--;
360 spin_unlock(&lslist_lock);
363 static void remove_lockspace(struct dlm_ls *ls)
365 for (;;) {
366 spin_lock(&lslist_lock);
367 if (ls->ls_count == 0) {
368 WARN_ON(ls->ls_create_count != 0);
369 list_del(&ls->ls_list);
370 spin_unlock(&lslist_lock);
371 return;
373 spin_unlock(&lslist_lock);
374 ssleep(1);
378 static int threads_start(void)
380 int error;
382 error = dlm_scand_start();
383 if (error) {
384 log_print("cannot start dlm_scand thread %d", error);
385 goto fail;
388 /* Thread for sending/receiving messages for all lockspace's */
389 error = dlm_lowcomms_start();
390 if (error) {
391 log_print("cannot start dlm lowcomms %d", error);
392 goto scand_fail;
395 return 0;
397 scand_fail:
398 dlm_scand_stop();
399 fail:
400 return error;
403 static void threads_stop(void)
405 dlm_scand_stop();
406 dlm_lowcomms_stop();
409 static int new_lockspace(const char *name, const char *cluster,
410 uint32_t flags, int lvblen,
411 const struct dlm_lockspace_ops *ops, void *ops_arg,
412 int *ops_result, dlm_lockspace_t **lockspace)
414 struct dlm_ls *ls;
415 int i, size, error;
416 int do_unreg = 0;
417 int namelen = strlen(name);
419 if (namelen > DLM_LOCKSPACE_LEN)
420 return -EINVAL;
422 if (!lvblen || (lvblen % 8))
423 return -EINVAL;
425 if (!try_module_get(THIS_MODULE))
426 return -EINVAL;
428 if (!dlm_user_daemon_available()) {
429 log_print("dlm user daemon not available");
430 error = -EUNATCH;
431 goto out;
434 if (ops && ops_result) {
435 if (!dlm_config.ci_recover_callbacks)
436 *ops_result = -EOPNOTSUPP;
437 else
438 *ops_result = 0;
441 if (dlm_config.ci_recover_callbacks && cluster &&
442 strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
443 log_print("dlm cluster name %s mismatch %s",
444 dlm_config.ci_cluster_name, cluster);
445 error = -EBADR;
446 goto out;
449 error = 0;
451 spin_lock(&lslist_lock);
452 list_for_each_entry(ls, &lslist, ls_list) {
453 WARN_ON(ls->ls_create_count <= 0);
454 if (ls->ls_namelen != namelen)
455 continue;
456 if (memcmp(ls->ls_name, name, namelen))
457 continue;
458 if (flags & DLM_LSFL_NEWEXCL) {
459 error = -EEXIST;
460 break;
462 ls->ls_create_count++;
463 *lockspace = ls;
464 error = 1;
465 break;
467 spin_unlock(&lslist_lock);
469 if (error)
470 goto out;
472 error = -ENOMEM;
474 ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
475 if (!ls)
476 goto out;
477 memcpy(ls->ls_name, name, namelen);
478 ls->ls_namelen = namelen;
479 ls->ls_lvblen = lvblen;
480 ls->ls_count = 0;
481 ls->ls_flags = 0;
482 ls->ls_scan_time = jiffies;
484 if (ops && dlm_config.ci_recover_callbacks) {
485 ls->ls_ops = ops;
486 ls->ls_ops_arg = ops_arg;
489 if (flags & DLM_LSFL_TIMEWARN)
490 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
492 /* ls_exflags are forced to match among nodes, and we don't
493 need to require all nodes to have some flags set */
494 ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
495 DLM_LSFL_NEWEXCL));
497 size = dlm_config.ci_rsbtbl_size;
498 ls->ls_rsbtbl_size = size;
500 ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
501 if (!ls->ls_rsbtbl)
502 goto out_lsfree;
503 for (i = 0; i < size; i++) {
504 ls->ls_rsbtbl[i].keep.rb_node = NULL;
505 ls->ls_rsbtbl[i].toss.rb_node = NULL;
506 spin_lock_init(&ls->ls_rsbtbl[i].lock);
509 idr_init(&ls->ls_lkbidr);
510 spin_lock_init(&ls->ls_lkbidr_spin);
512 size = dlm_config.ci_dirtbl_size;
513 ls->ls_dirtbl_size = size;
515 ls->ls_dirtbl = vmalloc(sizeof(struct dlm_dirtable) * size);
516 if (!ls->ls_dirtbl)
517 goto out_lkbfree;
518 for (i = 0; i < size; i++) {
519 INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
520 spin_lock_init(&ls->ls_dirtbl[i].lock);
523 INIT_LIST_HEAD(&ls->ls_waiters);
524 mutex_init(&ls->ls_waiters_mutex);
525 INIT_LIST_HEAD(&ls->ls_orphans);
526 mutex_init(&ls->ls_orphans_mutex);
527 INIT_LIST_HEAD(&ls->ls_timeout);
528 mutex_init(&ls->ls_timeout_mutex);
530 INIT_LIST_HEAD(&ls->ls_new_rsb);
531 spin_lock_init(&ls->ls_new_rsb_spin);
533 INIT_LIST_HEAD(&ls->ls_nodes);
534 INIT_LIST_HEAD(&ls->ls_nodes_gone);
535 ls->ls_num_nodes = 0;
536 ls->ls_low_nodeid = 0;
537 ls->ls_total_weight = 0;
538 ls->ls_node_array = NULL;
540 memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
541 ls->ls_stub_rsb.res_ls = ls;
543 ls->ls_debug_rsb_dentry = NULL;
544 ls->ls_debug_waiters_dentry = NULL;
546 init_waitqueue_head(&ls->ls_uevent_wait);
547 ls->ls_uevent_result = 0;
548 init_completion(&ls->ls_members_done);
549 ls->ls_members_result = -1;
551 mutex_init(&ls->ls_cb_mutex);
552 INIT_LIST_HEAD(&ls->ls_cb_delay);
554 ls->ls_recoverd_task = NULL;
555 mutex_init(&ls->ls_recoverd_active);
556 spin_lock_init(&ls->ls_recover_lock);
557 spin_lock_init(&ls->ls_rcom_spin);
558 get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
559 ls->ls_recover_status = 0;
560 ls->ls_recover_seq = 0;
561 ls->ls_recover_args = NULL;
562 init_rwsem(&ls->ls_in_recovery);
563 init_rwsem(&ls->ls_recv_active);
564 INIT_LIST_HEAD(&ls->ls_requestqueue);
565 mutex_init(&ls->ls_requestqueue_mutex);
566 mutex_init(&ls->ls_clear_proc_locks);
568 ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
569 if (!ls->ls_recover_buf)
570 goto out_dirfree;
572 ls->ls_slot = 0;
573 ls->ls_num_slots = 0;
574 ls->ls_slots_size = 0;
575 ls->ls_slots = NULL;
577 INIT_LIST_HEAD(&ls->ls_recover_list);
578 spin_lock_init(&ls->ls_recover_list_lock);
579 ls->ls_recover_list_count = 0;
580 ls->ls_local_handle = ls;
581 init_waitqueue_head(&ls->ls_wait_general);
582 INIT_LIST_HEAD(&ls->ls_root_list);
583 init_rwsem(&ls->ls_root_sem);
585 down_write(&ls->ls_in_recovery);
587 spin_lock(&lslist_lock);
588 ls->ls_create_count = 1;
589 list_add(&ls->ls_list, &lslist);
590 spin_unlock(&lslist_lock);
592 if (flags & DLM_LSFL_FS) {
593 error = dlm_callback_start(ls);
594 if (error) {
595 log_error(ls, "can't start dlm_callback %d", error);
596 goto out_delist;
600 /* needs to find ls in lslist */
601 error = dlm_recoverd_start(ls);
602 if (error) {
603 log_error(ls, "can't start dlm_recoverd %d", error);
604 goto out_callback;
607 ls->ls_kobj.kset = dlm_kset;
608 error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
609 "%s", ls->ls_name);
610 if (error)
611 goto out_recoverd;
612 kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
614 /* let kobject handle freeing of ls if there's an error */
615 do_unreg = 1;
617 /* This uevent triggers dlm_controld in userspace to add us to the
618 group of nodes that are members of this lockspace (managed by the
619 cluster infrastructure.) Once it's done that, it tells us who the
620 current lockspace members are (via configfs) and then tells the
621 lockspace to start running (via sysfs) in dlm_ls_start(). */
623 error = do_uevent(ls, 1);
624 if (error)
625 goto out_recoverd;
627 wait_for_completion(&ls->ls_members_done);
628 error = ls->ls_members_result;
629 if (error)
630 goto out_members;
632 dlm_create_debug_file(ls);
634 log_debug(ls, "join complete");
635 *lockspace = ls;
636 return 0;
638 out_members:
639 do_uevent(ls, 0);
640 dlm_clear_members(ls);
641 kfree(ls->ls_node_array);
642 out_recoverd:
643 dlm_recoverd_stop(ls);
644 out_callback:
645 dlm_callback_stop(ls);
646 out_delist:
647 spin_lock(&lslist_lock);
648 list_del(&ls->ls_list);
649 spin_unlock(&lslist_lock);
650 kfree(ls->ls_recover_buf);
651 out_dirfree:
652 vfree(ls->ls_dirtbl);
653 out_lkbfree:
654 idr_destroy(&ls->ls_lkbidr);
655 vfree(ls->ls_rsbtbl);
656 out_lsfree:
657 if (do_unreg)
658 kobject_put(&ls->ls_kobj);
659 else
660 kfree(ls);
661 out:
662 module_put(THIS_MODULE);
663 return error;
666 int dlm_new_lockspace(const char *name, const char *cluster,
667 uint32_t flags, int lvblen,
668 const struct dlm_lockspace_ops *ops, void *ops_arg,
669 int *ops_result, dlm_lockspace_t **lockspace)
671 int error = 0;
673 mutex_lock(&ls_lock);
674 if (!ls_count)
675 error = threads_start();
676 if (error)
677 goto out;
679 error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
680 ops_result, lockspace);
681 if (!error)
682 ls_count++;
683 if (error > 0)
684 error = 0;
685 if (!ls_count)
686 threads_stop();
687 out:
688 mutex_unlock(&ls_lock);
689 return error;
692 static int lkb_idr_is_local(int id, void *p, void *data)
694 struct dlm_lkb *lkb = p;
696 if (!lkb->lkb_nodeid)
697 return 1;
698 return 0;
701 static int lkb_idr_is_any(int id, void *p, void *data)
703 return 1;
706 static int lkb_idr_free(int id, void *p, void *data)
708 struct dlm_lkb *lkb = p;
710 if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
711 dlm_free_lvb(lkb->lkb_lvbptr);
713 dlm_free_lkb(lkb);
714 return 0;
717 /* NOTE: We check the lkbidr here rather than the resource table.
718 This is because there may be LKBs queued as ASTs that have been unlinked
719 from their RSBs and are pending deletion once the AST has been delivered */
721 static int lockspace_busy(struct dlm_ls *ls, int force)
723 int rv;
725 spin_lock(&ls->ls_lkbidr_spin);
726 if (force == 0) {
727 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
728 } else if (force == 1) {
729 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
730 } else {
731 rv = 0;
733 spin_unlock(&ls->ls_lkbidr_spin);
734 return rv;
737 static int release_lockspace(struct dlm_ls *ls, int force)
739 struct dlm_rsb *rsb;
740 struct rb_node *n;
741 int i, busy, rv;
743 busy = lockspace_busy(ls, force);
745 spin_lock(&lslist_lock);
746 if (ls->ls_create_count == 1) {
747 if (busy) {
748 rv = -EBUSY;
749 } else {
750 /* remove_lockspace takes ls off lslist */
751 ls->ls_create_count = 0;
752 rv = 0;
754 } else if (ls->ls_create_count > 1) {
755 rv = --ls->ls_create_count;
756 } else {
757 rv = -EINVAL;
759 spin_unlock(&lslist_lock);
761 if (rv) {
762 log_debug(ls, "release_lockspace no remove %d", rv);
763 return rv;
766 dlm_device_deregister(ls);
768 if (force < 3 && dlm_user_daemon_available())
769 do_uevent(ls, 0);
771 dlm_recoverd_stop(ls);
773 dlm_callback_stop(ls);
775 remove_lockspace(ls);
777 dlm_delete_debug_file(ls);
779 kfree(ls->ls_recover_buf);
782 * Free direntry structs.
785 dlm_dir_clear(ls);
786 vfree(ls->ls_dirtbl);
789 * Free all lkb's in idr
792 idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
793 idr_remove_all(&ls->ls_lkbidr);
794 idr_destroy(&ls->ls_lkbidr);
797 * Free all rsb's on rsbtbl[] lists
800 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
801 while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
802 rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
803 rb_erase(n, &ls->ls_rsbtbl[i].keep);
804 dlm_free_rsb(rsb);
807 while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
808 rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
809 rb_erase(n, &ls->ls_rsbtbl[i].toss);
810 dlm_free_rsb(rsb);
814 vfree(ls->ls_rsbtbl);
816 while (!list_empty(&ls->ls_new_rsb)) {
817 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
818 res_hashchain);
819 list_del(&rsb->res_hashchain);
820 dlm_free_rsb(rsb);
824 * Free structures on any other lists
827 dlm_purge_requestqueue(ls);
828 kfree(ls->ls_recover_args);
829 dlm_clear_free_entries(ls);
830 dlm_clear_members(ls);
831 dlm_clear_members_gone(ls);
832 kfree(ls->ls_node_array);
833 log_debug(ls, "release_lockspace final free");
834 kobject_put(&ls->ls_kobj);
835 /* The ls structure will be freed when the kobject is done with */
837 module_put(THIS_MODULE);
838 return 0;
842 * Called when a system has released all its locks and is not going to use the
843 * lockspace any longer. We free everything we're managing for this lockspace.
844 * Remaining nodes will go through the recovery process as if we'd died. The
845 * lockspace must continue to function as usual, participating in recoveries,
846 * until this returns.
848 * Force has 4 possible values:
849 * 0 - don't destroy locksapce if it has any LKBs
850 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
851 * 2 - destroy lockspace regardless of LKBs
852 * 3 - destroy lockspace as part of a forced shutdown
855 int dlm_release_lockspace(void *lockspace, int force)
857 struct dlm_ls *ls;
858 int error;
860 ls = dlm_find_lockspace_local(lockspace);
861 if (!ls)
862 return -EINVAL;
863 dlm_put_lockspace(ls);
865 mutex_lock(&ls_lock);
866 error = release_lockspace(ls, force);
867 if (!error)
868 ls_count--;
869 if (!ls_count)
870 threads_stop();
871 mutex_unlock(&ls_lock);
873 return error;
876 void dlm_stop_lockspaces(void)
878 struct dlm_ls *ls;
880 restart:
881 spin_lock(&lslist_lock);
882 list_for_each_entry(ls, &lslist, ls_list) {
883 if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
884 continue;
885 spin_unlock(&lslist_lock);
886 log_error(ls, "no userland control daemon, stopping lockspace");
887 dlm_ls_stop(ls);
888 goto restart;
890 spin_unlock(&lslist_lock);