2 * Copyright (C) 2015, SUSE
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2, or (at your option)
12 #include <linux/module.h>
13 #include <linux/dlm.h>
14 #include <linux/sched.h>
15 #include <linux/raid/md_p.h>
18 #include "md-cluster.h"
21 #define NEW_DEV_TIMEOUT 5000
23 struct dlm_lock_resource
{
26 char *name
; /* lock name. */
27 uint32_t flags
; /* flags to pass to dlm_lock() */
28 struct completion completion
; /* completion for synchronized locking */
29 void (*bast
)(void *arg
, int mode
); /* blocking AST function pointer*/
30 struct mddev
*mddev
; /* pointing back to mddev. */
38 struct list_head list
;
46 /* md_cluster_info flags */
47 #define MD_CLUSTER_WAITING_FOR_NEWDISK 1
48 #define MD_CLUSTER_SUSPEND_READ_BALANCING 2
49 #define MD_CLUSTER_BEGIN_JOIN_CLUSTER 3
52 struct md_cluster_info
{
53 /* dlm lock space and resources for clustered raid. */
54 dlm_lockspace_t
*lockspace
;
56 struct completion completion
;
57 struct dlm_lock_resource
*bitmap_lockres
;
58 struct dlm_lock_resource
*resync_lockres
;
59 struct list_head suspend_list
;
60 spinlock_t suspend_lock
;
61 struct md_thread
*recovery_thread
;
62 unsigned long recovery_map
;
63 /* communication loc resources */
64 struct dlm_lock_resource
*ack_lockres
;
65 struct dlm_lock_resource
*message_lockres
;
66 struct dlm_lock_resource
*token_lockres
;
67 struct dlm_lock_resource
*no_new_dev_lockres
;
68 struct md_thread
*recv_thread
;
69 struct completion newdisk_completion
;
85 /* TODO: Unionize this for smaller footprint */
92 static void sync_ast(void *arg
)
94 struct dlm_lock_resource
*res
;
97 complete(&res
->completion
);
100 static int dlm_lock_sync(struct dlm_lock_resource
*res
, int mode
)
104 ret
= dlm_lock(res
->ls
, mode
, &res
->lksb
,
105 res
->flags
, res
->name
, strlen(res
->name
),
106 0, sync_ast
, res
, res
->bast
);
109 wait_for_completion(&res
->completion
);
110 if (res
->lksb
.sb_status
== 0)
112 return res
->lksb
.sb_status
;
115 static int dlm_unlock_sync(struct dlm_lock_resource
*res
)
117 return dlm_lock_sync(res
, DLM_LOCK_NL
);
120 static struct dlm_lock_resource
*lockres_init(struct mddev
*mddev
,
121 char *name
, void (*bastfn
)(void *arg
, int mode
), int with_lvb
)
123 struct dlm_lock_resource
*res
= NULL
;
125 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
127 res
= kzalloc(sizeof(struct dlm_lock_resource
), GFP_KERNEL
);
130 init_completion(&res
->completion
);
131 res
->ls
= cinfo
->lockspace
;
133 res
->mode
= DLM_LOCK_IV
;
134 namelen
= strlen(name
);
135 res
->name
= kzalloc(namelen
+ 1, GFP_KERNEL
);
137 pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name
);
140 strlcpy(res
->name
, name
, namelen
+ 1);
142 res
->lksb
.sb_lvbptr
= kzalloc(LVB_SIZE
, GFP_KERNEL
);
143 if (!res
->lksb
.sb_lvbptr
) {
144 pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name
);
147 res
->flags
= DLM_LKF_VALBLK
;
153 res
->flags
|= DLM_LKF_EXPEDITE
;
155 ret
= dlm_lock_sync(res
, DLM_LOCK_NL
);
157 pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name
);
160 res
->flags
&= ~DLM_LKF_EXPEDITE
;
161 res
->flags
|= DLM_LKF_CONVERT
;
165 kfree(res
->lksb
.sb_lvbptr
);
171 static void lockres_free(struct dlm_lock_resource
*res
)
178 /* cancel a lock request or a conversion request that is blocked */
179 res
->flags
|= DLM_LKF_CANCEL
;
181 ret
= dlm_unlock(res
->ls
, res
->lksb
.sb_lkid
, 0, &res
->lksb
, res
);
182 if (unlikely(ret
!= 0)) {
183 pr_info("%s: failed to unlock %s return %d\n", __func__
, res
->name
, ret
);
185 /* if a lock conversion is cancelled, then the lock is put
186 * back to grant queue, need to ensure it is unlocked */
187 if (ret
== -DLM_ECANCEL
)
190 res
->flags
&= ~DLM_LKF_CANCEL
;
191 wait_for_completion(&res
->completion
);
194 kfree(res
->lksb
.sb_lvbptr
);
198 static void add_resync_info(struct dlm_lock_resource
*lockres
,
199 sector_t lo
, sector_t hi
)
201 struct resync_info
*ri
;
203 ri
= (struct resync_info
*)lockres
->lksb
.sb_lvbptr
;
204 ri
->lo
= cpu_to_le64(lo
);
205 ri
->hi
= cpu_to_le64(hi
);
208 static struct suspend_info
*read_resync_info(struct mddev
*mddev
, struct dlm_lock_resource
*lockres
)
210 struct resync_info ri
;
211 struct suspend_info
*s
= NULL
;
214 dlm_lock_sync(lockres
, DLM_LOCK_CR
);
215 memcpy(&ri
, lockres
->lksb
.sb_lvbptr
, sizeof(struct resync_info
));
216 hi
= le64_to_cpu(ri
.hi
);
218 s
= kzalloc(sizeof(struct suspend_info
), GFP_KERNEL
);
222 s
->lo
= le64_to_cpu(ri
.lo
);
224 dlm_unlock_sync(lockres
);
229 static void recover_bitmaps(struct md_thread
*thread
)
231 struct mddev
*mddev
= thread
->mddev
;
232 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
233 struct dlm_lock_resource
*bm_lockres
;
236 struct suspend_info
*s
, *tmp
;
239 while (cinfo
->recovery_map
) {
240 slot
= fls64((u64
)cinfo
->recovery_map
) - 1;
242 /* Clear suspend_area associated with the bitmap */
243 spin_lock_irq(&cinfo
->suspend_lock
);
244 list_for_each_entry_safe(s
, tmp
, &cinfo
->suspend_list
, list
)
245 if (slot
== s
->slot
) {
249 spin_unlock_irq(&cinfo
->suspend_lock
);
251 snprintf(str
, 64, "bitmap%04d", slot
);
252 bm_lockres
= lockres_init(mddev
, str
, NULL
, 1);
254 pr_err("md-cluster: Cannot initialize bitmaps\n");
258 ret
= dlm_lock_sync(bm_lockres
, DLM_LOCK_PW
);
260 pr_err("md-cluster: Could not DLM lock %s: %d\n",
264 ret
= bitmap_copy_from_slot(mddev
, slot
, &lo
, &hi
, true);
266 pr_err("md-cluster: Could not copy data from bitmap %d\n", slot
);
270 /* TODO:Wait for current resync to get over */
271 set_bit(MD_RECOVERY_NEEDED
, &mddev
->recovery
);
272 if (lo
< mddev
->recovery_cp
)
273 mddev
->recovery_cp
= lo
;
274 md_check_recovery(mddev
);
277 dlm_unlock_sync(bm_lockres
);
279 clear_bit(slot
, &cinfo
->recovery_map
);
283 static void recover_prep(void *arg
)
285 struct mddev
*mddev
= arg
;
286 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
287 set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING
, &cinfo
->state
);
290 static void __recover_slot(struct mddev
*mddev
, int slot
)
292 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
294 set_bit(slot
, &cinfo
->recovery_map
);
295 if (!cinfo
->recovery_thread
) {
296 cinfo
->recovery_thread
= md_register_thread(recover_bitmaps
,
298 if (!cinfo
->recovery_thread
) {
299 pr_warn("md-cluster: Could not create recovery thread\n");
303 md_wakeup_thread(cinfo
->recovery_thread
);
306 static void recover_slot(void *arg
, struct dlm_slot
*slot
)
308 struct mddev
*mddev
= arg
;
309 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
311 pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
312 mddev
->bitmap_info
.cluster_name
,
313 slot
->nodeid
, slot
->slot
,
315 /* deduct one since dlm slot starts from one while the num of
316 * cluster-md begins with 0 */
317 __recover_slot(mddev
, slot
->slot
- 1);
320 static void recover_done(void *arg
, struct dlm_slot
*slots
,
321 int num_slots
, int our_slot
,
324 struct mddev
*mddev
= arg
;
325 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
327 cinfo
->slot_number
= our_slot
;
328 /* completion is only need to be complete when node join cluster,
329 * it doesn't need to run during another node's failure */
330 if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER
, &cinfo
->state
)) {
331 complete(&cinfo
->completion
);
332 clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER
, &cinfo
->state
);
334 clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING
, &cinfo
->state
);
337 /* these ops are called when a node joins the cluster, and perform lock recovery
338 * if a node failure occurs */
339 static const struct dlm_lockspace_ops md_ls_ops
= {
340 .recover_prep
= recover_prep
,
341 .recover_slot
= recover_slot
,
342 .recover_done
= recover_done
,
346 * The BAST function for the ack lock resource
347 * This function wakes up the receive thread in
348 * order to receive and process the message.
350 static void ack_bast(void *arg
, int mode
)
352 struct dlm_lock_resource
*res
= arg
;
353 struct md_cluster_info
*cinfo
= res
->mddev
->cluster_info
;
355 if (mode
== DLM_LOCK_EX
)
356 md_wakeup_thread(cinfo
->recv_thread
);
359 static void __remove_suspend_info(struct md_cluster_info
*cinfo
, int slot
)
361 struct suspend_info
*s
, *tmp
;
363 list_for_each_entry_safe(s
, tmp
, &cinfo
->suspend_list
, list
)
364 if (slot
== s
->slot
) {
371 static void remove_suspend_info(struct mddev
*mddev
, int slot
)
373 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
374 spin_lock_irq(&cinfo
->suspend_lock
);
375 __remove_suspend_info(cinfo
, slot
);
376 spin_unlock_irq(&cinfo
->suspend_lock
);
377 mddev
->pers
->quiesce(mddev
, 2);
381 static void process_suspend_info(struct mddev
*mddev
,
382 int slot
, sector_t lo
, sector_t hi
)
384 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
385 struct suspend_info
*s
;
388 remove_suspend_info(mddev
, slot
);
389 set_bit(MD_RECOVERY_NEEDED
, &mddev
->recovery
);
390 md_wakeup_thread(mddev
->thread
);
393 s
= kzalloc(sizeof(struct suspend_info
), GFP_KERNEL
);
399 mddev
->pers
->quiesce(mddev
, 1);
400 mddev
->pers
->quiesce(mddev
, 0);
401 spin_lock_irq(&cinfo
->suspend_lock
);
402 /* Remove existing entry (if exists) before adding */
403 __remove_suspend_info(cinfo
, slot
);
404 list_add(&s
->list
, &cinfo
->suspend_list
);
405 spin_unlock_irq(&cinfo
->suspend_lock
);
406 mddev
->pers
->quiesce(mddev
, 2);
409 static void process_add_new_disk(struct mddev
*mddev
, struct cluster_msg
*cmsg
)
412 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
413 char event_name
[] = "EVENT=ADD_DEVICE";
415 char *envp
[] = {event_name
, disk_uuid
, raid_slot
, NULL
};
418 len
= snprintf(disk_uuid
, 64, "DEVICE_UUID=");
419 sprintf(disk_uuid
+ len
, "%pU", cmsg
->uuid
);
420 snprintf(raid_slot
, 16, "RAID_DISK=%d", le32_to_cpu(cmsg
->raid_slot
));
421 pr_info("%s:%d Sending kobject change with %s and %s\n", __func__
, __LINE__
, disk_uuid
, raid_slot
);
422 init_completion(&cinfo
->newdisk_completion
);
423 set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK
, &cinfo
->state
);
424 kobject_uevent_env(&disk_to_dev(mddev
->gendisk
)->kobj
, KOBJ_CHANGE
, envp
);
425 wait_for_completion_timeout(&cinfo
->newdisk_completion
,
427 clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK
, &cinfo
->state
);
431 static void process_metadata_update(struct mddev
*mddev
, struct cluster_msg
*msg
)
433 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
434 md_reload_sb(mddev
, le32_to_cpu(msg
->raid_slot
));
435 dlm_lock_sync(cinfo
->no_new_dev_lockres
, DLM_LOCK_CR
);
438 static void process_remove_disk(struct mddev
*mddev
, struct cluster_msg
*msg
)
440 struct md_rdev
*rdev
= md_find_rdev_nr_rcu(mddev
,
441 le32_to_cpu(msg
->raid_slot
));
444 md_kick_rdev_from_array(rdev
);
446 pr_warn("%s: %d Could not find disk(%d) to REMOVE\n",
447 __func__
, __LINE__
, le32_to_cpu(msg
->raid_slot
));
450 static void process_readd_disk(struct mddev
*mddev
, struct cluster_msg
*msg
)
452 struct md_rdev
*rdev
= md_find_rdev_nr_rcu(mddev
,
453 le32_to_cpu(msg
->raid_slot
));
455 if (rdev
&& test_bit(Faulty
, &rdev
->flags
))
456 clear_bit(Faulty
, &rdev
->flags
);
458 pr_warn("%s: %d Could not find disk(%d) which is faulty",
459 __func__
, __LINE__
, le32_to_cpu(msg
->raid_slot
));
462 static void process_recvd_msg(struct mddev
*mddev
, struct cluster_msg
*msg
)
464 if (WARN(mddev
->cluster_info
->slot_number
- 1 == le32_to_cpu(msg
->slot
),
465 "node %d received it's own msg\n", le32_to_cpu(msg
->slot
)))
467 switch (le32_to_cpu(msg
->type
)) {
468 case METADATA_UPDATED
:
469 process_metadata_update(mddev
, msg
);
472 process_suspend_info(mddev
, le32_to_cpu(msg
->slot
),
473 le64_to_cpu(msg
->low
),
474 le64_to_cpu(msg
->high
));
477 process_add_new_disk(mddev
, msg
);
480 process_remove_disk(mddev
, msg
);
483 process_readd_disk(mddev
, msg
);
485 case BITMAP_NEEDS_SYNC
:
486 __recover_slot(mddev
, le32_to_cpu(msg
->slot
));
489 pr_warn("%s:%d Received unknown message from %d\n",
490 __func__
, __LINE__
, msg
->slot
);
495 * thread for receiving message
497 static void recv_daemon(struct md_thread
*thread
)
499 struct md_cluster_info
*cinfo
= thread
->mddev
->cluster_info
;
500 struct dlm_lock_resource
*ack_lockres
= cinfo
->ack_lockres
;
501 struct dlm_lock_resource
*message_lockres
= cinfo
->message_lockres
;
502 struct cluster_msg msg
;
505 /*get CR on Message*/
506 if (dlm_lock_sync(message_lockres
, DLM_LOCK_CR
)) {
507 pr_err("md/raid1:failed to get CR on MESSAGE\n");
511 /* read lvb and wake up thread to process this message_lockres */
512 memcpy(&msg
, message_lockres
->lksb
.sb_lvbptr
, sizeof(struct cluster_msg
));
513 process_recvd_msg(thread
->mddev
, &msg
);
515 /*release CR on ack_lockres*/
516 ret
= dlm_unlock_sync(ack_lockres
);
517 if (unlikely(ret
!= 0))
518 pr_info("unlock ack failed return %d\n", ret
);
519 /*up-convert to PR on message_lockres*/
520 ret
= dlm_lock_sync(message_lockres
, DLM_LOCK_PR
);
521 if (unlikely(ret
!= 0))
522 pr_info("lock PR on msg failed return %d\n", ret
);
523 /*get CR on ack_lockres again*/
524 ret
= dlm_lock_sync(ack_lockres
, DLM_LOCK_CR
);
525 if (unlikely(ret
!= 0))
526 pr_info("lock CR on ack failed return %d\n", ret
);
527 /*release CR on message_lockres*/
528 ret
= dlm_unlock_sync(message_lockres
);
529 if (unlikely(ret
!= 0))
530 pr_info("unlock msg failed return %d\n", ret
);
534 * Takes the lock on the TOKEN lock resource so no other
535 * node can communicate while the operation is underway.
536 * If called again, and the TOKEN lock is already in EX mode
537 * return success. However, care must be taken that unlock_comm()
538 * is called only once.
540 static int lock_comm(struct md_cluster_info
*cinfo
)
544 if (cinfo
->token_lockres
->mode
== DLM_LOCK_EX
)
547 error
= dlm_lock_sync(cinfo
->token_lockres
, DLM_LOCK_EX
);
549 pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
550 __func__
, __LINE__
, error
);
554 static void unlock_comm(struct md_cluster_info
*cinfo
)
556 WARN_ON(cinfo
->token_lockres
->mode
!= DLM_LOCK_EX
);
557 dlm_unlock_sync(cinfo
->token_lockres
);
561 * This function performs the actual sending of the message. This function is
562 * usually called after performing the encompassing operation
564 * 1. Grabs the message lockresource in EX mode
565 * 2. Copies the message to the message LVB
566 * 3. Downconverts message lockresource to CW
567 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
568 * and the other nodes read the message. The thread will wait here until all other
569 * nodes have released ack lock resource.
570 * 5. Downconvert ack lockresource to CR
572 static int __sendmsg(struct md_cluster_info
*cinfo
, struct cluster_msg
*cmsg
)
575 int slot
= cinfo
->slot_number
- 1;
577 cmsg
->slot
= cpu_to_le32(slot
);
578 /*get EX on Message*/
579 error
= dlm_lock_sync(cinfo
->message_lockres
, DLM_LOCK_EX
);
581 pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error
);
585 memcpy(cinfo
->message_lockres
->lksb
.sb_lvbptr
, (void *)cmsg
,
586 sizeof(struct cluster_msg
));
587 /*down-convert EX to CW on Message*/
588 error
= dlm_lock_sync(cinfo
->message_lockres
, DLM_LOCK_CW
);
590 pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n",
595 /*up-convert CR to EX on Ack*/
596 error
= dlm_lock_sync(cinfo
->ack_lockres
, DLM_LOCK_EX
);
598 pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
603 /*down-convert EX to CR on Ack*/
604 error
= dlm_lock_sync(cinfo
->ack_lockres
, DLM_LOCK_CR
);
606 pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
612 error
= dlm_unlock_sync(cinfo
->message_lockres
);
613 if (unlikely(error
!= 0)) {
614 pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n",
616 /* in case the message can't be released due to some reason */
623 static int sendmsg(struct md_cluster_info
*cinfo
, struct cluster_msg
*cmsg
)
628 ret
= __sendmsg(cinfo
, cmsg
);
633 static int gather_all_resync_info(struct mddev
*mddev
, int total_slots
)
635 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
637 struct dlm_lock_resource
*bm_lockres
;
638 struct suspend_info
*s
;
643 for (i
= 0; i
< total_slots
; i
++) {
644 memset(str
, '\0', 64);
645 snprintf(str
, 64, "bitmap%04d", i
);
646 bm_lockres
= lockres_init(mddev
, str
, NULL
, 1);
649 if (i
== (cinfo
->slot_number
- 1))
652 bm_lockres
->flags
|= DLM_LKF_NOQUEUE
;
653 ret
= dlm_lock_sync(bm_lockres
, DLM_LOCK_PW
);
654 if (ret
== -EAGAIN
) {
655 memset(bm_lockres
->lksb
.sb_lvbptr
, '\0', LVB_SIZE
);
656 s
= read_resync_info(mddev
, bm_lockres
);
658 pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
660 (unsigned long long) s
->lo
,
661 (unsigned long long) s
->hi
, i
);
662 spin_lock_irq(&cinfo
->suspend_lock
);
664 list_add(&s
->list
, &cinfo
->suspend_list
);
665 spin_unlock_irq(&cinfo
->suspend_lock
);
668 lockres_free(bm_lockres
);
672 lockres_free(bm_lockres
);
676 /* Read the disk bitmap sb and check if it needs recovery */
677 ret
= bitmap_copy_from_slot(mddev
, i
, &lo
, &hi
, false);
679 pr_warn("md-cluster: Could not gather bitmaps from slot %d", i
);
680 lockres_free(bm_lockres
);
683 if ((hi
> 0) && (lo
< mddev
->recovery_cp
)) {
684 set_bit(MD_RECOVERY_NEEDED
, &mddev
->recovery
);
685 mddev
->recovery_cp
= lo
;
686 md_check_recovery(mddev
);
689 dlm_unlock_sync(bm_lockres
);
690 lockres_free(bm_lockres
);
696 static int join(struct mddev
*mddev
, int nodes
)
698 struct md_cluster_info
*cinfo
;
702 cinfo
= kzalloc(sizeof(struct md_cluster_info
), GFP_KERNEL
);
706 INIT_LIST_HEAD(&cinfo
->suspend_list
);
707 spin_lock_init(&cinfo
->suspend_lock
);
708 init_completion(&cinfo
->completion
);
709 set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER
, &cinfo
->state
);
711 mddev
->cluster_info
= cinfo
;
714 sprintf(str
, "%pU", mddev
->uuid
);
715 ret
= dlm_new_lockspace(str
, mddev
->bitmap_info
.cluster_name
,
716 DLM_LSFL_FS
, LVB_SIZE
,
717 &md_ls_ops
, mddev
, &ops_rv
, &cinfo
->lockspace
);
720 wait_for_completion(&cinfo
->completion
);
721 if (nodes
< cinfo
->slot_number
) {
722 pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).",
723 cinfo
->slot_number
, nodes
);
727 /* Initiate the communication resources */
729 cinfo
->recv_thread
= md_register_thread(recv_daemon
, mddev
, "cluster_recv");
730 if (!cinfo
->recv_thread
) {
731 pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
734 cinfo
->message_lockres
= lockres_init(mddev
, "message", NULL
, 1);
735 if (!cinfo
->message_lockres
)
737 cinfo
->token_lockres
= lockres_init(mddev
, "token", NULL
, 0);
738 if (!cinfo
->token_lockres
)
740 cinfo
->ack_lockres
= lockres_init(mddev
, "ack", ack_bast
, 0);
741 if (!cinfo
->ack_lockres
)
743 cinfo
->no_new_dev_lockres
= lockres_init(mddev
, "no-new-dev", NULL
, 0);
744 if (!cinfo
->no_new_dev_lockres
)
747 /* get sync CR lock on ACK. */
748 if (dlm_lock_sync(cinfo
->ack_lockres
, DLM_LOCK_CR
))
749 pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
751 /* get sync CR lock on no-new-dev. */
752 if (dlm_lock_sync(cinfo
->no_new_dev_lockres
, DLM_LOCK_CR
))
753 pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret
);
756 pr_info("md-cluster: Joined cluster %s slot %d\n", str
, cinfo
->slot_number
);
757 snprintf(str
, 64, "bitmap%04d", cinfo
->slot_number
- 1);
758 cinfo
->bitmap_lockres
= lockres_init(mddev
, str
, NULL
, 1);
759 if (!cinfo
->bitmap_lockres
)
761 if (dlm_lock_sync(cinfo
->bitmap_lockres
, DLM_LOCK_PW
)) {
762 pr_err("Failed to get bitmap lock\n");
767 cinfo
->resync_lockres
= lockres_init(mddev
, "resync", NULL
, 0);
768 if (!cinfo
->resync_lockres
)
771 ret
= gather_all_resync_info(mddev
, nodes
);
777 lockres_free(cinfo
->message_lockres
);
778 lockres_free(cinfo
->token_lockres
);
779 lockres_free(cinfo
->ack_lockres
);
780 lockres_free(cinfo
->no_new_dev_lockres
);
781 lockres_free(cinfo
->resync_lockres
);
782 lockres_free(cinfo
->bitmap_lockres
);
783 if (cinfo
->lockspace
)
784 dlm_release_lockspace(cinfo
->lockspace
, 2);
785 mddev
->cluster_info
= NULL
;
790 static void resync_bitmap(struct mddev
*mddev
)
792 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
793 struct cluster_msg cmsg
= {0};
796 cmsg
.type
= cpu_to_le32(BITMAP_NEEDS_SYNC
);
797 err
= sendmsg(cinfo
, &cmsg
);
799 pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n",
800 __func__
, __LINE__
, err
);
803 static int leave(struct mddev
*mddev
)
805 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
810 /* BITMAP_NEEDS_SYNC message should be sent when node
811 * is leaving the cluster with dirty bitmap, also we
812 * can only deliver it when dlm connection is available */
813 if (cinfo
->slot_number
> 0 && mddev
->recovery_cp
!= MaxSector
)
814 resync_bitmap(mddev
);
816 md_unregister_thread(&cinfo
->recovery_thread
);
817 md_unregister_thread(&cinfo
->recv_thread
);
818 lockres_free(cinfo
->message_lockres
);
819 lockres_free(cinfo
->token_lockres
);
820 lockres_free(cinfo
->ack_lockres
);
821 lockres_free(cinfo
->no_new_dev_lockres
);
822 lockres_free(cinfo
->bitmap_lockres
);
823 dlm_release_lockspace(cinfo
->lockspace
, 2);
827 /* slot_number(): Returns the MD slot number to use
828 * DLM starts the slot numbers from 1, whereas cluster-md
829 * wants the number to be from zero, so we deduct one
831 static int slot_number(struct mddev
*mddev
)
833 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
835 return cinfo
->slot_number
- 1;
838 static int metadata_update_start(struct mddev
*mddev
)
840 return lock_comm(mddev
->cluster_info
);
843 static int metadata_update_finish(struct mddev
*mddev
)
845 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
846 struct cluster_msg cmsg
;
847 struct md_rdev
*rdev
;
851 memset(&cmsg
, 0, sizeof(cmsg
));
852 cmsg
.type
= cpu_to_le32(METADATA_UPDATED
);
853 /* Pick up a good active device number to send.
855 rdev_for_each(rdev
, mddev
)
856 if (rdev
->raid_disk
> -1 && !test_bit(Faulty
, &rdev
->flags
)) {
857 raid_slot
= rdev
->desc_nr
;
860 if (raid_slot
>= 0) {
861 cmsg
.raid_slot
= cpu_to_le32(raid_slot
);
862 ret
= __sendmsg(cinfo
, &cmsg
);
864 pr_warn("md-cluster: No good device id found to send\n");
869 static void metadata_update_cancel(struct mddev
*mddev
)
871 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
875 static int resync_start(struct mddev
*mddev
)
877 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
878 cinfo
->resync_lockres
->flags
|= DLM_LKF_NOQUEUE
;
879 return dlm_lock_sync(cinfo
->resync_lockres
, DLM_LOCK_EX
);
882 static int resync_info_update(struct mddev
*mddev
, sector_t lo
, sector_t hi
)
884 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
885 struct cluster_msg cmsg
= {0};
887 add_resync_info(cinfo
->bitmap_lockres
, lo
, hi
);
888 /* Re-acquire the lock to refresh LVB */
889 dlm_lock_sync(cinfo
->bitmap_lockres
, DLM_LOCK_PW
);
890 cmsg
.type
= cpu_to_le32(RESYNCING
);
891 cmsg
.low
= cpu_to_le64(lo
);
892 cmsg
.high
= cpu_to_le64(hi
);
894 return sendmsg(cinfo
, &cmsg
);
897 static int resync_finish(struct mddev
*mddev
)
899 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
900 cinfo
->resync_lockres
->flags
&= ~DLM_LKF_NOQUEUE
;
901 dlm_unlock_sync(cinfo
->resync_lockres
);
902 return resync_info_update(mddev
, 0, 0);
905 static int area_resyncing(struct mddev
*mddev
, int direction
,
906 sector_t lo
, sector_t hi
)
908 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
910 struct suspend_info
*s
;
912 if ((direction
== READ
) &&
913 test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING
, &cinfo
->state
))
916 spin_lock_irq(&cinfo
->suspend_lock
);
917 if (list_empty(&cinfo
->suspend_list
))
919 list_for_each_entry(s
, &cinfo
->suspend_list
, list
)
920 if (hi
> s
->lo
&& lo
< s
->hi
) {
925 spin_unlock_irq(&cinfo
->suspend_lock
);
929 /* add_new_disk() - initiates a disk add
930 * However, if this fails before writing md_update_sb(),
931 * add_new_disk_cancel() must be called to release token lock
933 static int add_new_disk(struct mddev
*mddev
, struct md_rdev
*rdev
)
935 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
936 struct cluster_msg cmsg
;
938 struct mdp_superblock_1
*sb
= page_address(rdev
->sb_page
);
939 char *uuid
= sb
->device_uuid
;
941 memset(&cmsg
, 0, sizeof(cmsg
));
942 cmsg
.type
= cpu_to_le32(NEWDISK
);
943 memcpy(cmsg
.uuid
, uuid
, 16);
944 cmsg
.raid_slot
= cpu_to_le32(rdev
->desc_nr
);
946 ret
= __sendmsg(cinfo
, &cmsg
);
949 cinfo
->no_new_dev_lockres
->flags
|= DLM_LKF_NOQUEUE
;
950 ret
= dlm_lock_sync(cinfo
->no_new_dev_lockres
, DLM_LOCK_EX
);
951 cinfo
->no_new_dev_lockres
->flags
&= ~DLM_LKF_NOQUEUE
;
952 /* Some node does not "see" the device */
958 dlm_lock_sync(cinfo
->no_new_dev_lockres
, DLM_LOCK_CR
);
962 static void add_new_disk_cancel(struct mddev
*mddev
)
964 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
968 static int new_disk_ack(struct mddev
*mddev
, bool ack
)
970 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
972 if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK
, &cinfo
->state
)) {
973 pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev
));
978 dlm_unlock_sync(cinfo
->no_new_dev_lockres
);
979 complete(&cinfo
->newdisk_completion
);
983 static int remove_disk(struct mddev
*mddev
, struct md_rdev
*rdev
)
985 struct cluster_msg cmsg
= {0};
986 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
987 cmsg
.type
= cpu_to_le32(REMOVE
);
988 cmsg
.raid_slot
= cpu_to_le32(rdev
->desc_nr
);
989 return __sendmsg(cinfo
, &cmsg
);
992 static int gather_bitmaps(struct md_rdev
*rdev
)
996 struct cluster_msg cmsg
= {0};
997 struct mddev
*mddev
= rdev
->mddev
;
998 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
1000 cmsg
.type
= cpu_to_le32(RE_ADD
);
1001 cmsg
.raid_slot
= cpu_to_le32(rdev
->desc_nr
);
1002 err
= sendmsg(cinfo
, &cmsg
);
1006 for (sn
= 0; sn
< mddev
->bitmap_info
.nodes
; sn
++) {
1007 if (sn
== (cinfo
->slot_number
- 1))
1009 err
= bitmap_copy_from_slot(mddev
, sn
, &lo
, &hi
, false);
1011 pr_warn("md-cluster: Could not gather bitmaps from slot %d", sn
);
1014 if ((hi
> 0) && (lo
< mddev
->recovery_cp
))
1015 mddev
->recovery_cp
= lo
;
1021 static struct md_cluster_operations cluster_ops
= {
1024 .slot_number
= slot_number
,
1025 .resync_start
= resync_start
,
1026 .resync_finish
= resync_finish
,
1027 .resync_info_update
= resync_info_update
,
1028 .metadata_update_start
= metadata_update_start
,
1029 .metadata_update_finish
= metadata_update_finish
,
1030 .metadata_update_cancel
= metadata_update_cancel
,
1031 .area_resyncing
= area_resyncing
,
1032 .add_new_disk
= add_new_disk
,
1033 .add_new_disk_cancel
= add_new_disk_cancel
,
1034 .new_disk_ack
= new_disk_ack
,
1035 .remove_disk
= remove_disk
,
1036 .gather_bitmaps
= gather_bitmaps
,
1039 static int __init
cluster_init(void)
1041 pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n");
1042 pr_info("Registering Cluster MD functions\n");
1043 register_md_cluster_operations(&cluster_ops
, THIS_MODULE
);
1047 static void cluster_exit(void)
1049 unregister_md_cluster_operations();
1052 module_init(cluster_init
);
1053 module_exit(cluster_exit
);
1054 MODULE_AUTHOR("SUSE");
1055 MODULE_LICENSE("GPL");
1056 MODULE_DESCRIPTION("Clustering support for MD");