2 * Copyright (C) 2015, SUSE
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2, or (at your option)
12 #include <linux/module.h>
13 #include <linux/dlm.h>
14 #include <linux/sched.h>
15 #include <linux/raid/md_p.h>
18 #include "md-cluster.h"
21 #define NEW_DEV_TIMEOUT 5000
23 struct dlm_lock_resource
{
26 char *name
; /* lock name. */
27 uint32_t flags
; /* flags to pass to dlm_lock() */
28 struct completion completion
; /* completion for synchronized locking */
29 void (*bast
)(void *arg
, int mode
); /* blocking AST function pointer*/
30 struct mddev
*mddev
; /* pointing back to mddev. */
37 struct list_head list
;
45 /* md_cluster_info flags */
46 #define MD_CLUSTER_WAITING_FOR_NEWDISK 1
49 struct md_cluster_info
{
50 /* dlm lock space and resources for clustered raid. */
51 dlm_lockspace_t
*lockspace
;
53 struct completion completion
;
54 struct dlm_lock_resource
*sb_lock
;
55 struct mutex sb_mutex
;
56 struct dlm_lock_resource
*bitmap_lockres
;
57 struct list_head suspend_list
;
58 spinlock_t suspend_lock
;
59 struct md_thread
*recovery_thread
;
60 unsigned long recovery_map
;
61 /* communication loc resources */
62 struct dlm_lock_resource
*ack_lockres
;
63 struct dlm_lock_resource
*message_lockres
;
64 struct dlm_lock_resource
*token_lockres
;
65 struct dlm_lock_resource
*no_new_dev_lockres
;
66 struct md_thread
*recv_thread
;
67 struct completion newdisk_completion
;
82 /* TODO: Unionize this for smaller footprint */
89 static void sync_ast(void *arg
)
91 struct dlm_lock_resource
*res
;
93 res
= (struct dlm_lock_resource
*) arg
;
94 complete(&res
->completion
);
97 static int dlm_lock_sync(struct dlm_lock_resource
*res
, int mode
)
101 init_completion(&res
->completion
);
102 ret
= dlm_lock(res
->ls
, mode
, &res
->lksb
,
103 res
->flags
, res
->name
, strlen(res
->name
),
104 0, sync_ast
, res
, res
->bast
);
107 wait_for_completion(&res
->completion
);
108 return res
->lksb
.sb_status
;
111 static int dlm_unlock_sync(struct dlm_lock_resource
*res
)
113 return dlm_lock_sync(res
, DLM_LOCK_NL
);
116 static struct dlm_lock_resource
*lockres_init(struct mddev
*mddev
,
117 char *name
, void (*bastfn
)(void *arg
, int mode
), int with_lvb
)
119 struct dlm_lock_resource
*res
= NULL
;
121 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
123 res
= kzalloc(sizeof(struct dlm_lock_resource
), GFP_KERNEL
);
126 res
->ls
= cinfo
->lockspace
;
128 namelen
= strlen(name
);
129 res
->name
= kzalloc(namelen
+ 1, GFP_KERNEL
);
131 pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name
);
134 strlcpy(res
->name
, name
, namelen
+ 1);
136 res
->lksb
.sb_lvbptr
= kzalloc(LVB_SIZE
, GFP_KERNEL
);
137 if (!res
->lksb
.sb_lvbptr
) {
138 pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name
);
141 res
->flags
= DLM_LKF_VALBLK
;
147 res
->flags
|= DLM_LKF_EXPEDITE
;
149 ret
= dlm_lock_sync(res
, DLM_LOCK_NL
);
151 pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name
);
154 res
->flags
&= ~DLM_LKF_EXPEDITE
;
155 res
->flags
|= DLM_LKF_CONVERT
;
159 kfree(res
->lksb
.sb_lvbptr
);
165 static void lockres_free(struct dlm_lock_resource
*res
)
170 init_completion(&res
->completion
);
171 dlm_unlock(res
->ls
, res
->lksb
.sb_lkid
, 0, &res
->lksb
, res
);
172 wait_for_completion(&res
->completion
);
175 kfree(res
->lksb
.sb_lvbptr
);
/*
 * Format the 16 raw bytes at @src as lower-case hex in 8-4-4-4-12 grouping
 * ("xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx").
 *
 * @dest must have room for 37 bytes (36 characters plus the terminator).
 * Returns @dest.
 */
