2 * Copyright (C) 2015, SUSE
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2, or (at your option)
12 #include <linux/module.h>
13 #include <linux/dlm.h>
14 #include <linux/sched.h>
15 #include <linux/raid/md_p.h>
18 #include "md-cluster.h"
21 #define NEW_DEV_TIMEOUT 5000
23 struct dlm_lock_resource
{
26 char *name
; /* lock name. */
27 uint32_t flags
; /* flags to pass to dlm_lock() */
28 struct completion completion
; /* completion for synchronized locking */
29 void (*bast
)(void *arg
, int mode
); /* blocking AST function pointer*/
30 struct mddev
*mddev
; /* pointing back to mddev. */
37 struct list_head list
;
45 /* md_cluster_info flags */
46 #define MD_CLUSTER_WAITING_FOR_NEWDISK 1
47 #define MD_CLUSTER_SUSPEND_READ_BALANCING 2
50 struct md_cluster_info
{
51 /* dlm lock space and resources for clustered raid. */
52 dlm_lockspace_t
*lockspace
;
54 struct completion completion
;
55 struct dlm_lock_resource
*sb_lock
;
56 struct mutex sb_mutex
;
57 struct dlm_lock_resource
*bitmap_lockres
;
58 struct list_head suspend_list
;
59 spinlock_t suspend_lock
;
60 struct md_thread
*recovery_thread
;
61 unsigned long recovery_map
;
62 /* communication loc resources */
63 struct dlm_lock_resource
*ack_lockres
;
64 struct dlm_lock_resource
*message_lockres
;
65 struct dlm_lock_resource
*token_lockres
;
66 struct dlm_lock_resource
*no_new_dev_lockres
;
67 struct md_thread
*recv_thread
;
68 struct completion newdisk_completion
;
83 /* TODO: Unionize this for smaller footprint */
90 static void sync_ast(void *arg
)
92 struct dlm_lock_resource
*res
;
94 res
= (struct dlm_lock_resource
*) arg
;
95 complete(&res
->completion
);
98 static int dlm_lock_sync(struct dlm_lock_resource
*res
, int mode
)
102 init_completion(&res
->completion
);
103 ret
= dlm_lock(res
->ls
, mode
, &res
->lksb
,
104 res
->flags
, res
->name
, strlen(res
->name
),
105 0, sync_ast
, res
, res
->bast
);
108 wait_for_completion(&res
->completion
);
109 return res
->lksb
.sb_status
;
112 static int dlm_unlock_sync(struct dlm_lock_resource
*res
)
114 return dlm_lock_sync(res
, DLM_LOCK_NL
);
117 static struct dlm_lock_resource
*lockres_init(struct mddev
*mddev
,
118 char *name
, void (*bastfn
)(void *arg
, int mode
), int with_lvb
)
120 struct dlm_lock_resource
*res
= NULL
;
122 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
124 res
= kzalloc(sizeof(struct dlm_lock_resource
), GFP_KERNEL
);
127 res
->ls
= cinfo
->lockspace
;
129 namelen
= strlen(name
);
130 res
->name
= kzalloc(namelen
+ 1, GFP_KERNEL
);
132 pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name
);
135 strlcpy(res
->name
, name
, namelen
+ 1);
137 res
->lksb
.sb_lvbptr
= kzalloc(LVB_SIZE
, GFP_KERNEL
);
138 if (!res
->lksb
.sb_lvbptr
) {
139 pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name
);
142 res
->flags
= DLM_LKF_VALBLK
;
148 res
->flags
|= DLM_LKF_EXPEDITE
;
150 ret
= dlm_lock_sync(res
, DLM_LOCK_NL
);
152 pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name
);
155 res
->flags
&= ~DLM_LKF_EXPEDITE
;
156 res
->flags
|= DLM_LKF_CONVERT
;
160 kfree(res
->lksb
.sb_lvbptr
);
166 static void lockres_free(struct dlm_lock_resource
*res
)
171 init_completion(&res
->completion
);
172 dlm_unlock(res
->ls
, res
->lksb
.sb_lkid
, 0, &res
->lksb
, res
);
173 wait_for_completion(&res
->completion
);
176 kfree(res
->lksb
.sb_lvbptr
);
/*
 * Format a 16-byte raw UUID as the canonical 8-4-4-4-12 hex string into
 * dest (caller provides >= 37 bytes). Returns dest.
 * Uses an unsigned char cast so negative char values print correctly.
 */
