Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/viro/vfs-2.6
[linux/fpc-iii.git] / fs / dlm / lockspace.c
blob499e16759e96fea13329dc05d5408194fa96e4e9
1 /******************************************************************************
2 *******************************************************************************
3 **
4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5 ** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved.
6 **
7 ** This copyrighted material is made available to anyone wishing to use,
8 ** modify, copy, or redistribute it subject to the terms and conditions
9 ** of the GNU General Public License v.2.
11 *******************************************************************************
12 ******************************************************************************/
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "ast.h"
19 #include "dir.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25 #include "requestqueue.h"
27 static int ls_count;
28 static struct mutex ls_lock;
29 static struct list_head lslist;
30 static spinlock_t lslist_lock;
31 static struct task_struct * scand_task;
34 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 ssize_t ret = len;
37 int n = simple_strtol(buf, NULL, 0);
39 ls = dlm_find_lockspace_local(ls->ls_local_handle);
40 if (!ls)
41 return -EINVAL;
43 switch (n) {
44 case 0:
45 dlm_ls_stop(ls);
46 break;
47 case 1:
48 dlm_ls_start(ls);
49 break;
50 default:
51 ret = -EINVAL;
53 dlm_put_lockspace(ls);
54 return ret;
57 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
59 ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
60 set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
61 wake_up(&ls->ls_uevent_wait);
62 return len;
65 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
67 return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
70 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
72 ls->ls_global_id = simple_strtoul(buf, NULL, 0);
73 return len;
76 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
78 uint32_t status = dlm_recover_status(ls);
79 return snprintf(buf, PAGE_SIZE, "%x\n", status);
82 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
84 return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
87 struct dlm_attr {
88 struct attribute attr;
89 ssize_t (*show)(struct dlm_ls *, char *);
90 ssize_t (*store)(struct dlm_ls *, const char *, size_t);
93 static struct dlm_attr dlm_attr_control = {
94 .attr = {.name = "control", .mode = S_IWUSR},
95 .store = dlm_control_store
98 static struct dlm_attr dlm_attr_event = {
99 .attr = {.name = "event_done", .mode = S_IWUSR},
100 .store = dlm_event_store
103 static struct dlm_attr dlm_attr_id = {
104 .attr = {.name = "id", .mode = S_IRUGO | S_IWUSR},
105 .show = dlm_id_show,
106 .store = dlm_id_store
109 static struct dlm_attr dlm_attr_recover_status = {
110 .attr = {.name = "recover_status", .mode = S_IRUGO},
111 .show = dlm_recover_status_show
114 static struct dlm_attr dlm_attr_recover_nodeid = {
115 .attr = {.name = "recover_nodeid", .mode = S_IRUGO},
116 .show = dlm_recover_nodeid_show
119 static struct attribute *dlm_attrs[] = {
120 &dlm_attr_control.attr,
121 &dlm_attr_event.attr,
122 &dlm_attr_id.attr,
123 &dlm_attr_recover_status.attr,
124 &dlm_attr_recover_nodeid.attr,
125 NULL,
128 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
129 char *buf)
131 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
132 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
133 return a->show ? a->show(ls, buf) : 0;
136 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
137 const char *buf, size_t len)
139 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
140 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
141 return a->store ? a->store(ls, buf, len) : len;
144 static void lockspace_kobj_release(struct kobject *k)
146 struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj);
147 kfree(ls);
150 static struct sysfs_ops dlm_attr_ops = {
151 .show = dlm_attr_show,
152 .store = dlm_attr_store,
155 static struct kobj_type dlm_ktype = {
156 .default_attrs = dlm_attrs,
157 .sysfs_ops = &dlm_attr_ops,
158 .release = lockspace_kobj_release,
161 static struct kset *dlm_kset;
163 static int do_uevent(struct dlm_ls *ls, int in)
165 int error;
167 if (in)
168 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
169 else
170 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
172 log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
174 /* dlm_controld will see the uevent, do the necessary group management
175 and then write to sysfs to wake us */
177 error = wait_event_interruptible(ls->ls_uevent_wait,
178 test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
180 log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
182 if (error)
183 goto out;
185 error = ls->ls_uevent_result;
186 out:
187 if (error)
188 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
189 error, ls->ls_uevent_result);
190 return error;
194 int __init dlm_lockspace_init(void)
196 ls_count = 0;
197 mutex_init(&ls_lock);
198 INIT_LIST_HEAD(&lslist);
199 spin_lock_init(&lslist_lock);
201 dlm_kset = kset_create_and_add("dlm", NULL, kernel_kobj);
202 if (!dlm_kset) {
203 printk(KERN_WARNING "%s: can not create kset\n", __func__);
204 return -ENOMEM;
206 return 0;
209 void dlm_lockspace_exit(void)
211 kset_unregister(dlm_kset);
214 static int dlm_scand(void *data)
216 struct dlm_ls *ls;
218 while (!kthread_should_stop()) {
219 list_for_each_entry(ls, &lslist, ls_list) {
220 if (dlm_lock_recovery_try(ls)) {
221 dlm_scan_rsbs(ls);
222 dlm_scan_timeout(ls);
223 dlm_unlock_recovery(ls);
226 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
228 return 0;
231 static int dlm_scand_start(void)
233 struct task_struct *p;
234 int error = 0;
236 p = kthread_run(dlm_scand, NULL, "dlm_scand");
237 if (IS_ERR(p))
238 error = PTR_ERR(p);
239 else
240 scand_task = p;
241 return error;
244 static void dlm_scand_stop(void)
246 kthread_stop(scand_task);
249 static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
251 struct dlm_ls *ls;
253 spin_lock(&lslist_lock);
255 list_for_each_entry(ls, &lslist, ls_list) {
256 if (ls->ls_namelen == namelen &&
257 memcmp(ls->ls_name, name, namelen) == 0)
258 goto out;
260 ls = NULL;
261 out:
262 spin_unlock(&lslist_lock);
263 return ls;
266 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
268 struct dlm_ls *ls;
270 spin_lock(&lslist_lock);
272 list_for_each_entry(ls, &lslist, ls_list) {
273 if (ls->ls_global_id == id) {
274 ls->ls_count++;
275 goto out;
278 ls = NULL;
279 out:
280 spin_unlock(&lslist_lock);
281 return ls;
284 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
286 struct dlm_ls *ls;
288 spin_lock(&lslist_lock);
289 list_for_each_entry(ls, &lslist, ls_list) {
290 if (ls->ls_local_handle == lockspace) {
291 ls->ls_count++;
292 goto out;
295 ls = NULL;
296 out:
297 spin_unlock(&lslist_lock);
298 return ls;
301 struct dlm_ls *dlm_find_lockspace_device(int minor)
303 struct dlm_ls *ls;
305 spin_lock(&lslist_lock);
306 list_for_each_entry(ls, &lslist, ls_list) {
307 if (ls->ls_device.minor == minor) {
308 ls->ls_count++;
309 goto out;
312 ls = NULL;
313 out:
314 spin_unlock(&lslist_lock);
315 return ls;
318 void dlm_put_lockspace(struct dlm_ls *ls)
320 spin_lock(&lslist_lock);
321 ls->ls_count--;
322 spin_unlock(&lslist_lock);
325 static void remove_lockspace(struct dlm_ls *ls)
327 for (;;) {
328 spin_lock(&lslist_lock);
329 if (ls->ls_count == 0) {
330 list_del(&ls->ls_list);
331 spin_unlock(&lslist_lock);
332 return;
334 spin_unlock(&lslist_lock);
335 ssleep(1);
339 static int threads_start(void)
341 int error;
343 /* Thread which process lock requests for all lockspace's */
344 error = dlm_astd_start();
345 if (error) {
346 log_print("cannot start dlm_astd thread %d", error);
347 goto fail;
350 error = dlm_scand_start();
351 if (error) {
352 log_print("cannot start dlm_scand thread %d", error);
353 goto astd_fail;
356 /* Thread for sending/receiving messages for all lockspace's */
357 error = dlm_lowcomms_start();
358 if (error) {
359 log_print("cannot start dlm lowcomms %d", error);
360 goto scand_fail;
363 return 0;
365 scand_fail:
366 dlm_scand_stop();
367 astd_fail:
368 dlm_astd_stop();
369 fail:
370 return error;
373 static void threads_stop(void)
375 dlm_scand_stop();
376 dlm_lowcomms_stop();
377 dlm_astd_stop();
380 static int new_lockspace(char *name, int namelen, void **lockspace,
381 uint32_t flags, int lvblen)
383 struct dlm_ls *ls;
384 int i, size, error = -ENOMEM;
385 int do_unreg = 0;
387 if (namelen > DLM_LOCKSPACE_LEN)
388 return -EINVAL;
390 if (!lvblen || (lvblen % 8))
391 return -EINVAL;
393 if (!try_module_get(THIS_MODULE))
394 return -EINVAL;
396 ls = dlm_find_lockspace_name(name, namelen);
397 if (ls) {
398 *lockspace = ls;
399 module_put(THIS_MODULE);
400 return -EEXIST;
403 ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
404 if (!ls)
405 goto out;
406 memcpy(ls->ls_name, name, namelen);
407 ls->ls_namelen = namelen;
408 ls->ls_lvblen = lvblen;
409 ls->ls_count = 0;
410 ls->ls_flags = 0;
412 if (flags & DLM_LSFL_TIMEWARN)
413 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
415 if (flags & DLM_LSFL_FS)
416 ls->ls_allocation = GFP_NOFS;
417 else
418 ls->ls_allocation = GFP_KERNEL;
420 /* ls_exflags are forced to match among nodes, and we don't
421 need to require all nodes to have TIMEWARN or FS set */
422 ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS));
424 size = dlm_config.ci_rsbtbl_size;
425 ls->ls_rsbtbl_size = size;
427 ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
428 if (!ls->ls_rsbtbl)
429 goto out_lsfree;
430 for (i = 0; i < size; i++) {
431 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
432 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
433 rwlock_init(&ls->ls_rsbtbl[i].lock);
436 size = dlm_config.ci_lkbtbl_size;
437 ls->ls_lkbtbl_size = size;
439 ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
440 if (!ls->ls_lkbtbl)
441 goto out_rsbfree;
442 for (i = 0; i < size; i++) {
443 INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
444 rwlock_init(&ls->ls_lkbtbl[i].lock);
445 ls->ls_lkbtbl[i].counter = 1;
448 size = dlm_config.ci_dirtbl_size;
449 ls->ls_dirtbl_size = size;
451 ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
452 if (!ls->ls_dirtbl)
453 goto out_lkbfree;
454 for (i = 0; i < size; i++) {
455 INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
456 rwlock_init(&ls->ls_dirtbl[i].lock);
459 INIT_LIST_HEAD(&ls->ls_waiters);
460 mutex_init(&ls->ls_waiters_mutex);
461 INIT_LIST_HEAD(&ls->ls_orphans);
462 mutex_init(&ls->ls_orphans_mutex);
463 INIT_LIST_HEAD(&ls->ls_timeout);
464 mutex_init(&ls->ls_timeout_mutex);
466 INIT_LIST_HEAD(&ls->ls_nodes);
467 INIT_LIST_HEAD(&ls->ls_nodes_gone);
468 ls->ls_num_nodes = 0;
469 ls->ls_low_nodeid = 0;
470 ls->ls_total_weight = 0;
471 ls->ls_node_array = NULL;
473 memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
474 ls->ls_stub_rsb.res_ls = ls;
476 ls->ls_debug_rsb_dentry = NULL;
477 ls->ls_debug_waiters_dentry = NULL;
479 init_waitqueue_head(&ls->ls_uevent_wait);
480 ls->ls_uevent_result = 0;
481 init_completion(&ls->ls_members_done);
482 ls->ls_members_result = -1;
484 ls->ls_recoverd_task = NULL;
485 mutex_init(&ls->ls_recoverd_active);
486 spin_lock_init(&ls->ls_recover_lock);
487 spin_lock_init(&ls->ls_rcom_spin);
488 get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
489 ls->ls_recover_status = 0;
490 ls->ls_recover_seq = 0;
491 ls->ls_recover_args = NULL;
492 init_rwsem(&ls->ls_in_recovery);
493 init_rwsem(&ls->ls_recv_active);
494 INIT_LIST_HEAD(&ls->ls_requestqueue);
495 mutex_init(&ls->ls_requestqueue_mutex);
496 mutex_init(&ls->ls_clear_proc_locks);
498 ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
499 if (!ls->ls_recover_buf)
500 goto out_dirfree;
502 INIT_LIST_HEAD(&ls->ls_recover_list);
503 spin_lock_init(&ls->ls_recover_list_lock);
504 ls->ls_recover_list_count = 0;
505 ls->ls_local_handle = ls;
506 init_waitqueue_head(&ls->ls_wait_general);
507 INIT_LIST_HEAD(&ls->ls_root_list);
508 init_rwsem(&ls->ls_root_sem);
510 down_write(&ls->ls_in_recovery);
512 spin_lock(&lslist_lock);
513 list_add(&ls->ls_list, &lslist);
514 spin_unlock(&lslist_lock);
516 /* needs to find ls in lslist */
517 error = dlm_recoverd_start(ls);
518 if (error) {
519 log_error(ls, "can't start dlm_recoverd %d", error);
520 goto out_delist;
523 ls->ls_kobj.kset = dlm_kset;
524 error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
525 "%s", ls->ls_name);
526 if (error)
527 goto out_stop;
528 kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
530 /* let kobject handle freeing of ls if there's an error */
531 do_unreg = 1;
533 /* This uevent triggers dlm_controld in userspace to add us to the
534 group of nodes that are members of this lockspace (managed by the
535 cluster infrastructure.) Once it's done that, it tells us who the
536 current lockspace members are (via configfs) and then tells the
537 lockspace to start running (via sysfs) in dlm_ls_start(). */
539 error = do_uevent(ls, 1);
540 if (error)
541 goto out_stop;
543 wait_for_completion(&ls->ls_members_done);
544 error = ls->ls_members_result;
545 if (error)
546 goto out_members;
548 dlm_create_debug_file(ls);
550 log_debug(ls, "join complete");
552 *lockspace = ls;
553 return 0;
555 out_members:
556 do_uevent(ls, 0);
557 dlm_clear_members(ls);
558 kfree(ls->ls_node_array);
559 out_stop:
560 dlm_recoverd_stop(ls);
561 out_delist:
562 spin_lock(&lslist_lock);
563 list_del(&ls->ls_list);
564 spin_unlock(&lslist_lock);
565 kfree(ls->ls_recover_buf);
566 out_dirfree:
567 kfree(ls->ls_dirtbl);
568 out_lkbfree:
569 kfree(ls->ls_lkbtbl);
570 out_rsbfree:
571 kfree(ls->ls_rsbtbl);
572 out_lsfree:
573 if (do_unreg)
574 kobject_put(&ls->ls_kobj);
575 else
576 kfree(ls);
577 out:
578 module_put(THIS_MODULE);
579 return error;
582 int dlm_new_lockspace(char *name, int namelen, void **lockspace,
583 uint32_t flags, int lvblen)
585 int error = 0;
587 mutex_lock(&ls_lock);
588 if (!ls_count)
589 error = threads_start();
590 if (error)
591 goto out;
593 error = new_lockspace(name, namelen, lockspace, flags, lvblen);
594 if (!error)
595 ls_count++;
596 else if (!ls_count)
597 threads_stop();
598 out:
599 mutex_unlock(&ls_lock);
600 return error;
603 /* Return 1 if the lockspace still has active remote locks,
604 * 2 if the lockspace still has active local locks.
606 static int lockspace_busy(struct dlm_ls *ls)
608 int i, lkb_found = 0;
609 struct dlm_lkb *lkb;
611 /* NOTE: We check the lockidtbl here rather than the resource table.
612 This is because there may be LKBs queued as ASTs that have been
613 unlinked from their RSBs and are pending deletion once the AST has
614 been delivered */
616 for (i = 0; i < ls->ls_lkbtbl_size; i++) {
617 read_lock(&ls->ls_lkbtbl[i].lock);
618 if (!list_empty(&ls->ls_lkbtbl[i].list)) {
619 lkb_found = 1;
620 list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
621 lkb_idtbl_list) {
622 if (!lkb->lkb_nodeid) {
623 read_unlock(&ls->ls_lkbtbl[i].lock);
624 return 2;
628 read_unlock(&ls->ls_lkbtbl[i].lock);
630 return lkb_found;
633 static int release_lockspace(struct dlm_ls *ls, int force)
635 struct dlm_lkb *lkb;
636 struct dlm_rsb *rsb;
637 struct list_head *head;
638 int i;
639 int busy = lockspace_busy(ls);
641 if (busy > force)
642 return -EBUSY;
644 if (force < 3)
645 do_uevent(ls, 0);
647 dlm_recoverd_stop(ls);
649 remove_lockspace(ls);
651 dlm_delete_debug_file(ls);
653 dlm_astd_suspend();
655 kfree(ls->ls_recover_buf);
658 * Free direntry structs.
661 dlm_dir_clear(ls);
662 kfree(ls->ls_dirtbl);
665 * Free all lkb's on lkbtbl[] lists.
668 for (i = 0; i < ls->ls_lkbtbl_size; i++) {
669 head = &ls->ls_lkbtbl[i].list;
670 while (!list_empty(head)) {
671 lkb = list_entry(head->next, struct dlm_lkb,
672 lkb_idtbl_list);
674 list_del(&lkb->lkb_idtbl_list);
676 dlm_del_ast(lkb);
678 if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
679 dlm_free_lvb(lkb->lkb_lvbptr);
681 dlm_free_lkb(lkb);
684 dlm_astd_resume();
686 kfree(ls->ls_lkbtbl);
689 * Free all rsb's on rsbtbl[] lists
692 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
693 head = &ls->ls_rsbtbl[i].list;
694 while (!list_empty(head)) {
695 rsb = list_entry(head->next, struct dlm_rsb,
696 res_hashchain);
698 list_del(&rsb->res_hashchain);
699 dlm_free_rsb(rsb);
702 head = &ls->ls_rsbtbl[i].toss;
703 while (!list_empty(head)) {
704 rsb = list_entry(head->next, struct dlm_rsb,
705 res_hashchain);
706 list_del(&rsb->res_hashchain);
707 dlm_free_rsb(rsb);
711 kfree(ls->ls_rsbtbl);
714 * Free structures on any other lists
717 dlm_purge_requestqueue(ls);
718 kfree(ls->ls_recover_args);
719 dlm_clear_free_entries(ls);
720 dlm_clear_members(ls);
721 dlm_clear_members_gone(ls);
722 kfree(ls->ls_node_array);
723 kobject_put(&ls->ls_kobj);
724 /* The ls structure will be freed when the kobject is done with */
726 mutex_lock(&ls_lock);
727 ls_count--;
728 if (!ls_count)
729 threads_stop();
730 mutex_unlock(&ls_lock);
732 module_put(THIS_MODULE);
733 return 0;
737 * Called when a system has released all its locks and is not going to use the
738 * lockspace any longer. We free everything we're managing for this lockspace.
739 * Remaining nodes will go through the recovery process as if we'd died. The
740 * lockspace must continue to function as usual, participating in recoveries,
741 * until this returns.
743 * Force has 4 possible values:
744 * 0 - don't destroy locksapce if it has any LKBs
745 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
746 * 2 - destroy lockspace regardless of LKBs
747 * 3 - destroy lockspace as part of a forced shutdown
750 int dlm_release_lockspace(void *lockspace, int force)
752 struct dlm_ls *ls;
754 ls = dlm_find_lockspace_local(lockspace);
755 if (!ls)
756 return -EINVAL;
757 dlm_put_lockspace(ls);
758 return release_lockspace(ls, force);