/*
 * Copyright (C) 2015, SUSE
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2, or (at your option)
 * any later version.
 */
#include <linux/module.h>
#include <linux/dlm.h>
#include <linux/sched.h>
#include <linux/raid/md_p.h>
#include "md.h"
#include "bitmap.h"
#include "md-cluster.h"
#define LVB_SIZE	64
#define NEW_DEV_TIMEOUT	5000
struct dlm_lock_resource {
	dlm_lockspace_t *ls;
	struct dlm_lksb lksb;
	char *name; /* lock name. */
	uint32_t flags; /* flags to pass to dlm_lock() */
	struct completion completion; /* completion for synchronized locking */
	void (*bast)(void *arg, int mode); /* blocking AST function pointer */
	struct mddev *mddev; /* pointing back to mddev. */
	int mode;
};

struct suspend_info {
	int slot;
	sector_t lo;
	sector_t hi;
	struct list_head list;
};

struct resync_info {
	__le64 lo;
	__le64 hi;
};
/* md_cluster_info flags */
#define	MD_CLUSTER_WAITING_FOR_NEWDISK		1
#define	MD_CLUSTER_SUSPEND_READ_BALANCING	2
#define	MD_CLUSTER_BEGIN_JOIN_CLUSTER		3
struct md_cluster_info {
	/* dlm lock space and resources for clustered raid. */
	dlm_lockspace_t *lockspace;
	int slot_number;
	struct completion completion;
	struct dlm_lock_resource *bitmap_lockres;
	struct dlm_lock_resource *resync_lockres;
	struct list_head suspend_list;
	spinlock_t suspend_lock;
	struct md_thread *recovery_thread;
	unsigned long recovery_map;
	/* communication lock resources */
	struct dlm_lock_resource *ack_lockres;
	struct dlm_lock_resource *message_lockres;
	struct dlm_lock_resource *token_lockres;
	struct dlm_lock_resource *no_new_dev_lockres;
	struct md_thread *recv_thread;
	struct completion newdisk_completion;
	unsigned long state;
};

enum msg_type {
	METADATA_UPDATED = 0,
	RESYNCING,
	NEWDISK,
	REMOVE,
	RE_ADD,
	BITMAP_NEEDS_SYNC,
};

struct cluster_msg {
	__le32 type;
	__le32 slot;
	/* TODO: Unionize this for smaller footprint */
	__le64 low;
	__le64 high;
	char uuid[16];
	__le32 raid_slot;
};
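/*
 * Note (editorial, inferred from the code below): every member of enum
 * msg_type is matched by a case in process_recvd_msg(), and struct
 * cluster_msg must fit in the LVB_SIZE (64-byte) lock value block, since
 * __sendmsg() copies the whole struct into
 * message_lockres->lksb.sb_lvbptr.
 */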
static void sync_ast(void *arg)
{
	struct dlm_lock_resource *res;

	res = arg;
	complete(&res->completion);
}
static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
{
	int ret = 0;

	ret = dlm_lock(res->ls, mode, &res->lksb,
			res->flags, res->name, strlen(res->name),
			0, sync_ast, res, res->bast);
	if (ret)
		return ret;
	wait_for_completion(&res->completion);
	if (res->lksb.sb_status == 0)
		res->mode = mode;
	return res->lksb.sb_status;
}
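/*
 * dlm_lock() itself is asynchronous: sync_ast() above fires when the
 * request is granted and completes res->completion, which dlm_lock_sync()
 * waits on, turning the asynchronous DLM API into a blocking call.
 * "Unlocking" below is modeled as a synchronous down-convert to NL (null)
 * mode rather than a real dlm_unlock(), so the node keeps holding the
 * resource at the weakest possible mode.
 */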
static int dlm_unlock_sync(struct dlm_lock_resource *res)
{
	return dlm_lock_sync(res, DLM_LOCK_NL);
}
static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
		char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
{
	struct dlm_lock_resource *res = NULL;
	int ret, namelen;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
	if (!res)
		return NULL;
	init_completion(&res->completion);
	res->ls = cinfo->lockspace;
	res->mddev = mddev;
	res->mode = DLM_LOCK_IV;
	namelen = strlen(name);
	res->name = kzalloc(namelen + 1, GFP_KERNEL);
	if (!res->name) {
		pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
		goto out_err;
	}
	strlcpy(res->name, name, namelen + 1);
	if (with_lvb) {
		res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
		if (!res->lksb.sb_lvbptr) {
			pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
			goto out_err;
		}
		res->flags = DLM_LKF_VALBLK;
	}

	if (bastfn)
		res->bast = bastfn;

	res->flags |= DLM_LKF_EXPEDITE;

	ret = dlm_lock_sync(res, DLM_LOCK_NL);
	if (ret) {
		pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
		goto out_err;
	}
	res->flags &= ~DLM_LKF_EXPEDITE;
	res->flags |= DLM_LKF_CONVERT;

	return res;
out_err:
	kfree(res->lksb.sb_lvbptr);
	kfree(res->name);
	kfree(res);
	return NULL;
}
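/*
 * A minimal usage sketch (hypothetical caller, for illustration only; not
 * part of this file): take a short-lived PW lock on a per-slot bitmap
 * resource, then tear it down again.
 *
 *	struct dlm_lock_resource *bm;
 *
 *	bm = lockres_init(mddev, "bitmap0001", NULL, 1);
 *	if (!bm)
 *		return -ENOMEM;
 *	if (!dlm_lock_sync(bm, DLM_LOCK_PW)) {
 *		...inspect or update bm->lksb.sb_lvbptr here...
 *		dlm_unlock_sync(bm);
 *	}
 *	lockres_free(bm);
 */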
static void lockres_free(struct dlm_lock_resource *res)
{
	int ret;

	if (!res)
		return;

	/* cancel a lock request or a conversion request that is blocked */
	res->flags |= DLM_LKF_CANCEL;
retry:
	ret = dlm_unlock(res->ls, res->lksb.sb_lkid, 0, &res->lksb, res);
	if (unlikely(ret != 0)) {
		pr_info("%s: failed to unlock %s return %d\n", __func__, res->name, ret);

		/* if a lock conversion is cancelled, then the lock is put
		 * back on the grant queue; ensure it is actually unlocked */
		if (ret == -DLM_ECANCEL)
			goto retry;
	}
	res->flags &= ~DLM_LKF_CANCEL;
	wait_for_completion(&res->completion);

	kfree(res->name);
	kfree(res->lksb.sb_lvbptr);
	kfree(res);
}
static void add_resync_info(struct dlm_lock_resource *lockres,
		sector_t lo, sector_t hi)
{
	struct resync_info *ri;

	ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
	ri->lo = cpu_to_le64(lo);
	ri->hi = cpu_to_le64(hi);
}
static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres)
{
	struct resync_info ri;
	struct suspend_info *s = NULL;
	sector_t hi = 0;

	dlm_lock_sync(lockres, DLM_LOCK_CR);
	memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
	hi = le64_to_cpu(ri.hi);
	if (hi > 0) {
		s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
		if (!s)
			goto out;
		s->hi = hi;
		s->lo = le64_to_cpu(ri.lo);
	}
out:
	dlm_unlock_sync(lockres);
	return s;
}
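/*
 * The resync window travels in the lock value block: add_resync_info()
 * serializes the window lo..hi into the LVB of this node's bitmap lock,
 * and read_resync_info() on a peer takes that lock in CR mode just long
 * enough to copy it back out.  A zero "hi" means no resync is in flight.
 */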
static void recover_bitmaps(struct md_thread *thread)
{
	struct mddev *mddev = thread->mddev;
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct dlm_lock_resource *bm_lockres;
	char str[64];
	int slot, ret;
	struct suspend_info *s, *tmp;
	sector_t lo, hi;

	while (cinfo->recovery_map) {
		slot = fls64((u64)cinfo->recovery_map) - 1;

		snprintf(str, 64, "bitmap%04d", slot);
		bm_lockres = lockres_init(mddev, str, NULL, 1);
		if (!bm_lockres) {
			pr_err("md-cluster: Cannot initialize bitmaps\n");
			goto clear_bit;
		}

		ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
		if (ret) {
			pr_err("md-cluster: Could not DLM lock %s: %d\n",
					str, ret);
			goto clear_bit;
		}
		ret = bitmap_copy_from_slot(mddev, slot, &lo, &hi, true);
		if (ret) {
			pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
			goto dlm_unlock;
		}

		/* Clear suspend_area associated with the bitmap */
		spin_lock_irq(&cinfo->suspend_lock);
		list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
			if (slot == s->slot) {
				list_del(&s->list);
				kfree(s);
			}
		spin_unlock_irq(&cinfo->suspend_lock);

		if (hi > 0) {
			/* TODO: Wait for current resync to get over */
			set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
			if (lo < mddev->recovery_cp)
				mddev->recovery_cp = lo;
			md_check_recovery(mddev);
		}
dlm_unlock:
		dlm_unlock_sync(bm_lockres);
clear_bit:
		clear_bit(slot, &cinfo->recovery_map);
	}
}
static void recover_prep(void *arg)
{
	struct mddev *mddev = arg;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
}
static void __recover_slot(struct mddev *mddev, int slot)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	set_bit(slot, &cinfo->recovery_map);
	if (!cinfo->recovery_thread) {
		cinfo->recovery_thread = md_register_thread(recover_bitmaps,
				mddev, "recover");
		if (!cinfo->recovery_thread) {
			pr_warn("md-cluster: Could not create recovery thread\n");
			return;
		}
	}
	md_wakeup_thread(cinfo->recovery_thread);
}
static void recover_slot(void *arg, struct dlm_slot *slot)
{
	struct mddev *mddev = arg;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
			mddev->bitmap_info.cluster_name,
			slot->nodeid, slot->slot,
			cinfo->slot_number);
	/* deduct one since dlm slot numbers start at one while
	 * cluster-md slot numbers begin at 0 */
	__recover_slot(mddev, slot->slot - 1);
}
static void recover_done(void *arg, struct dlm_slot *slots,
		int num_slots, int our_slot,
		uint32_t generation)
{
	struct mddev *mddev = arg;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cinfo->slot_number = our_slot;
	/* the completion only needs to be signalled when a node joins the
	 * cluster; it is not needed when another node fails */
	if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) {
		complete(&cinfo->completion);
		clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
	}
	clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
}
/* These ops are called when a node joins the cluster, and for lock
 * recovery when a node failure occurs */
static const struct dlm_lockspace_ops md_ls_ops = {
	.recover_prep = recover_prep,
	.recover_slot = recover_slot,
	.recover_done = recover_done,
};
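/*
 * Callback ordering, as the DLM drives it: recover_prep() runs first and
 * suspends read balancing; recover_slot() then fires once per failed
 * node; recover_done() runs last, records our slot number, and (on the
 * initial join only) completes cinfo->completion so that join() below can
 * proceed.
 */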
/*
 * The BAST function for the ack lock resource
 * This function wakes up the receive thread in
 * order to receive and process the message.
 */
static void ack_bast(void *arg, int mode)
{
	struct dlm_lock_resource *res = arg;
	struct md_cluster_info *cinfo = res->mddev->cluster_info;

	if (mode == DLM_LOCK_EX)
		md_wakeup_thread(cinfo->recv_thread);
}
static void __remove_suspend_info(struct md_cluster_info *cinfo, int slot)
{
	struct suspend_info *s, *tmp;

	list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
		if (slot == s->slot) {
			list_del(&s->list);
			kfree(s);
			break;
		}
}
static void remove_suspend_info(struct mddev *mddev, int slot)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	spin_lock_irq(&cinfo->suspend_lock);
	__remove_suspend_info(cinfo, slot);
	spin_unlock_irq(&cinfo->suspend_lock);
	mddev->pers->quiesce(mddev, 2);
}
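/*
 * The quiesce(mddev, 2) above (and at the end of process_suspend_info()
 * below) is the personality's "wake" state: for raid1, for instance, it
 * wakes I/O waiting on the barrier so it can re-check the just-changed
 * suspend list.  See the personality's quiesce() implementation for the
 * exact semantics.
 */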
static void process_suspend_info(struct mddev *mddev,
		int slot, sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct suspend_info *s;

	if (!hi) {
		remove_suspend_info(mddev, slot);
		set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
		md_wakeup_thread(mddev->thread);
		return;
	}
	s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
	if (!s)
		return;
	s->slot = slot;
	s->lo = lo;
	s->hi = hi;
	mddev->pers->quiesce(mddev, 1);
	mddev->pers->quiesce(mddev, 0);
	spin_lock_irq(&cinfo->suspend_lock);
	/* Remove existing entry (if exists) before adding */
	__remove_suspend_info(cinfo, slot);
	list_add(&s->list, &cinfo->suspend_list);
	spin_unlock_irq(&cinfo->suspend_lock);
	mddev->pers->quiesce(mddev, 2);
}
static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
{
	char disk_uuid[64];
	struct md_cluster_info *cinfo = mddev->cluster_info;
	char event_name[] = "EVENT=ADD_DEVICE";
	char raid_slot[16];
	char *envp[] = {event_name, disk_uuid, raid_slot, NULL};
	int len;

	len = snprintf(disk_uuid, 64, "DEVICE_UUID=");
	sprintf(disk_uuid + len, "%pU", cmsg->uuid);
	snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot));
	pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot);
	init_completion(&cinfo->newdisk_completion);
	set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
	kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp);
	wait_for_completion_timeout(&cinfo->newdisk_completion,
			NEW_DEV_TIMEOUT);
	clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
}
static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	md_reload_sb(mddev, le32_to_cpu(msg->raid_slot));
	dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
}
static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
{
	struct md_rdev *rdev = md_find_rdev_nr_rcu(mddev,
						   le32_to_cpu(msg->raid_slot));

	if (rdev)
		md_kick_rdev_from_array(rdev);
	else
		pr_warn("%s: %d Could not find disk(%d) to REMOVE\n",
			__func__, __LINE__, le32_to_cpu(msg->raid_slot));
}
static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg)
{
	struct md_rdev *rdev = md_find_rdev_nr_rcu(mddev,
						   le32_to_cpu(msg->raid_slot));

	if (rdev && test_bit(Faulty, &rdev->flags))
		clear_bit(Faulty, &rdev->flags);
	else
		pr_warn("%s: %d Could not find disk(%d) which is faulty\n",
			__func__, __LINE__, le32_to_cpu(msg->raid_slot));
}
static void process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
{
	if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot),
		"node %d received its own msg\n", le32_to_cpu(msg->slot)))
		return;
	switch (le32_to_cpu(msg->type)) {
	case METADATA_UPDATED:
		process_metadata_update(mddev, msg);
		break;
	case RESYNCING:
		process_suspend_info(mddev, le32_to_cpu(msg->slot),
				     le64_to_cpu(msg->low),
				     le64_to_cpu(msg->high));
		break;
	case NEWDISK:
		process_add_new_disk(mddev, msg);
		break;
	case REMOVE:
		process_remove_disk(mddev, msg);
		break;
	case RE_ADD:
		process_readd_disk(mddev, msg);
		break;
	case BITMAP_NEEDS_SYNC:
		__recover_slot(mddev, le32_to_cpu(msg->slot));
		break;
	default:
		pr_warn("%s:%d Received unknown message from %d\n",
			__func__, __LINE__, le32_to_cpu(msg->slot));
	}
}
/*
 * thread for receiving message
 */
static void recv_daemon(struct md_thread *thread)
{
	struct md_cluster_info *cinfo = thread->mddev->cluster_info;
	struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
	struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
	struct cluster_msg msg;
	int ret;

	/* get CR on Message */
	if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
		pr_err("md-cluster: failed to get CR on MESSAGE\n");
		return;
	}

	/* read lvb and wake up thread to process this message_lockres */
	memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg));
	process_recvd_msg(thread->mddev, &msg);

	/* release CR on ack_lockres */
	ret = dlm_unlock_sync(ack_lockres);
	if (unlikely(ret != 0))
		pr_info("unlock ack failed return %d\n", ret);
	/* up-convert to PR on message_lockres */
	ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR);
	if (unlikely(ret != 0))
		pr_info("lock PR on msg failed return %d\n", ret);
	/* get CR on ack_lockres again */
	ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
	if (unlikely(ret != 0))
		pr_info("lock CR on ack failed return %d\n", ret);
	/* release CR on message_lockres */
	ret = dlm_unlock_sync(message_lockres);
	if (unlikely(ret != 0))
		pr_info("unlock msg failed return %d\n", ret);
}
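/*
 * The receive side above mirrors steps 3-5 of the __sendmsg() protocol
 * documented below: the sender's down-convert to CW lets every receiver
 * take CR on MESSAGE and read the LVB, and dropping then re-taking CR on
 * ACK tells the sender this node is done.  One reading of the DLM mode
 * compatibility rules (PR conflicts with CW) is that the intermediate PR
 * request on MESSAGE parks the receiver until the sender has fully
 * released MESSAGE, keeping send and ack phases from overlapping.
 */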
/*
 * Takes the lock on the TOKEN lock resource so no other
 * node can communicate while the operation is underway.
 * If called again, and the TOKEN lock is already in EX mode,
 * return success. However, care must be taken that unlock_comm()
 * is called only once.
 */
static int lock_comm(struct md_cluster_info *cinfo)
{
	int error;

	if (cinfo->token_lockres->mode == DLM_LOCK_EX)
		return 0;

	error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
	if (error)
		pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
				__func__, __LINE__, error);
	return error;
}
static void unlock_comm(struct md_cluster_info *cinfo)
{
	WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX);
	dlm_unlock_sync(cinfo->token_lockres);
}
/*
 * This function performs the actual sending of the message. This function is
 * usually called after performing the encompassing operation.
 * The function:
 * 1. Grabs the message lockresource in EX mode
 * 2. Copies the message to the message LVB
 * 3. Downconverts message lockresource to CW
 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
 *    and the other nodes read the message. The thread will wait here until all other
 *    nodes have released the ack lock resource.
 * 5. Downconverts ack lockresource to CR
 */
static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
{
	int error;
	int slot = cinfo->slot_number - 1;

	cmsg->slot = cpu_to_le32(slot);
	/* get EX on Message */
	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX);
	if (error) {
		pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error);
		goto failed_message;
	}

	memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
			sizeof(struct cluster_msg));
	/* down-convert EX to CW on Message */
	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW);
	if (error) {
		pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n",
				error);
		goto failed_ack;
	}

	/* up-convert CR to EX on Ack */
	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX);
	if (error) {
		pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
				error);
		goto failed_ack;
	}

	/* down-convert EX to CR on Ack */
	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
	if (error) {
		pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
				error);
		goto failed_ack;
	}

failed_ack:
	error = dlm_unlock_sync(cinfo->message_lockres);
	if (unlikely(error != 0)) {
		pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n",
			error);
		/* in case the message can't be released due to some reason */
		goto failed_ack;
	}
failed_message:
	return error;
}
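/*
 * __sendmsg() assumes the caller already holds the TOKEN lock via
 * lock_comm(); sendmsg() below is the convenience wrapper that takes and
 * drops the token around a single message, while metadata_update_start()
 * and metadata_update_finish() split the two halves so the token stays
 * held across the superblock write in between.
 */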
static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
{
	int ret;

	lock_comm(cinfo);
	ret = __sendmsg(cinfo, cmsg);
	unlock_comm(cinfo);
	return ret;
}
static int gather_all_resync_info(struct mddev *mddev, int total_slots)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int i, ret = 0;
	struct dlm_lock_resource *bm_lockres;
	struct suspend_info *s;
	char str[64];
	sector_t lo, hi;

	for (i = 0; i < total_slots; i++) {
		memset(str, '\0', 64);
		snprintf(str, 64, "bitmap%04d", i);
		bm_lockres = lockres_init(mddev, str, NULL, 1);
		if (!bm_lockres)
			return -ENOMEM;
		if (i == (cinfo->slot_number - 1))
			continue;

		bm_lockres->flags |= DLM_LKF_NOQUEUE;
		ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
		if (ret == -EAGAIN) {
			memset(bm_lockres->lksb.sb_lvbptr, '\0', LVB_SIZE);
			s = read_resync_info(mddev, bm_lockres);
			if (s) {
				pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
						__func__, __LINE__,
						(unsigned long long) s->lo,
						(unsigned long long) s->hi, i);
				spin_lock_irq(&cinfo->suspend_lock);
				s->slot = i;
				list_add(&s->list, &cinfo->suspend_list);
				spin_unlock_irq(&cinfo->suspend_lock);
			}
			ret = 0;
			lockres_free(bm_lockres);
			continue;
		}
		if (ret) {
			lockres_free(bm_lockres);
			goto out;
		}

		/* Read the disk bitmap sb and check if it needs recovery */
		ret = bitmap_copy_from_slot(mddev, i, &lo, &hi, false);
		if (ret) {
			pr_warn("md-cluster: Could not gather bitmaps from slot %d\n", i);
			lockres_free(bm_lockres);
			continue;
		}
		if ((hi > 0) && (lo < mddev->recovery_cp)) {
			set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
			mddev->recovery_cp = lo;
			md_check_recovery(mddev);
		}

		dlm_unlock_sync(bm_lockres);
		lockres_free(bm_lockres);
	}
out:
	return ret;
}
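/*
 * DLM_LKF_NOQUEUE above turns the PW request into a try-lock: -EAGAIN
 * means the slot's owner is alive and holds its bitmap lock, so instead
 * of waiting we just read that node's resync window out of the LVB.
 */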
static int join(struct mddev *mddev, int nodes)
{
	struct md_cluster_info *cinfo;
	int ret, ops_rv;
	char str[64];

	cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
	if (!cinfo)
		return -ENOMEM;

	INIT_LIST_HEAD(&cinfo->suspend_list);
	spin_lock_init(&cinfo->suspend_lock);
	init_completion(&cinfo->completion);
	set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);

	mddev->cluster_info = cinfo;

	memset(str, 0, 64);
	sprintf(str, "%pU", mddev->uuid);
	ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
				DLM_LSFL_FS, LVB_SIZE,
				&md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
	if (ret)
		goto err;
	wait_for_completion(&cinfo->completion);
	if (nodes < cinfo->slot_number) {
		pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).",
			cinfo->slot_number, nodes);
		ret = -ERANGE;
		goto err;
	}
	/* Initiate the communication resources */
	ret = -ENOMEM;
	cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv");
	if (!cinfo->recv_thread) {
		pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
		goto err;
	}
	cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1);
	if (!cinfo->message_lockres)
		goto err;
	cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0);
	if (!cinfo->token_lockres)
		goto err;
	cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
	if (!cinfo->ack_lockres)
		goto err;
	cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0);
	if (!cinfo->no_new_dev_lockres)
		goto err;

	/* get sync CR lock on ACK. */
	if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR))
		pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
				ret);
	/* get sync CR lock on no-new-dev. */
	if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR))
		pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret);

	pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
	snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
	cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
	if (!cinfo->bitmap_lockres)
		goto err;
	if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
		pr_err("Failed to get bitmap lock\n");
		ret = -EINVAL;
		goto err;
	}

	cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0);
	if (!cinfo->resync_lockres)
		goto err;

	ret = gather_all_resync_info(mddev, nodes);
	if (ret)
		goto err;

	return 0;
err:
	lockres_free(cinfo->message_lockres);
	lockres_free(cinfo->token_lockres);
	lockres_free(cinfo->ack_lockres);
	lockres_free(cinfo->no_new_dev_lockres);
	lockres_free(cinfo->resync_lockres);
	lockres_free(cinfo->bitmap_lockres);
	if (cinfo->lockspace)
		dlm_release_lockspace(cinfo->lockspace, 2);
	mddev->cluster_info = NULL;
	kfree(cinfo);
	return ret;
}
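/*
 * Lockspace naming: join() prints the array uuid with "%pU" into str, so
 * every node assembling the same array derives the same lockspace name,
 * while mddev->bitmap_info.cluster_name names the dlm cluster the node is
 * expected to be a member of.  wait_for_completion(&cinfo->completion)
 * then parks join() until recover_done() has reported this node's slot
 * number.
 */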
static void resync_bitmap(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg = {0};
	int err;

	cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC);
	err = sendmsg(cinfo, &cmsg);
	if (err)
		pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n",
			__func__, __LINE__, err);
}
static int leave(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	if (!cinfo)
		return 0;

	/* BITMAP_NEEDS_SYNC message should be sent when node
	 * is leaving the cluster with dirty bitmap, also we
	 * can only deliver it when dlm connection is available */
	if (cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector)
		resync_bitmap(mddev);

	md_unregister_thread(&cinfo->recovery_thread);
	md_unregister_thread(&cinfo->recv_thread);
	lockres_free(cinfo->message_lockres);
	lockres_free(cinfo->token_lockres);
	lockres_free(cinfo->ack_lockres);
	lockres_free(cinfo->no_new_dev_lockres);
	lockres_free(cinfo->resync_lockres);
	lockres_free(cinfo->bitmap_lockres);
	dlm_release_lockspace(cinfo->lockspace, 2);
	return 0;
}
/* slot_number(): Returns the MD slot number to use
 * DLM starts the slot numbers from 1, whereas cluster-md
 * wants the number to be from zero, so we deduct one
 */
static int slot_number(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	return cinfo->slot_number - 1;
}
static int metadata_update_start(struct mddev *mddev)
{
	return lock_comm(mddev->cluster_info);
}
static int metadata_update_finish(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg;
	struct md_rdev *rdev;
	int ret = 0;
	int raid_slot = -1;

	memset(&cmsg, 0, sizeof(cmsg));
	cmsg.type = cpu_to_le32(METADATA_UPDATED);
	/* Pick up a good active device number to send. */
	rdev_for_each(rdev, mddev)
		if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
			raid_slot = rdev->desc_nr;
			break;
		}
	if (raid_slot >= 0) {
		cmsg.raid_slot = cpu_to_le32(raid_slot);
		ret = __sendmsg(cinfo, &cmsg);
	} else
		pr_warn("md-cluster: No good device id found to send\n");
	unlock_comm(cinfo);
	return ret;
}
static void metadata_update_cancel(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	unlock_comm(cinfo);
}
static int resync_start(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cinfo->resync_lockres->flags |= DLM_LKF_NOQUEUE;
	return dlm_lock_sync(cinfo->resync_lockres, DLM_LOCK_EX);
}
static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg = {0};

	add_resync_info(cinfo->bitmap_lockres, lo, hi);
	/* Re-acquire the lock to refresh LVB */
	dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
	cmsg.type = cpu_to_le32(RESYNCING);
	cmsg.low = cpu_to_le64(lo);
	cmsg.high = cpu_to_le64(hi);

	return sendmsg(cinfo, &cmsg);
}
static int resync_finish(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cinfo->resync_lockres->flags &= ~DLM_LKF_NOQUEUE;
	dlm_unlock_sync(cinfo->resync_lockres);
	return resync_info_update(mddev, 0, 0);
}
static int area_resyncing(struct mddev *mddev, int direction,
		sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int ret = 0;
	struct suspend_info *s;

	if ((direction == READ) &&
		test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state))
		return 1;

	spin_lock_irq(&cinfo->suspend_lock);
	if (list_empty(&cinfo->suspend_list))
		goto out;
	list_for_each_entry(s, &cinfo->suspend_list, list)
		if (hi > s->lo && lo < s->hi) {
			ret = 1;
			break;
		}
out:
	spin_unlock_irq(&cinfo->suspend_lock);
	return ret;
}
/* add_new_disk() - initiates a disk add
 * However, if this fails before writing md_update_sb(),
 * add_new_disk_cancel() must be called to release token lock
 */
static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg;
	int ret = 0;
	struct mdp_superblock_1 *sb = page_address(rdev->sb_page);
	char *uuid = sb->device_uuid;

	memset(&cmsg, 0, sizeof(cmsg));
	cmsg.type = cpu_to_le32(NEWDISK);
	memcpy(cmsg.uuid, uuid, 16);
	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
	lock_comm(cinfo);
	ret = __sendmsg(cinfo, &cmsg);
	if (ret)
		return ret;
	cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE;
	ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX);
	cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE;
	/* Some node does not "see" the device */
	if (ret == -EAGAIN)
		ret = -ENOENT;
	if (ret)
		unlock_comm(cinfo);
	else
		dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
	return ret;
}
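/*
 * The no-new-dev handshake, as the code above and new_disk_ack() below
 * wire it up: every node normally sits on this resource in CR mode (taken
 * in join()).  A node adding a disk tries a NOQUEUE EX here, which only
 * succeeds once every peer has acknowledged the NEWDISK uevent and
 * dropped its CR via new_disk_ack(); -EAGAIN therefore means some node
 * could not "see" the device.
 */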
static void add_new_disk_cancel(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	unlock_comm(cinfo);
}
static int new_disk_ack(struct mddev *mddev, bool ack)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) {
		pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev));
		return -EINVAL;
	}

	if (ack)
		dlm_unlock_sync(cinfo->no_new_dev_lockres);
	complete(&cinfo->newdisk_completion);
	return 0;
}
static int remove_disk(struct mddev *mddev, struct md_rdev *rdev)
{
	struct cluster_msg cmsg = {0};
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cmsg.type = cpu_to_le32(REMOVE);
	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
	return __sendmsg(cinfo, &cmsg);
}
static int gather_bitmaps(struct md_rdev *rdev)
{
	int sn, err;
	sector_t lo, hi;
	struct cluster_msg cmsg = {0};
	struct mddev *mddev = rdev->mddev;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cmsg.type = cpu_to_le32(RE_ADD);
	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
	err = sendmsg(cinfo, &cmsg);
	if (err)
		goto out;

	for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) {
		if (sn == (cinfo->slot_number - 1))
			continue;
		err = bitmap_copy_from_slot(mddev, sn, &lo, &hi, false);
		if (err) {
			pr_warn("md-cluster: Could not gather bitmaps from slot %d\n", sn);
			goto out;
		}
		if ((hi > 0) && (lo < mddev->recovery_cp))
			mddev->recovery_cp = lo;
	}
out:
	return err;
}
static struct md_cluster_operations cluster_ops = {
	.join   = join,
	.leave  = leave,
	.slot_number = slot_number,
	.resync_start = resync_start,
	.resync_finish = resync_finish,
	.resync_info_update = resync_info_update,
	.metadata_update_start = metadata_update_start,
	.metadata_update_finish = metadata_update_finish,
	.metadata_update_cancel = metadata_update_cancel,
	.area_resyncing = area_resyncing,
	.add_new_disk = add_new_disk,
	.add_new_disk_cancel = add_new_disk_cancel,
	.new_disk_ack = new_disk_ack,
	.remove_disk = remove_disk,
	.gather_bitmaps = gather_bitmaps,
};
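/*
 * md core reaches these callbacks through the pointer installed by
 * register_md_cluster_operations() below; broadly, .join is invoked when
 * an array with a clustered bitmap is assembled and .leave when it stops,
 * with the remaining ops called from the resync and hot-add/remove paths.
 */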
static int __init cluster_init(void)
{
	pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n");
	pr_info("Registering Cluster MD functions\n");
	register_md_cluster_operations(&cluster_ops, THIS_MODULE);
	return 0;
}

static void __exit cluster_exit(void)
{
	unregister_md_cluster_operations();
}

module_init(cluster_init);
module_exit(cluster_exit);
MODULE_AUTHOR("SUSE");
MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("Clustering support for MD");