static char *pretty_uuid(char *dest, char *src)
{
	int i, len = 0;

	for (i = 0; i < 16; i++) {
		if (i == 4 || i == 6 || i == 8 || i == 10)
			len += sprintf(dest + len, "-");
		len += sprintf(dest + len, "%02x", (unsigned char)src[i]);
	}
	return dest;
}
192 static void add_resync_info(struct mddev
*mddev
, struct dlm_lock_resource
*lockres
,
193 sector_t lo
, sector_t hi
)
195 struct resync_info
*ri
;
197 ri
= (struct resync_info
*)lockres
->lksb
.sb_lvbptr
;
198 ri
->lo
= cpu_to_le64(lo
);
199 ri
->hi
= cpu_to_le64(hi
);
202 static struct suspend_info
*read_resync_info(struct mddev
*mddev
, struct dlm_lock_resource
*lockres
)
204 struct resync_info ri
;
205 struct suspend_info
*s
= NULL
;
208 dlm_lock_sync(lockres
, DLM_LOCK_CR
);
209 memcpy(&ri
, lockres
->lksb
.sb_lvbptr
, sizeof(struct resync_info
));
210 hi
= le64_to_cpu(ri
.hi
);
212 s
= kzalloc(sizeof(struct suspend_info
), GFP_KERNEL
);
216 s
->lo
= le64_to_cpu(ri
.lo
);
218 dlm_unlock_sync(lockres
);
223 static void recover_bitmaps(struct md_thread
*thread
)
225 struct mddev
*mddev
= thread
->mddev
;
226 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
227 struct dlm_lock_resource
*bm_lockres
;
230 struct suspend_info
*s
, *tmp
;
233 while (cinfo
->recovery_map
) {
234 slot
= fls64((u64
)cinfo
->recovery_map
) - 1;
236 /* Clear suspend_area associated with the bitmap */
237 spin_lock_irq(&cinfo
->suspend_lock
);
238 list_for_each_entry_safe(s
, tmp
, &cinfo
->suspend_list
, list
)
239 if (slot
== s
->slot
) {
243 spin_unlock_irq(&cinfo
->suspend_lock
);
245 snprintf(str
, 64, "bitmap%04d", slot
);
246 bm_lockres
= lockres_init(mddev
, str
, NULL
, 1);
248 pr_err("md-cluster: Cannot initialize bitmaps\n");
252 ret
= dlm_lock_sync(bm_lockres
, DLM_LOCK_PW
);
254 pr_err("md-cluster: Could not DLM lock %s: %d\n",
258 ret
= bitmap_copy_from_slot(mddev
, slot
, &lo
, &hi
, true);
260 pr_err("md-cluster: Could not copy data from bitmap %d\n", slot
);
264 /* TODO:Wait for current resync to get over */
265 set_bit(MD_RECOVERY_NEEDED
, &mddev
->recovery
);
266 if (lo
< mddev
->recovery_cp
)
267 mddev
->recovery_cp
= lo
;
268 md_check_recovery(mddev
);
271 dlm_unlock_sync(bm_lockres
);
273 clear_bit(slot
, &cinfo
->recovery_map
);
277 static void recover_prep(void *arg
)
279 struct mddev
*mddev
= arg
;
280 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
281 set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING
, &cinfo
->state
);
284 static void recover_slot(void *arg
, struct dlm_slot
*slot
)
286 struct mddev
*mddev
= arg
;
287 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
289 pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
290 mddev
->bitmap_info
.cluster_name
,
291 slot
->nodeid
, slot
->slot
,
293 set_bit(slot
->slot
- 1, &cinfo
->recovery_map
);
294 if (!cinfo
->recovery_thread
) {
295 cinfo
->recovery_thread
= md_register_thread(recover_bitmaps
,
297 if (!cinfo
->recovery_thread
) {
298 pr_warn("md-cluster: Could not create recovery thread\n");
302 md_wakeup_thread(cinfo
->recovery_thread
);
305 static void recover_done(void *arg
, struct dlm_slot
*slots
,
306 int num_slots
, int our_slot
,
309 struct mddev
*mddev
= arg
;
310 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
312 cinfo
->slot_number
= our_slot
;
313 complete(&cinfo
->completion
);
314 clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING
, &cinfo
->state
);
317 static const struct dlm_lockspace_ops md_ls_ops
= {
318 .recover_prep
= recover_prep
,
319 .recover_slot
= recover_slot
,
320 .recover_done
= recover_done
,
324 * The BAST function for the ack lock resource
325 * This function wakes up the receive thread in
326 * order to receive and process the message.
328 static void ack_bast(void *arg
, int mode
)
330 struct dlm_lock_resource
*res
= (struct dlm_lock_resource
*)arg
;
331 struct md_cluster_info
*cinfo
= res
->mddev
->cluster_info
;
333 if (mode
== DLM_LOCK_EX
)
334 md_wakeup_thread(cinfo
->recv_thread
);
337 static void __remove_suspend_info(struct md_cluster_info
*cinfo
, int slot
)
339 struct suspend_info
*s
, *tmp
;
341 list_for_each_entry_safe(s
, tmp
, &cinfo
->suspend_list
, list
)
342 if (slot
== s
->slot
) {
343 pr_info("%s:%d Deleting suspend_info: %d\n",
344 __func__
, __LINE__
, slot
);
351 static void remove_suspend_info(struct md_cluster_info
*cinfo
, int slot
)
353 spin_lock_irq(&cinfo
->suspend_lock
);
354 __remove_suspend_info(cinfo
, slot
);
355 spin_unlock_irq(&cinfo
->suspend_lock
);
359 static void process_suspend_info(struct md_cluster_info
*cinfo
,
360 int slot
, sector_t lo
, sector_t hi
)
362 struct suspend_info
*s
;
365 remove_suspend_info(cinfo
, slot
);
368 s
= kzalloc(sizeof(struct suspend_info
), GFP_KERNEL
);
374 spin_lock_irq(&cinfo
->suspend_lock
);
375 /* Remove existing entry (if exists) before adding */
376 __remove_suspend_info(cinfo
, slot
);
377 list_add(&s
->list
, &cinfo
->suspend_list
);
378 spin_unlock_irq(&cinfo
->suspend_lock
);
381 static void process_add_new_disk(struct mddev
*mddev
, struct cluster_msg
*cmsg
)
384 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
385 char event_name
[] = "EVENT=ADD_DEVICE";
387 char *envp
[] = {event_name
, disk_uuid
, raid_slot
, NULL
};
390 len
= snprintf(disk_uuid
, 64, "DEVICE_UUID=");
391 pretty_uuid(disk_uuid
+ len
, cmsg
->uuid
);
392 snprintf(raid_slot
, 16, "RAID_DISK=%d", cmsg
->raid_slot
);
393 pr_info("%s:%d Sending kobject change with %s and %s\n", __func__
, __LINE__
, disk_uuid
, raid_slot
);
394 init_completion(&cinfo
->newdisk_completion
);
395 set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK
, &cinfo
->state
);
396 kobject_uevent_env(&disk_to_dev(mddev
->gendisk
)->kobj
, KOBJ_CHANGE
, envp
);
397 wait_for_completion_timeout(&cinfo
->newdisk_completion
,
399 clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK
, &cinfo
->state
);
403 static void process_metadata_update(struct mddev
*mddev
, struct cluster_msg
*msg
)
405 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
408 dlm_lock_sync(cinfo
->no_new_dev_lockres
, DLM_LOCK_CR
);
411 static void process_remove_disk(struct mddev
*mddev
, struct cluster_msg
*msg
)
413 struct md_rdev
*rdev
= md_find_rdev_nr_rcu(mddev
, msg
->raid_slot
);
416 md_kick_rdev_from_array(rdev
);
418 pr_warn("%s: %d Could not find disk(%d) to REMOVE\n", __func__
, __LINE__
, msg
->raid_slot
);
421 static void process_readd_disk(struct mddev
*mddev
, struct cluster_msg
*msg
)
423 struct md_rdev
*rdev
= md_find_rdev_nr_rcu(mddev
, msg
->raid_slot
);
425 if (rdev
&& test_bit(Faulty
, &rdev
->flags
))
426 clear_bit(Faulty
, &rdev
->flags
);
428 pr_warn("%s: %d Could not find disk(%d) which is faulty", __func__
, __LINE__
, msg
->raid_slot
);
431 static void process_recvd_msg(struct mddev
*mddev
, struct cluster_msg
*msg
)
434 case METADATA_UPDATED
:
435 pr_info("%s: %d Received message: METADATA_UPDATE from %d\n",
436 __func__
, __LINE__
, msg
->slot
);
437 process_metadata_update(mddev
, msg
);
440 pr_info("%s: %d Received message: RESYNCING from %d\n",
441 __func__
, __LINE__
, msg
->slot
);
442 process_suspend_info(mddev
->cluster_info
, msg
->slot
,
443 msg
->low
, msg
->high
);
446 pr_info("%s: %d Received message: NEWDISK from %d\n",
447 __func__
, __LINE__
, msg
->slot
);
448 process_add_new_disk(mddev
, msg
);
451 pr_info("%s: %d Received REMOVE from %d\n",
452 __func__
, __LINE__
, msg
->slot
);
453 process_remove_disk(mddev
, msg
);
456 pr_info("%s: %d Received RE_ADD from %d\n",
457 __func__
, __LINE__
, msg
->slot
);
458 process_readd_disk(mddev
, msg
);
461 pr_warn("%s:%d Received unknown message from %d\n",
462 __func__
, __LINE__
, msg
->slot
);
467 * thread for receiving message
469 static void recv_daemon(struct md_thread
*thread
)
471 struct md_cluster_info
*cinfo
= thread
->mddev
->cluster_info
;
472 struct dlm_lock_resource
*ack_lockres
= cinfo
->ack_lockres
;
473 struct dlm_lock_resource
*message_lockres
= cinfo
->message_lockres
;
474 struct cluster_msg msg
;
476 /*get CR on Message*/
477 if (dlm_lock_sync(message_lockres
, DLM_LOCK_CR
)) {
478 pr_err("md/raid1:failed to get CR on MESSAGE\n");
482 /* read lvb and wake up thread to process this message_lockres */
483 memcpy(&msg
, message_lockres
->lksb
.sb_lvbptr
, sizeof(struct cluster_msg
));
484 process_recvd_msg(thread
->mddev
, &msg
);
486 /*release CR on ack_lockres*/
487 dlm_unlock_sync(ack_lockres
);
488 /*up-convert to EX on message_lockres*/
489 dlm_lock_sync(message_lockres
, DLM_LOCK_EX
);
490 /*get CR on ack_lockres again*/
491 dlm_lock_sync(ack_lockres
, DLM_LOCK_CR
);
492 /*release CR on message_lockres*/
493 dlm_unlock_sync(message_lockres
);
497 * Takes the lock on the TOKEN lock resource so no other
498 * node can communicate while the operation is underway.
500 static int lock_comm(struct md_cluster_info
*cinfo
)
504 error
= dlm_lock_sync(cinfo
->token_lockres
, DLM_LOCK_EX
);
506 pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
507 __func__
, __LINE__
, error
);
511 static void unlock_comm(struct md_cluster_info
*cinfo
)
513 dlm_unlock_sync(cinfo
->token_lockres
);
517 * This function performs the actual sending of the message. This function is
518 * usually called after performing the encompassing operation
520 * 1. Grabs the message lockresource in EX mode
521 * 2. Copies the message to the message LVB
522 * 3. Downconverts message lockresource to CR
523 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
524 * and the other nodes read the message. The thread will wait here until all other
525 * nodes have released ack lock resource.
526 * 5. Downconvert ack lockresource to CR
528 static int __sendmsg(struct md_cluster_info
*cinfo
, struct cluster_msg
*cmsg
)
531 int slot
= cinfo
->slot_number
- 1;
533 cmsg
->slot
= cpu_to_le32(slot
);
534 /*get EX on Message*/
535 error
= dlm_lock_sync(cinfo
->message_lockres
, DLM_LOCK_EX
);
537 pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error
);
541 memcpy(cinfo
->message_lockres
->lksb
.sb_lvbptr
, (void *)cmsg
,
542 sizeof(struct cluster_msg
));
543 /*down-convert EX to CR on Message*/
544 error
= dlm_lock_sync(cinfo
->message_lockres
, DLM_LOCK_CR
);
546 pr_err("md-cluster: failed to convert EX to CR on MESSAGE(%d)\n",
551 /*up-convert CR to EX on Ack*/
552 error
= dlm_lock_sync(cinfo
->ack_lockres
, DLM_LOCK_EX
);
554 pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
559 /*down-convert EX to CR on Ack*/
560 error
= dlm_lock_sync(cinfo
->ack_lockres
, DLM_LOCK_CR
);
562 pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
568 dlm_unlock_sync(cinfo
->message_lockres
);
/* Send a message under the cluster-wide TOKEN lock. */
static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
{
	int ret;

	lock_comm(cinfo);
	ret = __sendmsg(cinfo, cmsg);
	unlock_comm(cinfo);
	return ret;
}
583 static int gather_all_resync_info(struct mddev
*mddev
, int total_slots
)
585 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
587 struct dlm_lock_resource
*bm_lockres
;
588 struct suspend_info
*s
;
592 for (i
= 0; i
< total_slots
; i
++) {
593 memset(str
, '\0', 64);
594 snprintf(str
, 64, "bitmap%04d", i
);
595 bm_lockres
= lockres_init(mddev
, str
, NULL
, 1);
598 if (i
== (cinfo
->slot_number
- 1))
601 bm_lockres
->flags
|= DLM_LKF_NOQUEUE
;
602 ret
= dlm_lock_sync(bm_lockres
, DLM_LOCK_PW
);
603 if (ret
== -EAGAIN
) {
604 memset(bm_lockres
->lksb
.sb_lvbptr
, '\0', LVB_SIZE
);
605 s
= read_resync_info(mddev
, bm_lockres
);
607 pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
609 (unsigned long long) s
->lo
,
610 (unsigned long long) s
->hi
, i
);
611 spin_lock_irq(&cinfo
->suspend_lock
);
613 list_add(&s
->list
, &cinfo
->suspend_list
);
614 spin_unlock_irq(&cinfo
->suspend_lock
);
617 lockres_free(bm_lockres
);
622 /* TODO: Read the disk bitmap sb and check if it needs recovery */
623 dlm_unlock_sync(bm_lockres
);
624 lockres_free(bm_lockres
);
630 static int join(struct mddev
*mddev
, int nodes
)
632 struct md_cluster_info
*cinfo
;
636 if (!try_module_get(THIS_MODULE
))
639 cinfo
= kzalloc(sizeof(struct md_cluster_info
), GFP_KERNEL
);
643 init_completion(&cinfo
->completion
);
645 mutex_init(&cinfo
->sb_mutex
);
646 mddev
->cluster_info
= cinfo
;
649 pretty_uuid(str
, mddev
->uuid
);
650 ret
= dlm_new_lockspace(str
, mddev
->bitmap_info
.cluster_name
,
651 DLM_LSFL_FS
, LVB_SIZE
,
652 &md_ls_ops
, mddev
, &ops_rv
, &cinfo
->lockspace
);
655 wait_for_completion(&cinfo
->completion
);
656 if (nodes
< cinfo
->slot_number
) {
657 pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).",
658 cinfo
->slot_number
, nodes
);
662 cinfo
->sb_lock
= lockres_init(mddev
, "cmd-super",
664 if (!cinfo
->sb_lock
) {
668 /* Initiate the communication resources */
670 cinfo
->recv_thread
= md_register_thread(recv_daemon
, mddev
, "cluster_recv");
671 if (!cinfo
->recv_thread
) {
672 pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
675 cinfo
->message_lockres
= lockres_init(mddev
, "message", NULL
, 1);
676 if (!cinfo
->message_lockres
)
678 cinfo
->token_lockres
= lockres_init(mddev
, "token", NULL
, 0);
679 if (!cinfo
->token_lockres
)
681 cinfo
->ack_lockres
= lockres_init(mddev
, "ack", ack_bast
, 0);
682 if (!cinfo
->ack_lockres
)
684 cinfo
->no_new_dev_lockres
= lockres_init(mddev
, "no-new-dev", NULL
, 0);
685 if (!cinfo
->no_new_dev_lockres
)
688 /* get sync CR lock on ACK. */
689 if (dlm_lock_sync(cinfo
->ack_lockres
, DLM_LOCK_CR
))
690 pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
692 /* get sync CR lock on no-new-dev. */
693 if (dlm_lock_sync(cinfo
->no_new_dev_lockres
, DLM_LOCK_CR
))
694 pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret
);
697 pr_info("md-cluster: Joined cluster %s slot %d\n", str
, cinfo
->slot_number
);
698 snprintf(str
, 64, "bitmap%04d", cinfo
->slot_number
- 1);
699 cinfo
->bitmap_lockres
= lockres_init(mddev
, str
, NULL
, 1);
700 if (!cinfo
->bitmap_lockres
)
702 if (dlm_lock_sync(cinfo
->bitmap_lockres
, DLM_LOCK_PW
)) {
703 pr_err("Failed to get bitmap lock\n");
708 INIT_LIST_HEAD(&cinfo
->suspend_list
);
709 spin_lock_init(&cinfo
->suspend_lock
);
711 ret
= gather_all_resync_info(mddev
, nodes
);
717 lockres_free(cinfo
->message_lockres
);
718 lockres_free(cinfo
->token_lockres
);
719 lockres_free(cinfo
->ack_lockres
);
720 lockres_free(cinfo
->no_new_dev_lockres
);
721 lockres_free(cinfo
->bitmap_lockres
);
722 lockres_free(cinfo
->sb_lock
);
723 if (cinfo
->lockspace
)
724 dlm_release_lockspace(cinfo
->lockspace
, 2);
725 mddev
->cluster_info
= NULL
;
727 module_put(THIS_MODULE
);
731 static int leave(struct mddev
*mddev
)
733 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
737 md_unregister_thread(&cinfo
->recovery_thread
);
738 md_unregister_thread(&cinfo
->recv_thread
);
739 lockres_free(cinfo
->message_lockres
);
740 lockres_free(cinfo
->token_lockres
);
741 lockres_free(cinfo
->ack_lockres
);
742 lockres_free(cinfo
->no_new_dev_lockres
);
743 lockres_free(cinfo
->sb_lock
);
744 lockres_free(cinfo
->bitmap_lockres
);
745 dlm_release_lockspace(cinfo
->lockspace
, 2);
749 /* slot_number(): Returns the MD slot number to use
750 * DLM starts the slot numbers from 1, wheras cluster-md
751 * wants the number to be from zero, so we deduct one
753 static int slot_number(struct mddev
*mddev
)
755 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
757 return cinfo
->slot_number
- 1;
760 static void resync_info_update(struct mddev
*mddev
, sector_t lo
, sector_t hi
)
762 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
764 add_resync_info(mddev
, cinfo
->bitmap_lockres
, lo
, hi
);
765 /* Re-acquire the lock to refresh LVB */
766 dlm_lock_sync(cinfo
->bitmap_lockres
, DLM_LOCK_PW
);
769 static int metadata_update_start(struct mddev
*mddev
)
771 return lock_comm(mddev
->cluster_info
);
774 static int metadata_update_finish(struct mddev
*mddev
)
776 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
777 struct cluster_msg cmsg
;
780 memset(&cmsg
, 0, sizeof(cmsg
));
781 cmsg
.type
= cpu_to_le32(METADATA_UPDATED
);
782 ret
= __sendmsg(cinfo
, &cmsg
);
787 static int metadata_update_cancel(struct mddev
*mddev
)
789 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
791 return dlm_unlock_sync(cinfo
->token_lockres
);
794 static int resync_send(struct mddev
*mddev
, enum msg_type type
,
795 sector_t lo
, sector_t hi
)
797 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
798 struct cluster_msg cmsg
;
799 int slot
= cinfo
->slot_number
- 1;
801 pr_info("%s:%d lo: %llu hi: %llu\n", __func__
, __LINE__
,
802 (unsigned long long)lo
,
803 (unsigned long long)hi
);
804 resync_info_update(mddev
, lo
, hi
);
805 cmsg
.type
= cpu_to_le32(type
);
806 cmsg
.slot
= cpu_to_le32(slot
);
807 cmsg
.low
= cpu_to_le64(lo
);
808 cmsg
.high
= cpu_to_le64(hi
);
809 return sendmsg(cinfo
, &cmsg
);
812 static int resync_start(struct mddev
*mddev
, sector_t lo
, sector_t hi
)
814 pr_info("%s:%d\n", __func__
, __LINE__
);
815 return resync_send(mddev
, RESYNCING
, lo
, hi
);
818 static void resync_finish(struct mddev
*mddev
)
820 pr_info("%s:%d\n", __func__
, __LINE__
);
821 resync_send(mddev
, RESYNCING
, 0, 0);
824 static int area_resyncing(struct mddev
*mddev
, int direction
,
825 sector_t lo
, sector_t hi
)
827 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
829 struct suspend_info
*s
;
831 if ((direction
== READ
) &&
832 test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING
, &cinfo
->state
))
835 spin_lock_irq(&cinfo
->suspend_lock
);
836 if (list_empty(&cinfo
->suspend_list
))
838 list_for_each_entry(s
, &cinfo
->suspend_list
, list
)
839 if (hi
> s
->lo
&& lo
< s
->hi
) {
844 spin_unlock_irq(&cinfo
->suspend_lock
);
848 static int add_new_disk_start(struct mddev
*mddev
, struct md_rdev
*rdev
)
850 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
851 struct cluster_msg cmsg
;
853 struct mdp_superblock_1
*sb
= page_address(rdev
->sb_page
);
854 char *uuid
= sb
->device_uuid
;
856 memset(&cmsg
, 0, sizeof(cmsg
));
857 cmsg
.type
= cpu_to_le32(NEWDISK
);
858 memcpy(cmsg
.uuid
, uuid
, 16);
859 cmsg
.raid_slot
= rdev
->desc_nr
;
861 ret
= __sendmsg(cinfo
, &cmsg
);
864 cinfo
->no_new_dev_lockres
->flags
|= DLM_LKF_NOQUEUE
;
865 ret
= dlm_lock_sync(cinfo
->no_new_dev_lockres
, DLM_LOCK_EX
);
866 cinfo
->no_new_dev_lockres
->flags
&= ~DLM_LKF_NOQUEUE
;
867 /* Some node does not "see" the device */
871 dlm_lock_sync(cinfo
->no_new_dev_lockres
, DLM_LOCK_CR
);
875 static int add_new_disk_finish(struct mddev
*mddev
)
877 struct cluster_msg cmsg
;
878 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
880 /* Write sb and inform others */
881 md_update_sb(mddev
, 1);
882 cmsg
.type
= METADATA_UPDATED
;
883 ret
= __sendmsg(cinfo
, &cmsg
);
888 static int new_disk_ack(struct mddev
*mddev
, bool ack
)
890 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
892 if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK
, &cinfo
->state
)) {
893 pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev
));
898 dlm_unlock_sync(cinfo
->no_new_dev_lockres
);
899 complete(&cinfo
->newdisk_completion
);
903 static int remove_disk(struct mddev
*mddev
, struct md_rdev
*rdev
)
905 struct cluster_msg cmsg
;
906 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
908 cmsg
.raid_slot
= rdev
->desc_nr
;
909 return __sendmsg(cinfo
, &cmsg
);
912 static int gather_bitmaps(struct md_rdev
*rdev
)
916 struct cluster_msg cmsg
;
917 struct mddev
*mddev
= rdev
->mddev
;
918 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
921 cmsg
.raid_slot
= rdev
->desc_nr
;
922 err
= sendmsg(cinfo
, &cmsg
);
926 for (sn
= 0; sn
< mddev
->bitmap_info
.nodes
; sn
++) {
927 if (sn
== (cinfo
->slot_number
- 1))
929 err
= bitmap_copy_from_slot(mddev
, sn
, &lo
, &hi
, false);
931 pr_warn("md-cluster: Could not gather bitmaps from slot %d", sn
);
934 if ((hi
> 0) && (lo
< mddev
->recovery_cp
))
935 mddev
->recovery_cp
= lo
;
941 static struct md_cluster_operations cluster_ops
= {
944 .slot_number
= slot_number
,
945 .resync_info_update
= resync_info_update
,
946 .resync_start
= resync_start
,
947 .resync_finish
= resync_finish
,
948 .metadata_update_start
= metadata_update_start
,
949 .metadata_update_finish
= metadata_update_finish
,
950 .metadata_update_cancel
= metadata_update_cancel
,
951 .area_resyncing
= area_resyncing
,
952 .add_new_disk_start
= add_new_disk_start
,
953 .add_new_disk_finish
= add_new_disk_finish
,
954 .new_disk_ack
= new_disk_ack
,
955 .remove_disk
= remove_disk
,
956 .gather_bitmaps
= gather_bitmaps
,
959 static int __init
cluster_init(void)
961 pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n");
962 pr_info("Registering Cluster MD functions\n");
963 register_md_cluster_operations(&cluster_ops
, THIS_MODULE
);
/* Module exit: unregister the cluster operations. */
static void cluster_exit(void)
{
	unregister_md_cluster_operations();
}
972 module_init(cluster_init
);
973 module_exit(cluster_exit
);
974 MODULE_LICENSE("GPL");
975 MODULE_DESCRIPTION("Clustering support for MD");