/*
   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
*/
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
39 #include "drbd_protocol.h"
/* Forward declarations for the request generators driven by w_resync_timer(). */
static int make_ov_request(struct drbd_device *, int);
static int make_resync_request(struct drbd_device *, int);
/* endio handlers:
 *   drbd_md_endio (defined here)
 *   drbd_request_endio (defined here)
 *   drbd_peer_request_endio (defined here)
 *   drbd_bm_endio (defined in drbd_bitmap.c)
 *
 * For all these callbacks, note the following:
 * The callbacks will be called in irq context by the IDE drivers,
 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
 * Try to get the locking right :)
 */
59 /* About the global_state_lock
60 Each state transition on an device holds a read lock. In case we have
61 to evaluate the resync after dependencies, we grab a write lock, because
62 we need stable states on all devices for that. */
63 rwlock_t global_state_lock
;
65 /* used for synchronous meta data and bitmap IO
66 * submitted by drbd_md_sync_page_io()
68 void drbd_md_endio(struct bio
*bio
)
70 struct drbd_device
*device
;
72 device
= bio
->bi_private
;
73 device
->md_io
.error
= bio
->bi_error
;
75 /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
76 * to timeout on the lower level device, and eventually detach from it.
77 * If this io completion runs after that timeout expired, this
78 * drbd_md_put_buffer() may allow us to finally try and re-attach.
79 * During normal operation, this only puts that extra reference
81 * Make sure we first drop the reference, and only then signal
82 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
83 * next drbd_md_sync_page_io(), that we trigger the
84 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
86 drbd_md_put_buffer(device
);
87 device
->md_io
.done
= 1;
88 wake_up(&device
->misc_wait
);
90 if (device
->ldev
) /* special case: drbd_md_read() during drbd_adm_attach() */
94 /* reads on behalf of the partner,
95 * "submitted" by the receiver
97 static void drbd_endio_read_sec_final(struct drbd_peer_request
*peer_req
) __releases(local
)
99 unsigned long flags
= 0;
100 struct drbd_peer_device
*peer_device
= peer_req
->peer_device
;
101 struct drbd_device
*device
= peer_device
->device
;
103 spin_lock_irqsave(&device
->resource
->req_lock
, flags
);
104 device
->read_cnt
+= peer_req
->i
.size
>> 9;
105 list_del(&peer_req
->w
.list
);
106 if (list_empty(&device
->read_ee
))
107 wake_up(&device
->ee_wait
);
108 if (test_bit(__EE_WAS_ERROR
, &peer_req
->flags
))
109 __drbd_chk_io_error(device
, DRBD_READ_ERROR
);
110 spin_unlock_irqrestore(&device
->resource
->req_lock
, flags
);
112 drbd_queue_work(&peer_device
->connection
->sender_work
, &peer_req
->w
);
116 /* writes on behalf of the partner, or resync writes,
117 * "submitted" by the receiver, final stage. */
118 void drbd_endio_write_sec_final(struct drbd_peer_request
*peer_req
) __releases(local
)
120 unsigned long flags
= 0;
121 struct drbd_peer_device
*peer_device
= peer_req
->peer_device
;
122 struct drbd_device
*device
= peer_device
->device
;
123 struct drbd_interval i
;
126 int do_al_complete_io
;
128 /* after we moved peer_req to done_ee,
129 * we may no longer access it,
130 * it may be freed/reused already!
131 * (as soon as we release the req_lock) */
133 do_al_complete_io
= peer_req
->flags
& EE_CALL_AL_COMPLETE_IO
;
134 block_id
= peer_req
->block_id
;
135 peer_req
->flags
&= ~EE_CALL_AL_COMPLETE_IO
;
137 spin_lock_irqsave(&device
->resource
->req_lock
, flags
);
138 device
->writ_cnt
+= peer_req
->i
.size
>> 9;
139 list_move_tail(&peer_req
->w
.list
, &device
->done_ee
);
142 * Do not remove from the write_requests tree here: we did not send the
143 * Ack yet and did not wake possibly waiting conflicting requests.
144 * Removed from the tree from "drbd_process_done_ee" within the
145 * appropriate dw.cb (e_end_block/e_end_resync_block) or from
146 * _drbd_clear_done_ee.
149 do_wake
= list_empty(block_id
== ID_SYNCER
? &device
->sync_ee
: &device
->active_ee
);
151 /* FIXME do we want to detach for failed REQ_DISCARD?
152 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
153 if (peer_req
->flags
& EE_WAS_ERROR
)
154 __drbd_chk_io_error(device
, DRBD_WRITE_ERROR
);
155 spin_unlock_irqrestore(&device
->resource
->req_lock
, flags
);
157 if (block_id
== ID_SYNCER
)
158 drbd_rs_complete_io(device
, i
.sector
);
161 wake_up(&device
->ee_wait
);
163 if (do_al_complete_io
)
164 drbd_al_complete_io(device
, &i
);
166 wake_asender(peer_device
->connection
);
170 /* writes on behalf of the partner, or resync writes,
171 * "submitted" by the receiver.
173 void drbd_peer_request_endio(struct bio
*bio
)
175 struct drbd_peer_request
*peer_req
= bio
->bi_private
;
176 struct drbd_device
*device
= peer_req
->peer_device
->device
;
177 int is_write
= bio_data_dir(bio
) == WRITE
;
178 int is_discard
= !!(bio
->bi_rw
& REQ_DISCARD
);
180 if (bio
->bi_error
&& __ratelimit(&drbd_ratelimit_state
))
181 drbd_warn(device
, "%s: error=%d s=%llus\n",
182 is_write
? (is_discard
? "discard" : "write")
183 : "read", bio
->bi_error
,
184 (unsigned long long)peer_req
->i
.sector
);
187 set_bit(__EE_WAS_ERROR
, &peer_req
->flags
);
189 bio_put(bio
); /* no need for the bio anymore */
190 if (atomic_dec_and_test(&peer_req
->pending_bios
)) {
192 drbd_endio_write_sec_final(peer_req
);
194 drbd_endio_read_sec_final(peer_req
);
198 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
200 void drbd_request_endio(struct bio
*bio
)
203 struct drbd_request
*req
= bio
->bi_private
;
204 struct drbd_device
*device
= req
->device
;
205 struct bio_and_error m
;
206 enum drbd_req_event what
;
208 /* If this request was aborted locally before,
209 * but now was completed "successfully",
210 * chances are that this caused arbitrary data corruption.
212 * "aborting" requests, or force-detaching the disk, is intended for
213 * completely blocked/hung local backing devices which do no longer
214 * complete requests at all, not even do error completions. In this
215 * situation, usually a hard-reset and failover is the only way out.
217 * By "aborting", basically faking a local error-completion,
218 * we allow for a more graceful swichover by cleanly migrating services.
219 * Still the affected node has to be rebooted "soon".
221 * By completing these requests, we allow the upper layers to re-use
222 * the associated data pages.
224 * If later the local backing device "recovers", and now DMAs some data
225 * from disk into the original request pages, in the best case it will
226 * just put random data into unused pages; but typically it will corrupt
227 * meanwhile completely unrelated data, causing all sorts of damage.
229 * Which means delayed successful completion,
230 * especially for READ requests,
231 * is a reason to panic().
233 * We assume that a delayed *error* completion is OK,
234 * though we still will complain noisily about it.
236 if (unlikely(req
->rq_state
& RQ_LOCAL_ABORTED
)) {
237 if (__ratelimit(&drbd_ratelimit_state
))
238 drbd_emerg(device
, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
241 panic("possible random memory corruption caused by delayed completion of aborted local request\n");
244 /* to avoid recursion in __req_mod */
245 if (unlikely(bio
->bi_error
)) {
246 if (bio
->bi_rw
& REQ_DISCARD
)
247 what
= (bio
->bi_error
== -EOPNOTSUPP
)
248 ? DISCARD_COMPLETED_NOTSUPP
249 : DISCARD_COMPLETED_WITH_ERROR
;
251 what
= (bio_data_dir(bio
) == WRITE
)
252 ? WRITE_COMPLETED_WITH_ERROR
253 : (bio_rw(bio
) == READ
)
254 ? READ_COMPLETED_WITH_ERROR
255 : READ_AHEAD_COMPLETED_WITH_ERROR
;
259 bio_put(req
->private_bio
);
260 req
->private_bio
= ERR_PTR(bio
->bi_error
);
262 /* not req_mod(), we need irqsave here! */
263 spin_lock_irqsave(&device
->resource
->req_lock
, flags
);
264 __req_mod(req
, what
, &m
);
265 spin_unlock_irqrestore(&device
->resource
->req_lock
, flags
);
269 complete_master_bio(device
, &m
);
272 void drbd_csum_ee(struct crypto_hash
*tfm
, struct drbd_peer_request
*peer_req
, void *digest
)
274 struct hash_desc desc
;
275 struct scatterlist sg
;
276 struct page
*page
= peer_req
->pages
;
283 sg_init_table(&sg
, 1);
284 crypto_hash_init(&desc
);
286 while ((tmp
= page_chain_next(page
))) {
287 /* all but the last page will be fully used */
288 sg_set_page(&sg
, page
, PAGE_SIZE
, 0);
289 crypto_hash_update(&desc
, &sg
, sg
.length
);
292 /* and now the last, possibly only partially used page */
293 len
= peer_req
->i
.size
& (PAGE_SIZE
- 1);
294 sg_set_page(&sg
, page
, len
?: PAGE_SIZE
, 0);
295 crypto_hash_update(&desc
, &sg
, sg
.length
);
296 crypto_hash_final(&desc
, digest
);
299 void drbd_csum_bio(struct crypto_hash
*tfm
, struct bio
*bio
, void *digest
)
301 struct hash_desc desc
;
302 struct scatterlist sg
;
304 struct bvec_iter iter
;
309 sg_init_table(&sg
, 1);
310 crypto_hash_init(&desc
);
312 bio_for_each_segment(bvec
, bio
, iter
) {
313 sg_set_page(&sg
, bvec
.bv_page
, bvec
.bv_len
, bvec
.bv_offset
);
314 crypto_hash_update(&desc
, &sg
, sg
.length
);
316 crypto_hash_final(&desc
, digest
);
319 /* MAYBE merge common code with w_e_end_ov_req */
320 static int w_e_send_csum(struct drbd_work
*w
, int cancel
)
322 struct drbd_peer_request
*peer_req
= container_of(w
, struct drbd_peer_request
, w
);
323 struct drbd_peer_device
*peer_device
= peer_req
->peer_device
;
324 struct drbd_device
*device
= peer_device
->device
;
329 if (unlikely(cancel
))
332 if (unlikely((peer_req
->flags
& EE_WAS_ERROR
) != 0))
335 digest_size
= crypto_hash_digestsize(peer_device
->connection
->csums_tfm
);
336 digest
= kmalloc(digest_size
, GFP_NOIO
);
338 sector_t sector
= peer_req
->i
.sector
;
339 unsigned int size
= peer_req
->i
.size
;
340 drbd_csum_ee(peer_device
->connection
->csums_tfm
, peer_req
, digest
);
341 /* Free peer_req and pages before send.
342 * In case we block on congestion, we could otherwise run into
343 * some distributed deadlock, if the other side blocks on
344 * congestion as well, because our receiver blocks in
345 * drbd_alloc_pages due to pp_in_use > max_buffers. */
346 drbd_free_peer_req(device
, peer_req
);
348 inc_rs_pending(device
);
349 err
= drbd_send_drequest_csum(peer_device
, sector
, size
,
354 drbd_err(device
, "kmalloc() of digest failed.\n");
360 drbd_free_peer_req(device
, peer_req
);
363 drbd_err(device
, "drbd_send_drequest(..., csum) failed\n");
367 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
369 static int read_for_csum(struct drbd_peer_device
*peer_device
, sector_t sector
, int size
)
371 struct drbd_device
*device
= peer_device
->device
;
372 struct drbd_peer_request
*peer_req
;
374 if (!get_ldev(device
))
377 /* GFP_TRY, because if there is no memory available right now, this may
378 * be rescheduled for later. It is "only" background resync, after all. */
379 peer_req
= drbd_alloc_peer_req(peer_device
, ID_SYNCER
/* unused */, sector
,
380 size
, true /* has real payload */, GFP_TRY
);
384 peer_req
->w
.cb
= w_e_send_csum
;
385 spin_lock_irq(&device
->resource
->req_lock
);
386 list_add_tail(&peer_req
->w
.list
, &device
->read_ee
);
387 spin_unlock_irq(&device
->resource
->req_lock
);
389 atomic_add(size
>> 9, &device
->rs_sect_ev
);
390 if (drbd_submit_peer_request(device
, peer_req
, READ
, DRBD_FAULT_RS_RD
) == 0)
393 /* If it failed because of ENOMEM, retry should help. If it failed
394 * because bio_add_page failed (probably broken lower level driver),
395 * retry may or may not help.
396 * If it does not, you may need to force disconnect. */
397 spin_lock_irq(&device
->resource
->req_lock
);
398 list_del(&peer_req
->w
.list
);
399 spin_unlock_irq(&device
->resource
->req_lock
);
401 drbd_free_peer_req(device
, peer_req
);
407 int w_resync_timer(struct drbd_work
*w
, int cancel
)
409 struct drbd_device
*device
=
410 container_of(w
, struct drbd_device
, resync_work
);
412 switch (device
->state
.conn
) {
414 make_ov_request(device
, cancel
);
417 make_resync_request(device
, cancel
);
424 void resync_timer_fn(unsigned long data
)
426 struct drbd_device
*device
= (struct drbd_device
*) data
;
428 drbd_queue_work_if_unqueued(
429 &first_peer_device(device
)->connection
->sender_work
,
430 &device
->resync_work
);
433 static void fifo_set(struct fifo_buffer
*fb
, int value
)
437 for (i
= 0; i
< fb
->size
; i
++)
438 fb
->values
[i
] = value
;
441 static int fifo_push(struct fifo_buffer
*fb
, int value
)
445 ov
= fb
->values
[fb
->head_index
];
446 fb
->values
[fb
->head_index
++] = value
;
448 if (fb
->head_index
>= fb
->size
)
454 static void fifo_add_val(struct fifo_buffer
*fb
, int value
)
458 for (i
= 0; i
< fb
->size
; i
++)
459 fb
->values
[i
] += value
;
462 struct fifo_buffer
*fifo_alloc(int fifo_size
)
464 struct fifo_buffer
*fb
;
466 fb
= kzalloc(sizeof(struct fifo_buffer
) + sizeof(int) * fifo_size
, GFP_NOIO
);
471 fb
->size
= fifo_size
;
477 static int drbd_rs_controller(struct drbd_device
*device
, unsigned int sect_in
)
479 struct disk_conf
*dc
;
480 unsigned int want
; /* The number of sectors we want in-flight */
481 int req_sect
; /* Number of sectors to request in this turn */
482 int correction
; /* Number of sectors more we need in-flight */
483 int cps
; /* correction per invocation of drbd_rs_controller() */
484 int steps
; /* Number of time steps to plan ahead */
487 struct fifo_buffer
*plan
;
489 dc
= rcu_dereference(device
->ldev
->disk_conf
);
490 plan
= rcu_dereference(device
->rs_plan_s
);
492 steps
= plan
->size
; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
494 if (device
->rs_in_flight
+ sect_in
== 0) { /* At start of resync */
495 want
= ((dc
->resync_rate
* 2 * SLEEP_TIME
) / HZ
) * steps
;
496 } else { /* normal path */
497 want
= dc
->c_fill_target
? dc
->c_fill_target
:
498 sect_in
* dc
->c_delay_target
* HZ
/ (SLEEP_TIME
* 10);
501 correction
= want
- device
->rs_in_flight
- plan
->total
;
504 cps
= correction
/ steps
;
505 fifo_add_val(plan
, cps
);
506 plan
->total
+= cps
* steps
;
508 /* What we do in this step */
509 curr_corr
= fifo_push(plan
, 0);
510 plan
->total
-= curr_corr
;
512 req_sect
= sect_in
+ curr_corr
;
516 max_sect
= (dc
->c_max_rate
* 2 * SLEEP_TIME
) / HZ
;
517 if (req_sect
> max_sect
)
521 drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
522 sect_in, device->rs_in_flight, want, correction,
523 steps, cps, device->rs_planed, curr_corr, req_sect);
529 static int drbd_rs_number_requests(struct drbd_device
*device
)
531 unsigned int sect_in
; /* Number of sectors that came in since the last turn */
534 sect_in
= atomic_xchg(&device
->rs_sect_in
, 0);
535 device
->rs_in_flight
-= sect_in
;
538 mxb
= drbd_get_max_buffers(device
) / 2;
539 if (rcu_dereference(device
->rs_plan_s
)->size
) {
540 number
= drbd_rs_controller(device
, sect_in
) >> (BM_BLOCK_SHIFT
- 9);
541 device
->c_sync_rate
= number
* HZ
* (BM_BLOCK_SIZE
/ 1024) / SLEEP_TIME
;
543 device
->c_sync_rate
= rcu_dereference(device
->ldev
->disk_conf
)->resync_rate
;
544 number
= SLEEP_TIME
* device
->c_sync_rate
/ ((BM_BLOCK_SIZE
/ 1024) * HZ
);
548 /* Don't have more than "max-buffers"/2 in-flight.
549 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
550 * potentially causing a distributed deadlock on congestion during
551 * online-verify or (checksum-based) resync, if max-buffers,
552 * socket buffer sizes and resync rate settings are mis-configured. */
554 /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
555 * mxb (as used here, and in drbd_alloc_pages on the peer) is
556 * "number of pages" (typically also 4k),
557 * but "rs_in_flight" is in "sectors" (512 Byte). */
558 if (mxb
- device
->rs_in_flight
/8 < number
)
559 number
= mxb
- device
->rs_in_flight
/8;
564 static int make_resync_request(struct drbd_device
*const device
, int cancel
)
566 struct drbd_peer_device
*const peer_device
= first_peer_device(device
);
567 struct drbd_connection
*const connection
= peer_device
? peer_device
->connection
: NULL
;
570 const sector_t capacity
= drbd_get_capacity(device
->this_bdev
);
572 int number
, rollback_i
, size
;
573 int align
, requeue
= 0;
576 if (unlikely(cancel
))
579 if (device
->rs_total
== 0) {
581 drbd_resync_finished(device
);
585 if (!get_ldev(device
)) {
586 /* Since we only need to access device->rsync a
587 get_ldev_if_state(device,D_FAILED) would be sufficient, but
588 to continue resync with a broken disk makes no sense at
590 drbd_err(device
, "Disk broke down during resync!\n");
594 max_bio_size
= queue_max_hw_sectors(device
->rq_queue
) << 9;
595 number
= drbd_rs_number_requests(device
);
599 for (i
= 0; i
< number
; i
++) {
600 /* Stop generating RS requests when half of the send buffer is filled,
601 * but notify TCP that we'd like to have more space. */
602 mutex_lock(&connection
->data
.mutex
);
603 if (connection
->data
.socket
) {
604 struct sock
*sk
= connection
->data
.socket
->sk
;
605 int queued
= sk
->sk_wmem_queued
;
606 int sndbuf
= sk
->sk_sndbuf
;
607 if (queued
> sndbuf
/ 2) {
610 set_bit(SOCK_NOSPACE
, &sk
->sk_socket
->flags
);
614 mutex_unlock(&connection
->data
.mutex
);
619 size
= BM_BLOCK_SIZE
;
620 bit
= drbd_bm_find_next(device
, device
->bm_resync_fo
);
622 if (bit
== DRBD_END_OF_BITMAP
) {
623 device
->bm_resync_fo
= drbd_bm_bits(device
);
628 sector
= BM_BIT_TO_SECT(bit
);
630 if (drbd_try_rs_begin_io(device
, sector
)) {
631 device
->bm_resync_fo
= bit
;
634 device
->bm_resync_fo
= bit
+ 1;
636 if (unlikely(drbd_bm_test_bit(device
, bit
) == 0)) {
637 drbd_rs_complete_io(device
, sector
);
641 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
642 /* try to find some adjacent bits.
643 * we stop if we have already the maximum req size.
645 * Additionally always align bigger requests, in order to
646 * be prepared for all stripe sizes of software RAIDs.
651 if (size
+ BM_BLOCK_SIZE
> max_bio_size
)
654 /* Be always aligned */
655 if (sector
& ((1<<(align
+3))-1))
658 /* do not cross extent boundaries */
659 if (((bit
+1) & BM_BLOCKS_PER_BM_EXT_MASK
) == 0)
661 /* now, is it actually dirty, after all?
662 * caution, drbd_bm_test_bit is tri-state for some
663 * obscure reason; ( b == 0 ) would get the out-of-band
664 * only accidentally right because of the "oddly sized"
665 * adjustment below */
666 if (drbd_bm_test_bit(device
, bit
+1) != 1)
669 size
+= BM_BLOCK_SIZE
;
670 if ((BM_BLOCK_SIZE
<< align
) <= size
)
674 /* if we merged some,
675 * reset the offset to start the next drbd_bm_find_next from */
676 if (size
> BM_BLOCK_SIZE
)
677 device
->bm_resync_fo
= bit
+ 1;
680 /* adjust very last sectors, in case we are oddly sized */
681 if (sector
+ (size
>>9) > capacity
)
682 size
= (capacity
-sector
)<<9;
684 if (device
->use_csums
) {
685 switch (read_for_csum(peer_device
, sector
, size
)) {
686 case -EIO
: /* Disk failure */
689 case -EAGAIN
: /* allocation failed, or ldev busy */
690 drbd_rs_complete_io(device
, sector
);
691 device
->bm_resync_fo
= BM_SECT_TO_BIT(sector
);
703 inc_rs_pending(device
);
704 err
= drbd_send_drequest(peer_device
, P_RS_DATA_REQUEST
,
705 sector
, size
, ID_SYNCER
);
707 drbd_err(device
, "drbd_send_drequest() failed, aborting...\n");
708 dec_rs_pending(device
);
715 if (device
->bm_resync_fo
>= drbd_bm_bits(device
)) {
716 /* last syncer _request_ was sent,
717 * but the P_RS_DATA_REPLY not yet received. sync will end (and
718 * next sync group will resume), as soon as we receive the last
719 * resync data block, and the last bit is cleared.
720 * until then resync "work" is "inactive" ...
727 device
->rs_in_flight
+= (i
<< (BM_BLOCK_SHIFT
- 9));
728 mod_timer(&device
->resync_timer
, jiffies
+ SLEEP_TIME
);
733 static int make_ov_request(struct drbd_device
*device
, int cancel
)
737 const sector_t capacity
= drbd_get_capacity(device
->this_bdev
);
738 bool stop_sector_reached
= false;
740 if (unlikely(cancel
))
743 number
= drbd_rs_number_requests(device
);
745 sector
= device
->ov_position
;
746 for (i
= 0; i
< number
; i
++) {
747 if (sector
>= capacity
)
750 /* We check for "finished" only in the reply path:
751 * w_e_end_ov_reply().
752 * We need to send at least one request out. */
753 stop_sector_reached
= i
> 0
754 && verify_can_do_stop_sector(device
)
755 && sector
>= device
->ov_stop_sector
;
756 if (stop_sector_reached
)
759 size
= BM_BLOCK_SIZE
;
761 if (drbd_try_rs_begin_io(device
, sector
)) {
762 device
->ov_position
= sector
;
766 if (sector
+ (size
>>9) > capacity
)
767 size
= (capacity
-sector
)<<9;
769 inc_rs_pending(device
);
770 if (drbd_send_ov_request(first_peer_device(device
), sector
, size
)) {
771 dec_rs_pending(device
);
774 sector
+= BM_SECT_PER_BIT
;
776 device
->ov_position
= sector
;
779 device
->rs_in_flight
+= (i
<< (BM_BLOCK_SHIFT
- 9));
780 if (i
== 0 || !stop_sector_reached
)
781 mod_timer(&device
->resync_timer
, jiffies
+ SLEEP_TIME
);
785 int w_ov_finished(struct drbd_work
*w
, int cancel
)
787 struct drbd_device_work
*dw
=
788 container_of(w
, struct drbd_device_work
, w
);
789 struct drbd_device
*device
= dw
->device
;
791 ov_out_of_sync_print(device
);
792 drbd_resync_finished(device
);
797 static int w_resync_finished(struct drbd_work
*w
, int cancel
)
799 struct drbd_device_work
*dw
=
800 container_of(w
, struct drbd_device_work
, w
);
801 struct drbd_device
*device
= dw
->device
;
804 drbd_resync_finished(device
);
809 static void ping_peer(struct drbd_device
*device
)
811 struct drbd_connection
*connection
= first_peer_device(device
)->connection
;
813 clear_bit(GOT_PING_ACK
, &connection
->flags
);
814 request_ping(connection
);
815 wait_event(connection
->ping_wait
,
816 test_bit(GOT_PING_ACK
, &connection
->flags
) || device
->state
.conn
< C_CONNECTED
);
819 int drbd_resync_finished(struct drbd_device
*device
)
821 unsigned long db
, dt
, dbdt
;
823 union drbd_state os
, ns
;
824 struct drbd_device_work
*dw
;
825 char *khelper_cmd
= NULL
;
828 /* Remove all elements from the resync LRU. Since future actions
829 * might set bits in the (main) bitmap, then the entries in the
830 * resync LRU would be wrong. */
831 if (drbd_rs_del_all(device
)) {
832 /* In case this is not possible now, most probably because
833 * there are P_RS_DATA_REPLY Packets lingering on the worker's
834 * queue (or even the read operations for those packets
835 * is not finished by now). Retry in 100ms. */
837 schedule_timeout_interruptible(HZ
/ 10);
838 dw
= kmalloc(sizeof(struct drbd_device_work
), GFP_ATOMIC
);
840 dw
->w
.cb
= w_resync_finished
;
842 drbd_queue_work(&first_peer_device(device
)->connection
->sender_work
,
846 drbd_err(device
, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
849 dt
= (jiffies
- device
->rs_start
- device
->rs_paused
) / HZ
;
853 db
= device
->rs_total
;
854 /* adjust for verify start and stop sectors, respective reached position */
855 if (device
->state
.conn
== C_VERIFY_S
|| device
->state
.conn
== C_VERIFY_T
)
856 db
-= device
->ov_left
;
858 dbdt
= Bit2KB(db
/dt
);
859 device
->rs_paused
/= HZ
;
861 if (!get_ldev(device
))
866 spin_lock_irq(&device
->resource
->req_lock
);
867 os
= drbd_read_state(device
);
869 verify_done
= (os
.conn
== C_VERIFY_S
|| os
.conn
== C_VERIFY_T
);
871 /* This protects us against multiple calls (that can happen in the presence
872 of application IO), and against connectivity loss just before we arrive here. */
873 if (os
.conn
<= C_CONNECTED
)
877 ns
.conn
= C_CONNECTED
;
879 drbd_info(device
, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
880 verify_done
? "Online verify" : "Resync",
881 dt
+ device
->rs_paused
, device
->rs_paused
, dbdt
);
883 n_oos
= drbd_bm_total_weight(device
);
885 if (os
.conn
== C_VERIFY_S
|| os
.conn
== C_VERIFY_T
) {
887 drbd_alert(device
, "Online verify found %lu %dk block out of sync!\n",
889 khelper_cmd
= "out-of-sync";
892 D_ASSERT(device
, (n_oos
- device
->rs_failed
) == 0);
894 if (os
.conn
== C_SYNC_TARGET
|| os
.conn
== C_PAUSED_SYNC_T
)
895 khelper_cmd
= "after-resync-target";
897 if (device
->use_csums
&& device
->rs_total
) {
898 const unsigned long s
= device
->rs_same_csum
;
899 const unsigned long t
= device
->rs_total
;
902 (t
< 100000) ? ((s
*100)/t
) : (s
/(t
/100));
903 drbd_info(device
, "%u %% had equal checksums, eliminated: %luK; "
904 "transferred %luK total %luK\n",
906 Bit2KB(device
->rs_same_csum
),
907 Bit2KB(device
->rs_total
- device
->rs_same_csum
),
908 Bit2KB(device
->rs_total
));
912 if (device
->rs_failed
) {
913 drbd_info(device
, " %lu failed blocks\n", device
->rs_failed
);
915 if (os
.conn
== C_SYNC_TARGET
|| os
.conn
== C_PAUSED_SYNC_T
) {
916 ns
.disk
= D_INCONSISTENT
;
917 ns
.pdsk
= D_UP_TO_DATE
;
919 ns
.disk
= D_UP_TO_DATE
;
920 ns
.pdsk
= D_INCONSISTENT
;
923 ns
.disk
= D_UP_TO_DATE
;
924 ns
.pdsk
= D_UP_TO_DATE
;
926 if (os
.conn
== C_SYNC_TARGET
|| os
.conn
== C_PAUSED_SYNC_T
) {
927 if (device
->p_uuid
) {
929 for (i
= UI_BITMAP
; i
<= UI_HISTORY_END
; i
++)
930 _drbd_uuid_set(device
, i
, device
->p_uuid
[i
]);
931 drbd_uuid_set(device
, UI_BITMAP
, device
->ldev
->md
.uuid
[UI_CURRENT
]);
932 _drbd_uuid_set(device
, UI_CURRENT
, device
->p_uuid
[UI_CURRENT
]);
934 drbd_err(device
, "device->p_uuid is NULL! BUG\n");
938 if (!(os
.conn
== C_VERIFY_S
|| os
.conn
== C_VERIFY_T
)) {
939 /* for verify runs, we don't update uuids here,
940 * so there would be nothing to report. */
941 drbd_uuid_set_bm(device
, 0UL);
942 drbd_print_uuids(device
, "updated UUIDs");
943 if (device
->p_uuid
) {
944 /* Now the two UUID sets are equal, update what we
945 * know of the peer. */
947 for (i
= UI_CURRENT
; i
<= UI_HISTORY_END
; i
++)
948 device
->p_uuid
[i
] = device
->ldev
->md
.uuid
[i
];
953 _drbd_set_state(device
, ns
, CS_VERBOSE
, NULL
);
955 spin_unlock_irq(&device
->resource
->req_lock
);
958 device
->rs_total
= 0;
959 device
->rs_failed
= 0;
960 device
->rs_paused
= 0;
962 /* reset start sector, if we reached end of device */
963 if (verify_done
&& device
->ov_left
== 0)
964 device
->ov_start_sector
= 0;
966 drbd_md_sync(device
);
969 drbd_khelper(device
, khelper_cmd
);
975 static void move_to_net_ee_or_free(struct drbd_device
*device
, struct drbd_peer_request
*peer_req
)
977 if (drbd_peer_req_has_active_page(peer_req
)) {
978 /* This might happen if sendpage() has not finished */
979 int i
= (peer_req
->i
.size
+ PAGE_SIZE
-1) >> PAGE_SHIFT
;
980 atomic_add(i
, &device
->pp_in_use_by_net
);
981 atomic_sub(i
, &device
->pp_in_use
);
982 spin_lock_irq(&device
->resource
->req_lock
);
983 list_add_tail(&peer_req
->w
.list
, &device
->net_ee
);
984 spin_unlock_irq(&device
->resource
->req_lock
);
985 wake_up(&drbd_pp_wait
);
987 drbd_free_peer_req(device
, peer_req
);
991 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
992 * @device: DRBD device.
994 * @cancel: The connection will be closed anyways
996 int w_e_end_data_req(struct drbd_work
*w
, int cancel
)
998 struct drbd_peer_request
*peer_req
= container_of(w
, struct drbd_peer_request
, w
);
999 struct drbd_peer_device
*peer_device
= peer_req
->peer_device
;
1000 struct drbd_device
*device
= peer_device
->device
;
1003 if (unlikely(cancel
)) {
1004 drbd_free_peer_req(device
, peer_req
);
1005 dec_unacked(device
);
1009 if (likely((peer_req
->flags
& EE_WAS_ERROR
) == 0)) {
1010 err
= drbd_send_block(peer_device
, P_DATA_REPLY
, peer_req
);
1012 if (__ratelimit(&drbd_ratelimit_state
))
1013 drbd_err(device
, "Sending NegDReply. sector=%llus.\n",
1014 (unsigned long long)peer_req
->i
.sector
);
1016 err
= drbd_send_ack(peer_device
, P_NEG_DREPLY
, peer_req
);
1019 dec_unacked(device
);
1021 move_to_net_ee_or_free(device
, peer_req
);
1024 drbd_err(device
, "drbd_send_block() failed\n");
1029 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1031 * @cancel: The connection will be closed anyways
1033 int w_e_end_rsdata_req(struct drbd_work
*w
, int cancel
)
1035 struct drbd_peer_request
*peer_req
= container_of(w
, struct drbd_peer_request
, w
);
1036 struct drbd_peer_device
*peer_device
= peer_req
->peer_device
;
1037 struct drbd_device
*device
= peer_device
->device
;
1040 if (unlikely(cancel
)) {
1041 drbd_free_peer_req(device
, peer_req
);
1042 dec_unacked(device
);
1046 if (get_ldev_if_state(device
, D_FAILED
)) {
1047 drbd_rs_complete_io(device
, peer_req
->i
.sector
);
1051 if (device
->state
.conn
== C_AHEAD
) {
1052 err
= drbd_send_ack(peer_device
, P_RS_CANCEL
, peer_req
);
1053 } else if (likely((peer_req
->flags
& EE_WAS_ERROR
) == 0)) {
1054 if (likely(device
->state
.pdsk
>= D_INCONSISTENT
)) {
1055 inc_rs_pending(device
);
1056 err
= drbd_send_block(peer_device
, P_RS_DATA_REPLY
, peer_req
);
1058 if (__ratelimit(&drbd_ratelimit_state
))
1059 drbd_err(device
, "Not sending RSDataReply, "
1060 "partner DISKLESS!\n");
1064 if (__ratelimit(&drbd_ratelimit_state
))
1065 drbd_err(device
, "Sending NegRSDReply. sector %llus.\n",
1066 (unsigned long long)peer_req
->i
.sector
);
1068 err
= drbd_send_ack(peer_device
, P_NEG_RS_DREPLY
, peer_req
);
1070 /* update resync data with failure */
1071 drbd_rs_failed_io(device
, peer_req
->i
.sector
, peer_req
->i
.size
);
1074 dec_unacked(device
);
1076 move_to_net_ee_or_free(device
, peer_req
);
1079 drbd_err(device
, "drbd_send_block() failed\n");
1083 int w_e_end_csum_rs_req(struct drbd_work
*w
, int cancel
)
1085 struct drbd_peer_request
*peer_req
= container_of(w
, struct drbd_peer_request
, w
);
1086 struct drbd_peer_device
*peer_device
= peer_req
->peer_device
;
1087 struct drbd_device
*device
= peer_device
->device
;
1088 struct digest_info
*di
;
1090 void *digest
= NULL
;
1093 if (unlikely(cancel
)) {
1094 drbd_free_peer_req(device
, peer_req
);
1095 dec_unacked(device
);
1099 if (get_ldev(device
)) {
1100 drbd_rs_complete_io(device
, peer_req
->i
.sector
);
1104 di
= peer_req
->digest
;
1106 if (likely((peer_req
->flags
& EE_WAS_ERROR
) == 0)) {
1107 /* quick hack to try to avoid a race against reconfiguration.
1108 * a real fix would be much more involved,
1109 * introducing more locking mechanisms */
1110 if (peer_device
->connection
->csums_tfm
) {
1111 digest_size
= crypto_hash_digestsize(peer_device
->connection
->csums_tfm
);
1112 D_ASSERT(device
, digest_size
== di
->digest_size
);
1113 digest
= kmalloc(digest_size
, GFP_NOIO
);
1116 drbd_csum_ee(peer_device
->connection
->csums_tfm
, peer_req
, digest
);
1117 eq
= !memcmp(digest
, di
->digest
, digest_size
);
1122 drbd_set_in_sync(device
, peer_req
->i
.sector
, peer_req
->i
.size
);
1123 /* rs_same_csums unit is BM_BLOCK_SIZE */
1124 device
->rs_same_csum
+= peer_req
->i
.size
>> BM_BLOCK_SHIFT
;
1125 err
= drbd_send_ack(peer_device
, P_RS_IS_IN_SYNC
, peer_req
);
1127 inc_rs_pending(device
);
1128 peer_req
->block_id
= ID_SYNCER
; /* By setting block_id, digest pointer becomes invalid! */
1129 peer_req
->flags
&= ~EE_HAS_DIGEST
; /* This peer request no longer has a digest pointer */
1131 err
= drbd_send_block(peer_device
, P_RS_DATA_REPLY
, peer_req
);
1134 err
= drbd_send_ack(peer_device
, P_NEG_RS_DREPLY
, peer_req
);
1135 if (__ratelimit(&drbd_ratelimit_state
))
1136 drbd_err(device
, "Sending NegDReply. I guess it gets messy.\n");
1139 dec_unacked(device
);
1140 move_to_net_ee_or_free(device
, peer_req
);
1143 drbd_err(device
, "drbd_send_block/ack() failed\n");
1147 int w_e_end_ov_req(struct drbd_work
*w
, int cancel
)
1149 struct drbd_peer_request
*peer_req
= container_of(w
, struct drbd_peer_request
, w
);
1150 struct drbd_peer_device
*peer_device
= peer_req
->peer_device
;
1151 struct drbd_device
*device
= peer_device
->device
;
1152 sector_t sector
= peer_req
->i
.sector
;
1153 unsigned int size
= peer_req
->i
.size
;
1158 if (unlikely(cancel
))
1161 digest_size
= crypto_hash_digestsize(peer_device
->connection
->verify_tfm
);
1162 digest
= kmalloc(digest_size
, GFP_NOIO
);
1164 err
= 1; /* terminate the connection in case the allocation failed */
1168 if (likely(!(peer_req
->flags
& EE_WAS_ERROR
)))
1169 drbd_csum_ee(peer_device
->connection
->verify_tfm
, peer_req
, digest
);
1171 memset(digest
, 0, digest_size
);
1173 /* Free e and pages before send.
1174 * In case we block on congestion, we could otherwise run into
1175 * some distributed deadlock, if the other side blocks on
1176 * congestion as well, because our receiver blocks in
1177 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1178 drbd_free_peer_req(device
, peer_req
);
1180 inc_rs_pending(device
);
1181 err
= drbd_send_drequest_csum(peer_device
, sector
, size
, digest
, digest_size
, P_OV_REPLY
);
1183 dec_rs_pending(device
);
1188 drbd_free_peer_req(device
, peer_req
);
1189 dec_unacked(device
);
1193 void drbd_ov_out_of_sync_found(struct drbd_device
*device
, sector_t sector
, int size
)
1195 if (device
->ov_last_oos_start
+ device
->ov_last_oos_size
== sector
) {
1196 device
->ov_last_oos_size
+= size
>>9;
1198 device
->ov_last_oos_start
= sector
;
1199 device
->ov_last_oos_size
= size
>>9;
1201 drbd_set_out_of_sync(device
, sector
, size
);
1204 int w_e_end_ov_reply(struct drbd_work
*w
, int cancel
)
1206 struct drbd_peer_request
*peer_req
= container_of(w
, struct drbd_peer_request
, w
);
1207 struct drbd_peer_device
*peer_device
= peer_req
->peer_device
;
1208 struct drbd_device
*device
= peer_device
->device
;
1209 struct digest_info
*di
;
1211 sector_t sector
= peer_req
->i
.sector
;
1212 unsigned int size
= peer_req
->i
.size
;
1215 bool stop_sector_reached
= false;
1217 if (unlikely(cancel
)) {
1218 drbd_free_peer_req(device
, peer_req
);
1219 dec_unacked(device
);
1223 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1224 * the resync lru has been cleaned up already */
1225 if (get_ldev(device
)) {
1226 drbd_rs_complete_io(device
, peer_req
->i
.sector
);
1230 di
= peer_req
->digest
;
1232 if (likely((peer_req
->flags
& EE_WAS_ERROR
) == 0)) {
1233 digest_size
= crypto_hash_digestsize(peer_device
->connection
->verify_tfm
);
1234 digest
= kmalloc(digest_size
, GFP_NOIO
);
1236 drbd_csum_ee(peer_device
->connection
->verify_tfm
, peer_req
, digest
);
1238 D_ASSERT(device
, digest_size
== di
->digest_size
);
1239 eq
= !memcmp(digest
, di
->digest
, digest_size
);
1244 /* Free peer_req and pages before send.
1245 * In case we block on congestion, we could otherwise run into
1246 * some distributed deadlock, if the other side blocks on
1247 * congestion as well, because our receiver blocks in
1248 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1249 drbd_free_peer_req(device
, peer_req
);
1251 drbd_ov_out_of_sync_found(device
, sector
, size
);
1253 ov_out_of_sync_print(device
);
1255 err
= drbd_send_ack_ex(peer_device
, P_OV_RESULT
, sector
, size
,
1256 eq
? ID_IN_SYNC
: ID_OUT_OF_SYNC
);
1258 dec_unacked(device
);
1262 /* let's advance progress step marks only for every other megabyte */
1263 if ((device
->ov_left
& 0x200) == 0x200)
1264 drbd_advance_rs_marks(device
, device
->ov_left
);
1266 stop_sector_reached
= verify_can_do_stop_sector(device
) &&
1267 (sector
+ (size
>>9)) >= device
->ov_stop_sector
;
1269 if (device
->ov_left
== 0 || stop_sector_reached
) {
1270 ov_out_of_sync_print(device
);
1271 drbd_resync_finished(device
);
1278 * We need to track the number of pending barrier acks,
1279 * and to be able to wait for them.
1280 * See also comment in drbd_adm_attach before drbd_suspend_io.
1282 static int drbd_send_barrier(struct drbd_connection
*connection
)
1284 struct p_barrier
*p
;
1285 struct drbd_socket
*sock
;
1287 sock
= &connection
->data
;
1288 p
= conn_prepare_command(connection
, sock
);
1291 p
->barrier
= connection
->send
.current_epoch_nr
;
1293 connection
->send
.current_epoch_writes
= 0;
1295 return conn_send_command(connection
, sock
, P_BARRIER
, sizeof(*p
), NULL
, 0);
1298 int w_send_write_hint(struct drbd_work
*w
, int cancel
)
1300 struct drbd_device
*device
=
1301 container_of(w
, struct drbd_device
, unplug_work
);
1302 struct drbd_socket
*sock
;
1306 sock
= &first_peer_device(device
)->connection
->data
;
1307 if (!drbd_prepare_command(first_peer_device(device
), sock
))
1309 return drbd_send_command(first_peer_device(device
), sock
, P_UNPLUG_REMOTE
, 0, NULL
, 0);
1312 static void re_init_if_first_write(struct drbd_connection
*connection
, unsigned int epoch
)
1314 if (!connection
->send
.seen_any_write_yet
) {
1315 connection
->send
.seen_any_write_yet
= true;
1316 connection
->send
.current_epoch_nr
= epoch
;
1317 connection
->send
.current_epoch_writes
= 0;
1321 static void maybe_send_barrier(struct drbd_connection
*connection
, unsigned int epoch
)
1323 /* re-init if first write on this connection */
1324 if (!connection
->send
.seen_any_write_yet
)
1326 if (connection
->send
.current_epoch_nr
!= epoch
) {
1327 if (connection
->send
.current_epoch_writes
)
1328 drbd_send_barrier(connection
);
1329 connection
->send
.current_epoch_nr
= epoch
;
1333 int w_send_out_of_sync(struct drbd_work
*w
, int cancel
)
1335 struct drbd_request
*req
= container_of(w
, struct drbd_request
, w
);
1336 struct drbd_device
*device
= req
->device
;
1337 struct drbd_peer_device
*const peer_device
= first_peer_device(device
);
1338 struct drbd_connection
*const connection
= peer_device
->connection
;
1341 if (unlikely(cancel
)) {
1342 req_mod(req
, SEND_CANCELED
);
1345 req
->pre_send_jif
= jiffies
;
1347 /* this time, no connection->send.current_epoch_writes++;
1348 * If it was sent, it was the closing barrier for the last
1349 * replicated epoch, before we went into AHEAD mode.
1350 * No more barriers will be sent, until we leave AHEAD mode again. */
1351 maybe_send_barrier(connection
, req
->epoch
);
1353 err
= drbd_send_out_of_sync(peer_device
, req
);
1354 req_mod(req
, OOS_HANDED_TO_NETWORK
);
1360 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1362 * @cancel: The connection will be closed anyways
1364 int w_send_dblock(struct drbd_work
*w
, int cancel
)
1366 struct drbd_request
*req
= container_of(w
, struct drbd_request
, w
);
1367 struct drbd_device
*device
= req
->device
;
1368 struct drbd_peer_device
*const peer_device
= first_peer_device(device
);
1369 struct drbd_connection
*connection
= peer_device
->connection
;
1372 if (unlikely(cancel
)) {
1373 req_mod(req
, SEND_CANCELED
);
1376 req
->pre_send_jif
= jiffies
;
1378 re_init_if_first_write(connection
, req
->epoch
);
1379 maybe_send_barrier(connection
, req
->epoch
);
1380 connection
->send
.current_epoch_writes
++;
1382 err
= drbd_send_dblock(peer_device
, req
);
1383 req_mod(req
, err
? SEND_FAILED
: HANDED_OVER_TO_NETWORK
);
1389 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1391 * @cancel: The connection will be closed anyways
1393 int w_send_read_req(struct drbd_work
*w
, int cancel
)
1395 struct drbd_request
*req
= container_of(w
, struct drbd_request
, w
);
1396 struct drbd_device
*device
= req
->device
;
1397 struct drbd_peer_device
*const peer_device
= first_peer_device(device
);
1398 struct drbd_connection
*connection
= peer_device
->connection
;
1401 if (unlikely(cancel
)) {
1402 req_mod(req
, SEND_CANCELED
);
1405 req
->pre_send_jif
= jiffies
;
1407 /* Even read requests may close a write epoch,
1408 * if there was any yet. */
1409 maybe_send_barrier(connection
, req
->epoch
);
1411 err
= drbd_send_drequest(peer_device
, P_DATA_REQUEST
, req
->i
.sector
, req
->i
.size
,
1412 (unsigned long)req
);
1414 req_mod(req
, err
? SEND_FAILED
: HANDED_OVER_TO_NETWORK
);
1419 int w_restart_disk_io(struct drbd_work
*w
, int cancel
)
1421 struct drbd_request
*req
= container_of(w
, struct drbd_request
, w
);
1422 struct drbd_device
*device
= req
->device
;
1424 if (bio_data_dir(req
->master_bio
) == WRITE
&& req
->rq_state
& RQ_IN_ACT_LOG
)
1425 drbd_al_begin_io(device
, &req
->i
);
1427 drbd_req_make_private_bio(req
, req
->master_bio
);
1428 req
->private_bio
->bi_bdev
= device
->ldev
->backing_bdev
;
1429 generic_make_request(req
->private_bio
);
1434 static int _drbd_may_sync_now(struct drbd_device
*device
)
1436 struct drbd_device
*odev
= device
;
1440 if (!odev
->ldev
|| odev
->state
.disk
== D_DISKLESS
)
1443 resync_after
= rcu_dereference(odev
->ldev
->disk_conf
)->resync_after
;
1445 if (resync_after
== -1)
1447 odev
= minor_to_device(resync_after
);
1450 if ((odev
->state
.conn
>= C_SYNC_SOURCE
&&
1451 odev
->state
.conn
<= C_PAUSED_SYNC_T
) ||
1452 odev
->state
.aftr_isp
|| odev
->state
.peer_isp
||
1453 odev
->state
.user_isp
)
1459 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1460 * @device: DRBD device.
1462 * Called from process context only (admin command and after_state_ch).
1464 static int _drbd_pause_after(struct drbd_device
*device
)
1466 struct drbd_device
*odev
;
1470 idr_for_each_entry(&drbd_devices
, odev
, i
) {
1471 if (odev
->state
.conn
== C_STANDALONE
&& odev
->state
.disk
== D_DISKLESS
)
1473 if (!_drbd_may_sync_now(odev
))
1474 rv
|= (__drbd_set_state(_NS(odev
, aftr_isp
, 1), CS_HARD
, NULL
)
1475 != SS_NOTHING_TO_DO
);
1483 * _drbd_resume_next() - Resume resync on all devices that may resync now
1484 * @device: DRBD device.
1486 * Called from process context only (admin command and worker).
1488 static int _drbd_resume_next(struct drbd_device
*device
)
1490 struct drbd_device
*odev
;
1494 idr_for_each_entry(&drbd_devices
, odev
, i
) {
1495 if (odev
->state
.conn
== C_STANDALONE
&& odev
->state
.disk
== D_DISKLESS
)
1497 if (odev
->state
.aftr_isp
) {
1498 if (_drbd_may_sync_now(odev
))
1499 rv
|= (__drbd_set_state(_NS(odev
, aftr_isp
, 0),
1501 != SS_NOTHING_TO_DO
) ;
1508 void resume_next_sg(struct drbd_device
*device
)
1510 write_lock_irq(&global_state_lock
);
1511 _drbd_resume_next(device
);
1512 write_unlock_irq(&global_state_lock
);
1515 void suspend_other_sg(struct drbd_device
*device
)
1517 write_lock_irq(&global_state_lock
);
1518 _drbd_pause_after(device
);
1519 write_unlock_irq(&global_state_lock
);
1522 /* caller must hold global_state_lock */
1523 enum drbd_ret_code
drbd_resync_after_valid(struct drbd_device
*device
, int o_minor
)
1525 struct drbd_device
*odev
;
1530 if (o_minor
< -1 || o_minor
> MINORMASK
)
1531 return ERR_RESYNC_AFTER
;
1533 /* check for loops */
1534 odev
= minor_to_device(o_minor
);
1537 return ERR_RESYNC_AFTER_CYCLE
;
1539 /* You are free to depend on diskless, non-existing,
1540 * or not yet/no longer existing minors.
1541 * We only reject dependency loops.
1542 * We cannot follow the dependency chain beyond a detached or
1545 if (!odev
|| !odev
->ldev
|| odev
->state
.disk
== D_DISKLESS
)
1549 resync_after
= rcu_dereference(odev
->ldev
->disk_conf
)->resync_after
;
1551 /* dependency chain ends here, no cycles. */
1552 if (resync_after
== -1)
1555 /* follow the dependency chain */
1556 odev
= minor_to_device(resync_after
);
/* caller must hold global_state_lock */
/* Re-evaluate pause/resume of resyncs after a dependency change, iterating
 * until the pause/resume state of all devices is stable. */
void drbd_resync_after_changed(struct drbd_device *device)
{
	int changes;

	do {
		changes  = _drbd_pause_after(device);
		changes |= _drbd_resume_next(device);
	} while (changes);
}
1571 void drbd_rs_controller_reset(struct drbd_device
*device
)
1573 struct gendisk
*disk
= device
->ldev
->backing_bdev
->bd_contains
->bd_disk
;
1574 struct fifo_buffer
*plan
;
1576 atomic_set(&device
->rs_sect_in
, 0);
1577 atomic_set(&device
->rs_sect_ev
, 0);
1578 device
->rs_in_flight
= 0;
1579 device
->rs_last_events
=
1580 (int)part_stat_read(&disk
->part0
, sectors
[0]) +
1581 (int)part_stat_read(&disk
->part0
, sectors
[1]);
1583 /* Updating the RCU protected object in place is necessary since
1584 this function gets called from atomic context.
1585 It is valid since all other updates also lead to an completely
1588 plan
= rcu_dereference(device
->rs_plan_s
);
1594 void start_resync_timer_fn(unsigned long data
)
1596 struct drbd_device
*device
= (struct drbd_device
*) data
;
1597 drbd_device_post_work(device
, RS_START
);
1600 static void do_start_resync(struct drbd_device
*device
)
1602 if (atomic_read(&device
->unacked_cnt
) || atomic_read(&device
->rs_pending_cnt
)) {
1603 drbd_warn(device
, "postponing start_resync ...\n");
1604 device
->start_resync_timer
.expires
= jiffies
+ HZ
/10;
1605 add_timer(&device
->start_resync_timer
);
1609 drbd_start_resync(device
, C_SYNC_SOURCE
);
1610 clear_bit(AHEAD_TO_SYNC_SOURCE
, &device
->flags
);
1613 static bool use_checksum_based_resync(struct drbd_connection
*connection
, struct drbd_device
*device
)
1615 bool csums_after_crash_only
;
1617 csums_after_crash_only
= rcu_dereference(connection
->net_conf
)->csums_after_crash_only
;
1619 return connection
->agreed_pro_version
>= 89 && /* supported? */
1620 connection
->csums_tfm
&& /* configured? */
1621 (csums_after_crash_only
== 0 /* use for each resync? */
1622 || test_bit(CRASHED_PRIMARY
, &device
->flags
)); /* or only after Primary crash? */
1626 * drbd_start_resync() - Start the resync process
1627 * @device: DRBD device.
1628 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1630 * This function might bring you directly into one of the
1631 * C_PAUSED_SYNC_* states.
1633 void drbd_start_resync(struct drbd_device
*device
, enum drbd_conns side
)
1635 struct drbd_peer_device
*peer_device
= first_peer_device(device
);
1636 struct drbd_connection
*connection
= peer_device
? peer_device
->connection
: NULL
;
1637 union drbd_state ns
;
1640 if (device
->state
.conn
>= C_SYNC_SOURCE
&& device
->state
.conn
< C_AHEAD
) {
1641 drbd_err(device
, "Resync already running!\n");
1645 if (!test_bit(B_RS_H_DONE
, &device
->flags
)) {
1646 if (side
== C_SYNC_TARGET
) {
1647 /* Since application IO was locked out during C_WF_BITMAP_T and
1648 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1649 we check that we might make the data inconsistent. */
1650 r
= drbd_khelper(device
, "before-resync-target");
1651 r
= (r
>> 8) & 0xff;
1653 drbd_info(device
, "before-resync-target handler returned %d, "
1654 "dropping connection.\n", r
);
1655 conn_request_state(connection
, NS(conn
, C_DISCONNECTING
), CS_HARD
);
1658 } else /* C_SYNC_SOURCE */ {
1659 r
= drbd_khelper(device
, "before-resync-source");
1660 r
= (r
>> 8) & 0xff;
1663 drbd_info(device
, "before-resync-source handler returned %d, "
1664 "ignoring. Old userland tools?", r
);
1666 drbd_info(device
, "before-resync-source handler returned %d, "
1667 "dropping connection.\n", r
);
1668 conn_request_state(connection
,
1669 NS(conn
, C_DISCONNECTING
), CS_HARD
);
1676 if (current
== connection
->worker
.task
) {
1677 /* The worker should not sleep waiting for state_mutex,
1678 that can take long */
1679 if (!mutex_trylock(device
->state_mutex
)) {
1680 set_bit(B_RS_H_DONE
, &device
->flags
);
1681 device
->start_resync_timer
.expires
= jiffies
+ HZ
/5;
1682 add_timer(&device
->start_resync_timer
);
1686 mutex_lock(device
->state_mutex
);
1688 clear_bit(B_RS_H_DONE
, &device
->flags
);
1690 /* req_lock: serialize with drbd_send_and_submit() and others
1691 * global_state_lock: for stable sync-after dependencies */
1692 spin_lock_irq(&device
->resource
->req_lock
);
1693 write_lock(&global_state_lock
);
1694 /* Did some connection breakage or IO error race with us? */
1695 if (device
->state
.conn
< C_CONNECTED
1696 || !get_ldev_if_state(device
, D_NEGOTIATING
)) {
1697 write_unlock(&global_state_lock
);
1698 spin_unlock_irq(&device
->resource
->req_lock
);
1699 mutex_unlock(device
->state_mutex
);
1703 ns
= drbd_read_state(device
);
1705 ns
.aftr_isp
= !_drbd_may_sync_now(device
);
1709 if (side
== C_SYNC_TARGET
)
1710 ns
.disk
= D_INCONSISTENT
;
1711 else /* side == C_SYNC_SOURCE */
1712 ns
.pdsk
= D_INCONSISTENT
;
1714 r
= __drbd_set_state(device
, ns
, CS_VERBOSE
, NULL
);
1715 ns
= drbd_read_state(device
);
1717 if (ns
.conn
< C_CONNECTED
)
1718 r
= SS_UNKNOWN_ERROR
;
1720 if (r
== SS_SUCCESS
) {
1721 unsigned long tw
= drbd_bm_total_weight(device
);
1722 unsigned long now
= jiffies
;
1725 device
->rs_failed
= 0;
1726 device
->rs_paused
= 0;
1727 device
->rs_same_csum
= 0;
1728 device
->rs_last_sect_ev
= 0;
1729 device
->rs_total
= tw
;
1730 device
->rs_start
= now
;
1731 for (i
= 0; i
< DRBD_SYNC_MARKS
; i
++) {
1732 device
->rs_mark_left
[i
] = tw
;
1733 device
->rs_mark_time
[i
] = now
;
1735 _drbd_pause_after(device
);
1736 /* Forget potentially stale cached per resync extent bit-counts.
1737 * Open coded drbd_rs_cancel_all(device), we already have IRQs
1738 * disabled, and know the disk state is ok. */
1739 spin_lock(&device
->al_lock
);
1740 lc_reset(device
->resync
);
1741 device
->resync_locked
= 0;
1742 device
->resync_wenr
= LC_FREE
;
1743 spin_unlock(&device
->al_lock
);
1745 write_unlock(&global_state_lock
);
1746 spin_unlock_irq(&device
->resource
->req_lock
);
1748 if (r
== SS_SUCCESS
) {
1749 wake_up(&device
->al_wait
); /* for lc_reset() above */
1750 /* reset rs_last_bcast when a resync or verify is started,
1751 * to deal with potential jiffies wrap. */
1752 device
->rs_last_bcast
= jiffies
- HZ
;
1754 drbd_info(device
, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1755 drbd_conn_str(ns
.conn
),
1756 (unsigned long) device
->rs_total
<< (BM_BLOCK_SHIFT
-10),
1757 (unsigned long) device
->rs_total
);
1758 if (side
== C_SYNC_TARGET
) {
1759 device
->bm_resync_fo
= 0;
1760 device
->use_csums
= use_checksum_based_resync(connection
, device
);
1762 device
->use_csums
= 0;
1765 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1766 * with w_send_oos, or the sync target will get confused as to
1767 * how much bits to resync. We cannot do that always, because for an
1768 * empty resync and protocol < 95, we need to do it here, as we call
1769 * drbd_resync_finished from here in that case.
1770 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1771 * and from after_state_ch otherwise. */
1772 if (side
== C_SYNC_SOURCE
&& connection
->agreed_pro_version
< 96)
1773 drbd_gen_and_send_sync_uuid(peer_device
);
1775 if (connection
->agreed_pro_version
< 95 && device
->rs_total
== 0) {
1776 /* This still has a race (about when exactly the peers
1777 * detect connection loss) that can lead to a full sync
1778 * on next handshake. In 8.3.9 we fixed this with explicit
1779 * resync-finished notifications, but the fix
1780 * introduces a protocol change. Sleeping for some
1781 * time longer than the ping interval + timeout on the
1782 * SyncSource, to give the SyncTarget the chance to
1783 * detect connection loss, then waiting for a ping
1784 * response (implicit in drbd_resync_finished) reduces
1785 * the race considerably, but does not solve it. */
1786 if (side
== C_SYNC_SOURCE
) {
1787 struct net_conf
*nc
;
1791 nc
= rcu_dereference(connection
->net_conf
);
1792 timeo
= nc
->ping_int
* HZ
+ nc
->ping_timeo
* HZ
/ 9;
1794 schedule_timeout_interruptible(timeo
);
1796 drbd_resync_finished(device
);
1799 drbd_rs_controller_reset(device
);
1800 /* ns.conn may already be != device->state.conn,
1801 * we may have been paused in between, or become paused until
1802 * the timer triggers.
1803 * No matter, that is handled in resync_timer_fn() */
1804 if (ns
.conn
== C_SYNC_TARGET
)
1805 mod_timer(&device
->resync_timer
, jiffies
);
1807 drbd_md_sync(device
);
1810 mutex_unlock(device
->state_mutex
);
1813 static void update_on_disk_bitmap(struct drbd_device
*device
, bool resync_done
)
1815 struct sib_info sib
= { .sib_reason
= SIB_SYNC_PROGRESS
, };
1816 device
->rs_last_bcast
= jiffies
;
1818 if (!get_ldev(device
))
1821 drbd_bm_write_lazy(device
, 0);
1822 if (resync_done
&& is_sync_state(device
->state
.conn
))
1823 drbd_resync_finished(device
);
1825 drbd_bcast_event(device
, &sib
);
1826 /* update timestamp, in case it took a while to write out stuff */
1827 device
->rs_last_bcast
= jiffies
;
1831 static void drbd_ldev_destroy(struct drbd_device
*device
)
1833 lc_destroy(device
->resync
);
1834 device
->resync
= NULL
;
1835 lc_destroy(device
->act_log
);
1836 device
->act_log
= NULL
;
1839 drbd_free_ldev(device
->ldev
);
1840 device
->ldev
= NULL
;
1843 clear_bit(GOING_DISKLESS
, &device
->flags
);
1844 wake_up(&device
->misc_wait
);
1847 static void go_diskless(struct drbd_device
*device
)
1849 D_ASSERT(device
, device
->state
.disk
== D_FAILED
);
1850 /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1851 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1852 * the protected members anymore, though, so once put_ldev reaches zero
1853 * again, it will be safe to free them. */
1855 /* Try to write changed bitmap pages, read errors may have just
1856 * set some bits outside the area covered by the activity log.
1858 * If we have an IO error during the bitmap writeout,
1859 * we will want a full sync next time, just in case.
1860 * (Do we want a specific meta data flag for this?)
1862 * If that does not make it to stable storage either,
1863 * we cannot do anything about that anymore.
1865 * We still need to check if both bitmap and ldev are present, we may
1866 * end up here after a failed attach, before ldev was even assigned.
1868 if (device
->bitmap
&& device
->ldev
) {
1869 /* An interrupted resync or similar is allowed to recounts bits
1871 * Any modifications would not be expected anymore, though.
1873 if (drbd_bitmap_io_from_worker(device
, drbd_bm_write
,
1874 "detach", BM_LOCKED_TEST_ALLOWED
)) {
1875 if (test_bit(WAS_READ_ERROR
, &device
->flags
)) {
1876 drbd_md_set_flag(device
, MDF_FULL_SYNC
);
1877 drbd_md_sync(device
);
1882 drbd_force_state(device
, NS(disk
, D_DISKLESS
));
/* Worker-side handler for an expired md_sync_timer: write out meta data. */
static int do_md_sync(struct drbd_device *device)
{
	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
	drbd_md_sync(device);
	return 0;
}
1892 /* only called from drbd_worker thread, no locking */
1893 void __update_timing_details(
1894 struct drbd_thread_timing_details
*tdp
,
1895 unsigned int *cb_nr
,
1897 const char *fn
, const unsigned int line
)
1899 unsigned int i
= *cb_nr
% DRBD_THREAD_DETAILS_HIST
;
1900 struct drbd_thread_timing_details
*td
= tdp
+ i
;
1902 td
->start_jif
= jiffies
;
1908 i
= (i
+1) % DRBD_THREAD_DETAILS_HIST
;
1910 memset(td
, 0, sizeof(*td
));
1915 static void do_device_work(struct drbd_device
*device
, const unsigned long todo
)
1917 if (test_bit(MD_SYNC
, &todo
))
1919 if (test_bit(RS_DONE
, &todo
) ||
1920 test_bit(RS_PROGRESS
, &todo
))
1921 update_on_disk_bitmap(device
, test_bit(RS_DONE
, &todo
));
1922 if (test_bit(GO_DISKLESS
, &todo
))
1923 go_diskless(device
);
1924 if (test_bit(DESTROY_DISK
, &todo
))
1925 drbd_ldev_destroy(device
);
1926 if (test_bit(RS_START
, &todo
))
1927 do_start_resync(device
);
1930 #define DRBD_DEVICE_WORK_MASK \
1931 ((1UL << GO_DISKLESS) \
1932 |(1UL << DESTROY_DISK) \
1934 |(1UL << RS_START) \
1935 |(1UL << RS_PROGRESS) \
1939 static unsigned long get_work_bits(unsigned long *flags
)
1941 unsigned long old
, new;
1944 new = old
& ~DRBD_DEVICE_WORK_MASK
;
1945 } while (cmpxchg(flags
, old
, new) != old
);
1946 return old
& DRBD_DEVICE_WORK_MASK
;
1949 static void do_unqueued_work(struct drbd_connection
*connection
)
1951 struct drbd_peer_device
*peer_device
;
1955 idr_for_each_entry(&connection
->peer_devices
, peer_device
, vnr
) {
1956 struct drbd_device
*device
= peer_device
->device
;
1957 unsigned long todo
= get_work_bits(&device
->flags
);
1961 kref_get(&device
->kref
);
1963 do_device_work(device
, todo
);
1964 kref_put(&device
->kref
, drbd_destroy_device
);
1970 static bool dequeue_work_batch(struct drbd_work_queue
*queue
, struct list_head
*work_list
)
1972 spin_lock_irq(&queue
->q_lock
);
1973 list_splice_tail_init(&queue
->q
, work_list
);
1974 spin_unlock_irq(&queue
->q_lock
);
1975 return !list_empty(work_list
);
1978 static void wait_for_work(struct drbd_connection
*connection
, struct list_head
*work_list
)
1981 struct net_conf
*nc
;
1984 dequeue_work_batch(&connection
->sender_work
, work_list
);
1985 if (!list_empty(work_list
))
1988 /* Still nothing to do?
1989 * Maybe we still need to close the current epoch,
1990 * even if no new requests are queued yet.
1992 * Also, poke TCP, just in case.
1993 * Then wait for new work (or signal). */
1995 nc
= rcu_dereference(connection
->net_conf
);
1996 uncork
= nc
? nc
->tcp_cork
: 0;
1999 mutex_lock(&connection
->data
.mutex
);
2000 if (connection
->data
.socket
)
2001 drbd_tcp_uncork(connection
->data
.socket
);
2002 mutex_unlock(&connection
->data
.mutex
);
2007 prepare_to_wait(&connection
->sender_work
.q_wait
, &wait
, TASK_INTERRUPTIBLE
);
2008 spin_lock_irq(&connection
->resource
->req_lock
);
2009 spin_lock(&connection
->sender_work
.q_lock
); /* FIXME get rid of this one? */
2010 if (!list_empty(&connection
->sender_work
.q
))
2011 list_splice_tail_init(&connection
->sender_work
.q
, work_list
);
2012 spin_unlock(&connection
->sender_work
.q_lock
); /* FIXME get rid of this one? */
2013 if (!list_empty(work_list
) || signal_pending(current
)) {
2014 spin_unlock_irq(&connection
->resource
->req_lock
);
2018 /* We found nothing new to do, no to-be-communicated request,
2019 * no other work item. We may still need to close the last
2020 * epoch. Next incoming request epoch will be connection ->
2021 * current transfer log epoch number. If that is different
2022 * from the epoch of the last request we communicated, it is
2023 * safe to send the epoch separating barrier now.
2026 atomic_read(&connection
->current_tle_nr
) !=
2027 connection
->send
.current_epoch_nr
;
2028 spin_unlock_irq(&connection
->resource
->req_lock
);
2031 maybe_send_barrier(connection
,
2032 connection
->send
.current_epoch_nr
+ 1);
2034 if (test_bit(DEVICE_WORK_PENDING
, &connection
->flags
))
2037 /* drbd_send() may have called flush_signals() */
2038 if (get_t_state(&connection
->worker
) != RUNNING
)
2042 /* may be woken up for other things but new work, too,
2043 * e.g. if the current epoch got closed.
2044 * In which case we send the barrier above. */
2046 finish_wait(&connection
->sender_work
.q_wait
, &wait
);
2048 /* someone may have changed the config while we have been waiting above. */
2050 nc
= rcu_dereference(connection
->net_conf
);
2051 cork
= nc
? nc
->tcp_cork
: 0;
2053 mutex_lock(&connection
->data
.mutex
);
2054 if (connection
->data
.socket
) {
2056 drbd_tcp_cork(connection
->data
.socket
);
2058 drbd_tcp_uncork(connection
->data
.socket
);
2060 mutex_unlock(&connection
->data
.mutex
);
2063 int drbd_worker(struct drbd_thread
*thi
)
2065 struct drbd_connection
*connection
= thi
->connection
;
2066 struct drbd_work
*w
= NULL
;
2067 struct drbd_peer_device
*peer_device
;
2068 LIST_HEAD(work_list
);
2071 while (get_t_state(thi
) == RUNNING
) {
2072 drbd_thread_current_set_cpu(thi
);
2074 if (list_empty(&work_list
)) {
2075 update_worker_timing_details(connection
, wait_for_work
);
2076 wait_for_work(connection
, &work_list
);
2079 if (test_and_clear_bit(DEVICE_WORK_PENDING
, &connection
->flags
)) {
2080 update_worker_timing_details(connection
, do_unqueued_work
);
2081 do_unqueued_work(connection
);
2084 if (signal_pending(current
)) {
2085 flush_signals(current
);
2086 if (get_t_state(thi
) == RUNNING
) {
2087 drbd_warn(connection
, "Worker got an unexpected signal\n");
2093 if (get_t_state(thi
) != RUNNING
)
2096 if (!list_empty(&work_list
)) {
2097 w
= list_first_entry(&work_list
, struct drbd_work
, list
);
2098 list_del_init(&w
->list
);
2099 update_worker_timing_details(connection
, w
->cb
);
2100 if (w
->cb(w
, connection
->cstate
< C_WF_REPORT_PARAMS
) == 0)
2102 if (connection
->cstate
>= C_WF_REPORT_PARAMS
)
2103 conn_request_state(connection
, NS(conn
, C_NETWORK_FAILURE
), CS_HARD
);
2108 if (test_and_clear_bit(DEVICE_WORK_PENDING
, &connection
->flags
)) {
2109 update_worker_timing_details(connection
, do_unqueued_work
);
2110 do_unqueued_work(connection
);
2112 if (!list_empty(&work_list
)) {
2113 w
= list_first_entry(&work_list
, struct drbd_work
, list
);
2114 list_del_init(&w
->list
);
2115 update_worker_timing_details(connection
, w
->cb
);
2118 dequeue_work_batch(&connection
->sender_work
, &work_list
);
2119 } while (!list_empty(&work_list
) || test_bit(DEVICE_WORK_PENDING
, &connection
->flags
));
2122 idr_for_each_entry(&connection
->peer_devices
, peer_device
, vnr
) {
2123 struct drbd_device
*device
= peer_device
->device
;
2124 D_ASSERT(device
, device
->state
.disk
== D_DISKLESS
&& device
->state
.conn
== C_STANDALONE
);
2125 kref_get(&device
->kref
);
2127 drbd_device_cleanup(device
);
2128 kref_put(&device
->kref
, drbd_destroy_device
);