static char *pretty_uuid(char *dest, char *src)
{
	int i, len = 0;

	for (i = 0; i < 16; i++) {
		/* a dash precedes bytes 4, 6, 8 and 10 */
		if (i == 4 || i == 6 || i == 8 || i == 10)
			len += sprintf(dest + len, "-");
		/* go through unsigned char so bytes >= 0x80 are not sign-extended */
		len += sprintf(dest + len, "%02x", (unsigned char)src[i]);
	}
	return dest;
}
191 static void add_resync_info(struct mddev
*mddev
, struct dlm_lock_resource
*lockres
,
192 sector_t lo
, sector_t hi
)
194 struct resync_info
*ri
;
196 ri
= (struct resync_info
*)lockres
->lksb
.sb_lvbptr
;
197 ri
->lo
= cpu_to_le64(lo
);
198 ri
->hi
= cpu_to_le64(hi
);
201 static struct suspend_info
*read_resync_info(struct mddev
*mddev
, struct dlm_lock_resource
*lockres
)
203 struct resync_info ri
;
204 struct suspend_info
*s
= NULL
;
207 dlm_lock_sync(lockres
, DLM_LOCK_CR
);
208 memcpy(&ri
, lockres
->lksb
.sb_lvbptr
, sizeof(struct resync_info
));
209 hi
= le64_to_cpu(ri
.hi
);
211 s
= kzalloc(sizeof(struct suspend_info
), GFP_KERNEL
);
215 s
->lo
= le64_to_cpu(ri
.lo
);
217 dlm_unlock_sync(lockres
);
222 static void recover_bitmaps(struct md_thread
*thread
)
224 struct mddev
*mddev
= thread
->mddev
;
225 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
226 struct dlm_lock_resource
*bm_lockres
;
229 struct suspend_info
*s
, *tmp
;
232 while (cinfo
->recovery_map
) {
233 slot
= fls64((u64
)cinfo
->recovery_map
) - 1;
235 /* Clear suspend_area associated with the bitmap */
236 spin_lock_irq(&cinfo
->suspend_lock
);
237 list_for_each_entry_safe(s
, tmp
, &cinfo
->suspend_list
, list
)
238 if (slot
== s
->slot
) {
242 spin_unlock_irq(&cinfo
->suspend_lock
);
244 snprintf(str
, 64, "bitmap%04d", slot
);
245 bm_lockres
= lockres_init(mddev
, str
, NULL
, 1);
247 pr_err("md-cluster: Cannot initialize bitmaps\n");
251 ret
= dlm_lock_sync(bm_lockres
, DLM_LOCK_PW
);
253 pr_err("md-cluster: Could not DLM lock %s: %d\n",
257 ret
= bitmap_copy_from_slot(mddev
, slot
, &lo
, &hi
, true);
259 pr_err("md-cluster: Could not copy data from bitmap %d\n", slot
);
263 /* TODO:Wait for current resync to get over */
264 set_bit(MD_RECOVERY_NEEDED
, &mddev
->recovery
);
265 if (lo
< mddev
->recovery_cp
)
266 mddev
->recovery_cp
= lo
;
267 md_check_recovery(mddev
);
270 dlm_unlock_sync(bm_lockres
);
272 clear_bit(slot
, &cinfo
->recovery_map
);
/* dlm_lockspace_ops.recover_prep callback; intentionally does nothing. */
static void recover_prep(void *arg)
{
}
280 static void recover_slot(void *arg
, struct dlm_slot
*slot
)
282 struct mddev
*mddev
= arg
;
283 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
285 pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
286 mddev
->bitmap_info
.cluster_name
,
287 slot
->nodeid
, slot
->slot
,
289 set_bit(slot
->slot
- 1, &cinfo
->recovery_map
);
290 if (!cinfo
->recovery_thread
) {
291 cinfo
->recovery_thread
= md_register_thread(recover_bitmaps
,
293 if (!cinfo
->recovery_thread
) {
294 pr_warn("md-cluster: Could not create recovery thread\n");
298 md_wakeup_thread(cinfo
->recovery_thread
);
301 static void recover_done(void *arg
, struct dlm_slot
*slots
,
302 int num_slots
, int our_slot
,
305 struct mddev
*mddev
= arg
;
306 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
308 cinfo
->slot_number
= our_slot
;
309 complete(&cinfo
->completion
);
312 static const struct dlm_lockspace_ops md_ls_ops
= {
313 .recover_prep
= recover_prep
,
314 .recover_slot
= recover_slot
,
315 .recover_done
= recover_done
,
319 * The BAST function for the ack lock resource
320 * This function wakes up the receive thread in
321 * order to receive and process the message.
323 static void ack_bast(void *arg
, int mode
)
325 struct dlm_lock_resource
*res
= (struct dlm_lock_resource
*)arg
;
326 struct md_cluster_info
*cinfo
= res
->mddev
->cluster_info
;
328 if (mode
== DLM_LOCK_EX
)
329 md_wakeup_thread(cinfo
->recv_thread
);
332 static void __remove_suspend_info(struct md_cluster_info
*cinfo
, int slot
)
334 struct suspend_info
*s
, *tmp
;
336 list_for_each_entry_safe(s
, tmp
, &cinfo
->suspend_list
, list
)
337 if (slot
== s
->slot
) {
338 pr_info("%s:%d Deleting suspend_info: %d\n",
339 __func__
, __LINE__
, slot
);
346 static void remove_suspend_info(struct md_cluster_info
*cinfo
, int slot
)
348 spin_lock_irq(&cinfo
->suspend_lock
);
349 __remove_suspend_info(cinfo
, slot
);
350 spin_unlock_irq(&cinfo
->suspend_lock
);
354 static void process_suspend_info(struct md_cluster_info
*cinfo
,
355 int slot
, sector_t lo
, sector_t hi
)
357 struct suspend_info
*s
;
360 remove_suspend_info(cinfo
, slot
);
363 s
= kzalloc(sizeof(struct suspend_info
), GFP_KERNEL
);
369 spin_lock_irq(&cinfo
->suspend_lock
);
370 /* Remove existing entry (if exists) before adding */
371 __remove_suspend_info(cinfo
, slot
);
372 list_add(&s
->list
, &cinfo
->suspend_list
);
373 spin_unlock_irq(&cinfo
->suspend_lock
);
376 static void process_add_new_disk(struct mddev
*mddev
, struct cluster_msg
*cmsg
)
379 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
380 char event_name
[] = "EVENT=ADD_DEVICE";
382 char *envp
[] = {event_name
, disk_uuid
, raid_slot
, NULL
};
385 len
= snprintf(disk_uuid
, 64, "DEVICE_UUID=");
386 pretty_uuid(disk_uuid
+ len
, cmsg
->uuid
);
387 snprintf(raid_slot
, 16, "RAID_DISK=%d", cmsg
->raid_slot
);
388 pr_info("%s:%d Sending kobject change with %s and %s\n", __func__
, __LINE__
, disk_uuid
, raid_slot
);
389 init_completion(&cinfo
->newdisk_completion
);
390 set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK
, &cinfo
->state
);
391 kobject_uevent_env(&disk_to_dev(mddev
->gendisk
)->kobj
, KOBJ_CHANGE
, envp
);
392 wait_for_completion_timeout(&cinfo
->newdisk_completion
,
394 clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK
, &cinfo
->state
);
398 static void process_metadata_update(struct mddev
*mddev
, struct cluster_msg
*msg
)
400 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
403 dlm_lock_sync(cinfo
->no_new_dev_lockres
, DLM_LOCK_CR
);
406 static void process_remove_disk(struct mddev
*mddev
, struct cluster_msg
*msg
)
408 struct md_rdev
*rdev
= md_find_rdev_nr_rcu(mddev
, msg
->raid_slot
);
411 md_kick_rdev_from_array(rdev
);
413 pr_warn("%s: %d Could not find disk(%d) to REMOVE\n", __func__
, __LINE__
, msg
->raid_slot
);
416 static void process_readd_disk(struct mddev
*mddev
, struct cluster_msg
*msg
)
418 struct md_rdev
*rdev
= md_find_rdev_nr_rcu(mddev
, msg
->raid_slot
);
420 if (rdev
&& test_bit(Faulty
, &rdev
->flags
))
421 clear_bit(Faulty
, &rdev
->flags
);
423 pr_warn("%s: %d Could not find disk(%d) which is faulty", __func__
, __LINE__
, msg
->raid_slot
);
426 static void process_recvd_msg(struct mddev
*mddev
, struct cluster_msg
*msg
)
429 case METADATA_UPDATED
:
430 pr_info("%s: %d Received message: METADATA_UPDATE from %d\n",
431 __func__
, __LINE__
, msg
->slot
);
432 process_metadata_update(mddev
, msg
);
435 pr_info("%s: %d Received message: RESYNCING from %d\n",
436 __func__
, __LINE__
, msg
->slot
);
437 process_suspend_info(mddev
->cluster_info
, msg
->slot
,
438 msg
->low
, msg
->high
);
441 pr_info("%s: %d Received message: NEWDISK from %d\n",
442 __func__
, __LINE__
, msg
->slot
);
443 process_add_new_disk(mddev
, msg
);
446 pr_info("%s: %d Received REMOVE from %d\n",
447 __func__
, __LINE__
, msg
->slot
);
448 process_remove_disk(mddev
, msg
);
451 pr_info("%s: %d Received RE_ADD from %d\n",
452 __func__
, __LINE__
, msg
->slot
);
453 process_readd_disk(mddev
, msg
);
456 pr_warn("%s:%d Received unknown message from %d\n",
457 __func__
, __LINE__
, msg
->slot
);
462 * thread for receiving message
464 static void recv_daemon(struct md_thread
*thread
)
466 struct md_cluster_info
*cinfo
= thread
->mddev
->cluster_info
;
467 struct dlm_lock_resource
*ack_lockres
= cinfo
->ack_lockres
;
468 struct dlm_lock_resource
*message_lockres
= cinfo
->message_lockres
;
469 struct cluster_msg msg
;
471 /*get CR on Message*/
472 if (dlm_lock_sync(message_lockres
, DLM_LOCK_CR
)) {
473 pr_err("md/raid1:failed to get CR on MESSAGE\n");
477 /* read lvb and wake up thread to process this message_lockres */
478 memcpy(&msg
, message_lockres
->lksb
.sb_lvbptr
, sizeof(struct cluster_msg
));
479 process_recvd_msg(thread
->mddev
, &msg
);
481 /*release CR on ack_lockres*/
482 dlm_unlock_sync(ack_lockres
);
483 /*up-convert to EX on message_lockres*/
484 dlm_lock_sync(message_lockres
, DLM_LOCK_EX
);
485 /*get CR on ack_lockres again*/
486 dlm_lock_sync(ack_lockres
, DLM_LOCK_CR
);
487 /*release CR on message_lockres*/
488 dlm_unlock_sync(message_lockres
);
492 * Takes the lock on the TOKEN lock resource so no other
493 * node can communicate while the operation is underway.
495 static int lock_comm(struct md_cluster_info
*cinfo
)
499 error
= dlm_lock_sync(cinfo
->token_lockres
, DLM_LOCK_EX
);
501 pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
502 __func__
, __LINE__
, error
);
506 static void unlock_comm(struct md_cluster_info
*cinfo
)
508 dlm_unlock_sync(cinfo
->token_lockres
);
512 * This function performs the actual sending of the message. This function is
513 * usually called after performing the encompassing operation
515 * 1. Grabs the message lockresource in EX mode
516 * 2. Copies the message to the message LVB
517 * 3. Downconverts message lockresource to CR
518 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
519 * and the other nodes read the message. The thread will wait here until all other
520 * nodes have released ack lock resource.
521 * 5. Downconvert ack lockresource to CR
523 static int __sendmsg(struct md_cluster_info
*cinfo
, struct cluster_msg
*cmsg
)
526 int slot
= cinfo
->slot_number
- 1;
528 cmsg
->slot
= cpu_to_le32(slot
);
529 /*get EX on Message*/
530 error
= dlm_lock_sync(cinfo
->message_lockres
, DLM_LOCK_EX
);
532 pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error
);
536 memcpy(cinfo
->message_lockres
->lksb
.sb_lvbptr
, (void *)cmsg
,
537 sizeof(struct cluster_msg
));
538 /*down-convert EX to CR on Message*/
539 error
= dlm_lock_sync(cinfo
->message_lockres
, DLM_LOCK_CR
);
541 pr_err("md-cluster: failed to convert EX to CR on MESSAGE(%d)\n",
546 /*up-convert CR to EX on Ack*/
547 error
= dlm_lock_sync(cinfo
->ack_lockres
, DLM_LOCK_EX
);
549 pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
554 /*down-convert EX to CR on Ack*/
555 error
= dlm_lock_sync(cinfo
->ack_lockres
, DLM_LOCK_CR
);
557 pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
563 dlm_unlock_sync(cinfo
->message_lockres
);
/*
 * Send @cmsg under the TOKEN lock: lock_comm(), __sendmsg(), unlock_comm().
 *
 * Returns the lock_comm() error without sending if the token could not be
 * taken, otherwise the result of __sendmsg().
 */
static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
{
	int ret;

	ret = lock_comm(cinfo);
	if (ret)
		return ret;
	ret = __sendmsg(cinfo, cmsg);
	unlock_comm(cinfo);
	return ret;
}
578 static int gather_all_resync_info(struct mddev
*mddev
, int total_slots
)
580 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
582 struct dlm_lock_resource
*bm_lockres
;
583 struct suspend_info
*s
;
587 for (i
= 0; i
< total_slots
; i
++) {
588 memset(str
, '\0', 64);
589 snprintf(str
, 64, "bitmap%04d", i
);
590 bm_lockres
= lockres_init(mddev
, str
, NULL
, 1);
593 if (i
== (cinfo
->slot_number
- 1))
596 bm_lockres
->flags
|= DLM_LKF_NOQUEUE
;
597 ret
= dlm_lock_sync(bm_lockres
, DLM_LOCK_PW
);
598 if (ret
== -EAGAIN
) {
599 memset(bm_lockres
->lksb
.sb_lvbptr
, '\0', LVB_SIZE
);
600 s
= read_resync_info(mddev
, bm_lockres
);
602 pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
604 (unsigned long long) s
->lo
,
605 (unsigned long long) s
->hi
, i
);
606 spin_lock_irq(&cinfo
->suspend_lock
);
608 list_add(&s
->list
, &cinfo
->suspend_list
);
609 spin_unlock_irq(&cinfo
->suspend_lock
);
612 lockres_free(bm_lockres
);
617 /* TODO: Read the disk bitmap sb and check if it needs recovery */
618 dlm_unlock_sync(bm_lockres
);
619 lockres_free(bm_lockres
);
625 static int join(struct mddev
*mddev
, int nodes
)
627 struct md_cluster_info
*cinfo
;
631 if (!try_module_get(THIS_MODULE
))
634 cinfo
= kzalloc(sizeof(struct md_cluster_info
), GFP_KERNEL
);
638 init_completion(&cinfo
->completion
);
640 mutex_init(&cinfo
->sb_mutex
);
641 mddev
->cluster_info
= cinfo
;
644 pretty_uuid(str
, mddev
->uuid
);
645 ret
= dlm_new_lockspace(str
, mddev
->bitmap_info
.cluster_name
,
646 DLM_LSFL_FS
, LVB_SIZE
,
647 &md_ls_ops
, mddev
, &ops_rv
, &cinfo
->lockspace
);
650 wait_for_completion(&cinfo
->completion
);
651 if (nodes
< cinfo
->slot_number
) {
652 pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).",
653 cinfo
->slot_number
, nodes
);
657 cinfo
->sb_lock
= lockres_init(mddev
, "cmd-super",
659 if (!cinfo
->sb_lock
) {
663 /* Initiate the communication resources */
665 cinfo
->recv_thread
= md_register_thread(recv_daemon
, mddev
, "cluster_recv");
666 if (!cinfo
->recv_thread
) {
667 pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
670 cinfo
->message_lockres
= lockres_init(mddev
, "message", NULL
, 1);
671 if (!cinfo
->message_lockres
)
673 cinfo
->token_lockres
= lockres_init(mddev
, "token", NULL
, 0);
674 if (!cinfo
->token_lockres
)
676 cinfo
->ack_lockres
= lockres_init(mddev
, "ack", ack_bast
, 0);
677 if (!cinfo
->ack_lockres
)
679 cinfo
->no_new_dev_lockres
= lockres_init(mddev
, "no-new-dev", NULL
, 0);
680 if (!cinfo
->no_new_dev_lockres
)
683 /* get sync CR lock on ACK. */
684 if (dlm_lock_sync(cinfo
->ack_lockres
, DLM_LOCK_CR
))
685 pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
687 /* get sync CR lock on no-new-dev. */
688 if (dlm_lock_sync(cinfo
->no_new_dev_lockres
, DLM_LOCK_CR
))
689 pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret
);
692 pr_info("md-cluster: Joined cluster %s slot %d\n", str
, cinfo
->slot_number
);
693 snprintf(str
, 64, "bitmap%04d", cinfo
->slot_number
- 1);
694 cinfo
->bitmap_lockres
= lockres_init(mddev
, str
, NULL
, 1);
695 if (!cinfo
->bitmap_lockres
)
697 if (dlm_lock_sync(cinfo
->bitmap_lockres
, DLM_LOCK_PW
)) {
698 pr_err("Failed to get bitmap lock\n");
703 INIT_LIST_HEAD(&cinfo
->suspend_list
);
704 spin_lock_init(&cinfo
->suspend_lock
);
706 ret
= gather_all_resync_info(mddev
, nodes
);
712 lockres_free(cinfo
->message_lockres
);
713 lockres_free(cinfo
->token_lockres
);
714 lockres_free(cinfo
->ack_lockres
);
715 lockres_free(cinfo
->no_new_dev_lockres
);
716 lockres_free(cinfo
->bitmap_lockres
);
717 lockres_free(cinfo
->sb_lock
);
718 if (cinfo
->lockspace
)
719 dlm_release_lockspace(cinfo
->lockspace
, 2);
720 mddev
->cluster_info
= NULL
;
722 module_put(THIS_MODULE
);
726 static int leave(struct mddev
*mddev
)
728 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
732 md_unregister_thread(&cinfo
->recovery_thread
);
733 md_unregister_thread(&cinfo
->recv_thread
);
734 lockres_free(cinfo
->message_lockres
);
735 lockres_free(cinfo
->token_lockres
);
736 lockres_free(cinfo
->ack_lockres
);
737 lockres_free(cinfo
->no_new_dev_lockres
);
738 lockres_free(cinfo
->sb_lock
);
739 lockres_free(cinfo
->bitmap_lockres
);
740 dlm_release_lockspace(cinfo
->lockspace
, 2);
744 /* slot_number(): Returns the MD slot number to use
745 * DLM starts the slot numbers from 1, wheras cluster-md
746 * wants the number to be from zero, so we deduct one
748 static int slot_number(struct mddev
*mddev
)
750 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
752 return cinfo
->slot_number
- 1;
755 static void resync_info_update(struct mddev
*mddev
, sector_t lo
, sector_t hi
)
757 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
759 add_resync_info(mddev
, cinfo
->bitmap_lockres
, lo
, hi
);
760 /* Re-acquire the lock to refresh LVB */
761 dlm_lock_sync(cinfo
->bitmap_lockres
, DLM_LOCK_PW
);
764 static int metadata_update_start(struct mddev
*mddev
)
766 return lock_comm(mddev
->cluster_info
);
769 static int metadata_update_finish(struct mddev
*mddev
)
771 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
772 struct cluster_msg cmsg
;
775 memset(&cmsg
, 0, sizeof(cmsg
));
776 cmsg
.type
= cpu_to_le32(METADATA_UPDATED
);
777 ret
= __sendmsg(cinfo
, &cmsg
);
782 static int metadata_update_cancel(struct mddev
*mddev
)
784 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
786 return dlm_unlock_sync(cinfo
->token_lockres
);
789 static int resync_send(struct mddev
*mddev
, enum msg_type type
,
790 sector_t lo
, sector_t hi
)
792 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
793 struct cluster_msg cmsg
;
794 int slot
= cinfo
->slot_number
- 1;
796 pr_info("%s:%d lo: %llu hi: %llu\n", __func__
, __LINE__
,
797 (unsigned long long)lo
,
798 (unsigned long long)hi
);
799 resync_info_update(mddev
, lo
, hi
);
800 cmsg
.type
= cpu_to_le32(type
);
801 cmsg
.slot
= cpu_to_le32(slot
);
802 cmsg
.low
= cpu_to_le64(lo
);
803 cmsg
.high
= cpu_to_le64(hi
);
804 return sendmsg(cinfo
, &cmsg
);
807 static int resync_start(struct mddev
*mddev
, sector_t lo
, sector_t hi
)
809 pr_info("%s:%d\n", __func__
, __LINE__
);
810 return resync_send(mddev
, RESYNCING
, lo
, hi
);
813 static void resync_finish(struct mddev
*mddev
)
815 pr_info("%s:%d\n", __func__
, __LINE__
);
816 resync_send(mddev
, RESYNCING
, 0, 0);
819 static int area_resyncing(struct mddev
*mddev
, sector_t lo
, sector_t hi
)
821 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
823 struct suspend_info
*s
;
825 spin_lock_irq(&cinfo
->suspend_lock
);
826 if (list_empty(&cinfo
->suspend_list
))
828 list_for_each_entry(s
, &cinfo
->suspend_list
, list
)
829 if (hi
> s
->lo
&& lo
< s
->hi
) {
834 spin_unlock_irq(&cinfo
->suspend_lock
);
838 static int add_new_disk_start(struct mddev
*mddev
, struct md_rdev
*rdev
)
840 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
841 struct cluster_msg cmsg
;
843 struct mdp_superblock_1
*sb
= page_address(rdev
->sb_page
);
844 char *uuid
= sb
->device_uuid
;
846 memset(&cmsg
, 0, sizeof(cmsg
));
847 cmsg
.type
= cpu_to_le32(NEWDISK
);
848 memcpy(cmsg
.uuid
, uuid
, 16);
849 cmsg
.raid_slot
= rdev
->desc_nr
;
851 ret
= __sendmsg(cinfo
, &cmsg
);
854 cinfo
->no_new_dev_lockres
->flags
|= DLM_LKF_NOQUEUE
;
855 ret
= dlm_lock_sync(cinfo
->no_new_dev_lockres
, DLM_LOCK_EX
);
856 cinfo
->no_new_dev_lockres
->flags
&= ~DLM_LKF_NOQUEUE
;
857 /* Some node does not "see" the device */
861 dlm_lock_sync(cinfo
->no_new_dev_lockres
, DLM_LOCK_CR
);
865 static int add_new_disk_finish(struct mddev
*mddev
)
867 struct cluster_msg cmsg
;
868 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
870 /* Write sb and inform others */
871 md_update_sb(mddev
, 1);
872 cmsg
.type
= METADATA_UPDATED
;
873 ret
= __sendmsg(cinfo
, &cmsg
);
878 static int new_disk_ack(struct mddev
*mddev
, bool ack
)
880 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
882 if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK
, &cinfo
->state
)) {
883 pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev
));
888 dlm_unlock_sync(cinfo
->no_new_dev_lockres
);
889 complete(&cinfo
->newdisk_completion
);
893 static int remove_disk(struct mddev
*mddev
, struct md_rdev
*rdev
)
895 struct cluster_msg cmsg
;
896 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
898 cmsg
.raid_slot
= rdev
->desc_nr
;
899 return __sendmsg(cinfo
, &cmsg
);
902 static int gather_bitmaps(struct md_rdev
*rdev
)
906 struct cluster_msg cmsg
;
907 struct mddev
*mddev
= rdev
->mddev
;
908 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
911 cmsg
.raid_slot
= rdev
->desc_nr
;
912 err
= sendmsg(cinfo
, &cmsg
);
916 for (sn
= 0; sn
< mddev
->bitmap_info
.nodes
; sn
++) {
917 if (sn
== (cinfo
->slot_number
- 1))
919 err
= bitmap_copy_from_slot(mddev
, sn
, &lo
, &hi
, false);
921 pr_warn("md-cluster: Could not gather bitmaps from slot %d", sn
);
924 if ((hi
> 0) && (lo
< mddev
->recovery_cp
))
925 mddev
->recovery_cp
= lo
;
931 static struct md_cluster_operations cluster_ops
= {
934 .slot_number
= slot_number
,
935 .resync_info_update
= resync_info_update
,
936 .resync_start
= resync_start
,
937 .resync_finish
= resync_finish
,
938 .metadata_update_start
= metadata_update_start
,
939 .metadata_update_finish
= metadata_update_finish
,
940 .metadata_update_cancel
= metadata_update_cancel
,
941 .area_resyncing
= area_resyncing
,
942 .add_new_disk_start
= add_new_disk_start
,
943 .add_new_disk_finish
= add_new_disk_finish
,
944 .new_disk_ack
= new_disk_ack
,
945 .remove_disk
= remove_disk
,
946 .gather_bitmaps
= gather_bitmaps
,
949 static int __init
cluster_init(void)
951 pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n");
952 pr_info("Registering Cluster MD functions\n");
953 register_md_cluster_operations(&cluster_ops
, THIS_MODULE
);
/* Module exit point: unregister the cluster operations from md. */
static void cluster_exit(void)
{
	unregister_md_cluster_operations();
}
962 module_init(cluster_init
);
963 module_exit(cluster_exit
);
964 MODULE_LICENSE("GPL");
965 MODULE_DESCRIPTION("Clustering support for MD");