/*
   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
*/
#include <linux/module.h>
#include <linux/drbd.h>
#include <linux/sched/signal.h>
#include <linux/wait.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>

#include "drbd_int.h"
#include "drbd_protocol.h"
#include "drbd_req.h"
static int make_ov_request(struct drbd_device *, int);
static int make_resync_request(struct drbd_device *, int);

/*
 *   drbd_md_endio (defined here)
 *   drbd_request_endio (defined here)
 *   drbd_peer_request_endio (defined here)
 *   drbd_bm_endio (defined in drbd_bitmap.c)
 *
 * For all these callbacks, note the following:
 * The callbacks will be called in irq context by the IDE drivers,
 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
 * Try to get the locking right :)
 */
/* used for synchronous meta data and bitmap IO
 * submitted by drbd_md_sync_page_io()
 */
void drbd_md_endio(struct bio *bio)
{
	struct drbd_device *device;

	device = bio->bi_private;
	device->md_io.error = blk_status_to_errno(bio->bi_status);

	/* special case: drbd_md_read() during drbd_adm_attach() */
	if (device->ldev)
		put_ldev(device);
	bio_put(bio);

	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
	 * to timeout on the lower level device, and eventually detach from it.
	 * If this io completion runs after that timeout expired, this
	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
	 * During normal operation, this only puts that extra reference
	 * down to 1 again.
	 * Make sure we first drop the reference, and only then signal
	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
	 * next drbd_md_sync_page_io(), that we trigger the
	 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
	 */
	drbd_md_put_buffer(device);
	device->md_io.done = 1;
	wake_up(&device->misc_wait);
}
/* reads on behalf of the partner,
 * "submitted" by the receiver
 */
static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;

	spin_lock_irqsave(&device->resource->req_lock, flags);
	device->read_cnt += peer_req->i.size >> 9;
	list_del(&peer_req->w.list);
	if (list_empty(&device->read_ee))
		wake_up(&device->ee_wait);
	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
		__drbd_chk_io_error(device, DRBD_READ_ERROR);
	spin_unlock_irqrestore(&device->resource->req_lock, flags);

	drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
}
/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver, final stage. */
void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	struct drbd_connection *connection = peer_device->connection;
	struct drbd_interval i;
	int do_wake;
	u64 block_id;
	int do_al_complete_io;

	/* after we moved peer_req to done_ee,
	 * we may no longer access it,
	 * it may be freed/reused already!
	 * (as soon as we release the req_lock) */
	i = peer_req->i;
	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
	block_id = peer_req->block_id;
	peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;

	if (peer_req->flags & EE_WAS_ERROR) {
		/* In protocol != C, we usually do not send write acks.
		 * In case of a write error, send the neg ack anyways. */
		if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
			drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
	}

	spin_lock_irqsave(&device->resource->req_lock, flags);
	device->writ_cnt += peer_req->i.size >> 9;
	list_move_tail(&peer_req->w.list, &device->done_ee);

	/*
	 * Do not remove from the write_requests tree here: we did not send the
	 * Ack yet and did not wake possibly waiting conflicting requests.
	 * Removed from the tree from "drbd_process_done_ee" within the
	 * appropriate dw.cb (e_end_block/e_end_resync_block) or from
	 * _drbd_clear_done_ee.
	 */

	do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);

	/* FIXME do we want to detach for failed REQ_OP_DISCARD?
	 * ((peer_req->flags & (EE_WAS_ERROR|EE_TRIM)) == EE_WAS_ERROR) */
	if (peer_req->flags & EE_WAS_ERROR)
		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);

	if (connection->cstate >= C_WF_REPORT_PARAMS) {
		kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
		if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
			kref_put(&device->kref, drbd_destroy_device);
	}
	spin_unlock_irqrestore(&device->resource->req_lock, flags);

	if (block_id == ID_SYNCER)
		drbd_rs_complete_io(device, i.sector);

	if (do_wake)
		wake_up(&device->ee_wait);

	if (do_al_complete_io)
		drbd_al_complete_io(device, &i);
}
/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver.
 */
void drbd_peer_request_endio(struct bio *bio)
{
	struct drbd_peer_request *peer_req = bio->bi_private;
	struct drbd_device *device = peer_req->peer_device->device;
	bool is_write = bio_data_dir(bio) == WRITE;
	bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
			  bio_op(bio) == REQ_OP_DISCARD;

	if (bio->bi_status && __ratelimit(&drbd_ratelimit_state))
		drbd_warn(device, "%s: error=%d s=%llus\n",
				is_write ? (is_discard ? "discard" : "write")
					: "read", bio->bi_status,
				(unsigned long long)peer_req->i.sector);

	if (bio->bi_status)
		set_bit(__EE_WAS_ERROR, &peer_req->flags);

	bio_put(bio); /* no need for the bio anymore */
	if (atomic_dec_and_test(&peer_req->pending_bios)) {
		if (is_write)
			drbd_endio_write_sec_final(peer_req);
		else
			drbd_endio_read_sec_final(peer_req);
	}
}
static void
drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
{
	panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
		device->minor, device->resource->name, device->vnr);
}
/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 */
void drbd_request_endio(struct bio *bio)
{
	unsigned long flags;
	struct drbd_request *req = bio->bi_private;
	struct drbd_device *device = req->device;
	struct bio_and_error m;
	enum drbd_req_event what;

	/* If this request was aborted locally before,
	 * but now was completed "successfully",
	 * chances are that this caused arbitrary data corruption.
	 *
	 * "aborting" requests, or force-detaching the disk, is intended for
	 * completely blocked/hung local backing devices which do no longer
	 * complete requests at all, not even do error completions.  In this
	 * situation, usually a hard-reset and failover is the only way out.
	 *
	 * By "aborting", basically faking a local error-completion,
	 * we allow for a more graceful switchover by cleanly migrating services.
	 * Still the affected node has to be rebooted "soon".
	 *
	 * By completing these requests, we allow the upper layers to re-use
	 * the associated data pages.
	 *
	 * If later the local backing device "recovers", and now DMAs some data
	 * from disk into the original request pages, in the best case it will
	 * just put random data into unused pages; but typically it will corrupt
	 * meanwhile completely unrelated data, causing all sorts of damage.
	 *
	 * Which means delayed successful completion,
	 * especially for READ requests,
	 * is a reason to panic().
	 *
	 * We assume that a delayed *error* completion is OK,
	 * though we still will complain noisily about it.
	 */
	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");

		if (!bio->bi_status)
			drbd_panic_after_delayed_completion_of_aborted_request(device);
	}

	/* to avoid recursion in __req_mod */
	if (unlikely(bio->bi_status)) {
		switch (bio_op(bio)) {
		case REQ_OP_WRITE_ZEROES:
		case REQ_OP_DISCARD:
			if (bio->bi_status == BLK_STS_NOTSUPP)
				what = DISCARD_COMPLETED_NOTSUPP;
			else
				what = DISCARD_COMPLETED_WITH_ERROR;
			break;
		case REQ_OP_READ:
			if (bio->bi_opf & REQ_RAHEAD)
				what = READ_AHEAD_COMPLETED_WITH_ERROR;
			else
				what = READ_COMPLETED_WITH_ERROR;
			break;
		default:
			what = WRITE_COMPLETED_WITH_ERROR;
			break;
		}
	} else {
		what = COMPLETED_OK;
	}

	req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
	bio_put(bio);

	/* not req_mod(), we need irqsave here! */
	spin_lock_irqsave(&device->resource->req_lock, flags);
	__req_mod(req, what, &m);
	spin_unlock_irqrestore(&device->resource->req_lock, flags);

	if (m.bio)
		complete_master_bio(device, &m);
}
void drbd_csum_ee(struct crypto_shash *tfm, struct drbd_peer_request *peer_req, void *digest)
{
	SHASH_DESC_ON_STACK(desc, tfm);
	struct page *page = peer_req->pages;

	crypto_shash_init(desc);

	src = kmap_atomic(page);
	while ((tmp = page_chain_next(page))) {
		/* all but the last page will be fully used */
		crypto_shash_update(desc, src, PAGE_SIZE);
		src = kmap_atomic(page);
	}
	/* and now the last, possibly only partially used page */
	len = peer_req->i.size & (PAGE_SIZE - 1);
	crypto_shash_update(desc, src, len ?: PAGE_SIZE);

	crypto_shash_final(desc, digest);
	shash_desc_zero(desc);
}
void drbd_csum_bio(struct crypto_shash *tfm, struct bio *bio, void *digest)
{
	SHASH_DESC_ON_STACK(desc, tfm);
	struct bvec_iter iter;

	crypto_shash_init(desc);

	bio_for_each_segment(bvec, bio, iter) {
		src = kmap_atomic(bvec.bv_page);
		crypto_shash_update(desc, src + bvec.bv_offset, bvec.bv_len);

		/* REQ_OP_WRITE_SAME has only one segment,
		 * checksum the payload only once. */
		if (bio_op(bio) == REQ_OP_WRITE_SAME)
			break;
	}
	crypto_shash_final(desc, digest);
	shash_desc_zero(desc);
}
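
/*
 * Note: both drbd_csum_ee() and drbd_csum_bio() feed the request payload
 * through the caller-supplied crypto_shash transform and write the result
 * into "digest"; the caller picks the transform (csums_tfm for checksum
 * based resync, verify_tfm for online verify below) and must size the
 * digest buffer via crypto_shash_digestsize().
 */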
/* MAYBE merge common code with w_e_end_ov_req */
static int w_e_send_csum(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;

	if (unlikely(cancel))
		goto out;

	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
		goto out;

	digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
	digest = kmalloc(digest_size, GFP_NOIO);
	if (digest) {
		sector_t sector = peer_req->i.sector;
		unsigned int size = peer_req->i.size;
		drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
		/* Free peer_req and pages before send.
		 * In case we block on congestion, we could otherwise run into
		 * some distributed deadlock, if the other side blocks on
		 * congestion as well, because our receiver blocks in
		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
		drbd_free_peer_req(device, peer_req);
		peer_req = NULL;
		inc_rs_pending(device);
		err = drbd_send_drequest_csum(peer_device, sector, size,
					      digest, digest_size, P_CSUM_RS_REQUEST);
		kfree(digest);
	} else {
		drbd_err(device, "kmalloc() of digest failed.\n");
	}

out:
	if (peer_req)
		drbd_free_peer_req(device, peer_req);

	if (err)
		drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
	return err;
}
#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
{
	struct drbd_device *device = peer_device->device;
	struct drbd_peer_request *peer_req;

	if (!get_ldev(device))
		return -EIO;

	/* GFP_TRY, because if there is no memory available right now, this may
	 * be rescheduled for later. It is "only" background resync, after all. */
	peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
				       size, size, GFP_TRY);
	if (!peer_req)
		goto defer;

	peer_req->w.cb = w_e_send_csum;
	spin_lock_irq(&device->resource->req_lock);
	list_add_tail(&peer_req->w.list, &device->read_ee);
	spin_unlock_irq(&device->resource->req_lock);

	atomic_add(size >> 9, &device->rs_sect_ev);
	if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
				     DRBD_FAULT_RS_RD) == 0)
		return 0;

	/* If it failed because of ENOMEM, retry should help.  If it failed
	 * because bio_add_page failed (probably broken lower level driver),
	 * retry may or may not help.
	 * If it does not, you may need to force disconnect. */
	spin_lock_irq(&device->resource->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&device->resource->req_lock);

	drbd_free_peer_req(device, peer_req);
defer:
	put_ldev(device);
	return -EAGAIN;
}
int w_resync_timer(struct drbd_work *w, int cancel)
{
	struct drbd_device *device =
		container_of(w, struct drbd_device, resync_work);

	switch (device->state.conn) {
	case C_VERIFY_S:
		make_ov_request(device, cancel);
		break;
	case C_SYNC_TARGET:
		make_resync_request(device, cancel);
		break;
	}

	return 0;
}
void resync_timer_fn(struct timer_list *t)
{
	struct drbd_device *device = from_timer(device, t, resync_timer);

	drbd_queue_work_if_unqueued(
		&first_peer_device(device)->connection->sender_work,
		&device->resync_work);
}
static void fifo_set(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] = value;
}
static int fifo_push(struct fifo_buffer *fb, int value)
{
	int ov;

	ov = fb->values[fb->head_index];
	fb->values[fb->head_index++] = value;

	if (fb->head_index >= fb->size)
		fb->head_index = 0;

	return ov;
}
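
/*
 * The fifo_buffer is a small ring of planned correction values:
 * fifo_push() pops the value scheduled for the current step and queues
 * "value" plan->size steps into the future, while fifo_add_val() spreads a
 * correction evenly over all pending steps.  drbd_rs_controller() below
 * uses it as its c-plan-ahead planning window.
 */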
static void fifo_add_val(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] += value;
}
struct fifo_buffer *fifo_alloc(int fifo_size)
{
	struct fifo_buffer *fb;

	fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
	if (!fb)
		return NULL;

	fb->size = fifo_size;

	return fb;
}
static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
{
	struct disk_conf *dc;
	unsigned int want;	/* The number of sectors we want in-flight */
	int req_sect;		/* Number of sectors to request in this turn */
	int correction;		/* Number of sectors more we need in-flight */
	int cps;		/* correction per invocation of drbd_rs_controller() */
	int steps;		/* Number of time steps to plan ahead */
	int curr_corr;
	int max_sect;
	struct fifo_buffer *plan;

	dc = rcu_dereference(device->ldev->disk_conf);
	plan = rcu_dereference(device->rs_plan_s);

	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */

	if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
	} else { /* normal path */
		want = dc->c_fill_target ? dc->c_fill_target :
			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
	}

	correction = want - device->rs_in_flight - plan->total;

	/* Plan ahead */
	cps = correction / steps;
	fifo_add_val(plan, cps);
	plan->total += cps * steps;

	/* What we do in this step */
	curr_corr = fifo_push(plan, 0);
	plan->total -= curr_corr;

	req_sect = sect_in + curr_corr;
	if (req_sect < 0)
		req_sect = 0;

	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
	if (req_sect > max_sect)
		req_sect = max_sect;

	/*
	drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
		 sect_in, device->rs_in_flight, want, correction,
		 steps, cps, device->rs_planed, curr_corr, req_sect);
	*/

	return req_sect;
}
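
/*
 * Rough example of one controller step (numbers for illustration only):
 * with c-fill-target = 1000 sectors, 600 sectors currently in flight and
 * plan->total = 100 already scheduled, correction = 1000 - 600 - 100 = 300.
 * With steps = 10 that adds cps = 30 to every planned slot; the slot popped
 * by fifo_push() then tops up whatever sect_in reports as completed since
 * the last turn, capped at the c-max-rate equivalent in sectors per
 * SLEEP_TIME interval.
 */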
static int drbd_rs_number_requests(struct drbd_device *device)
{
	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
	int number, mxb;

	sect_in = atomic_xchg(&device->rs_sect_in, 0);
	device->rs_in_flight -= sect_in;

	rcu_read_lock();
	mxb = drbd_get_max_buffers(device) / 2;
	if (rcu_dereference(device->rs_plan_s)->size) {
		number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
		device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
	} else {
		device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
		number = SLEEP_TIME * device->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
	}
	rcu_read_unlock();

	/* Don't have more than "max-buffers"/2 in-flight.
	 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
	 * potentially causing a distributed deadlock on congestion during
	 * online-verify or (checksum-based) resync, if max-buffers,
	 * socket buffer sizes and resync rate settings are mis-configured. */

	/* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
	 * mxb (as used here, and in drbd_alloc_pages on the peer) is
	 * "number of pages" (typically also 4k),
	 * but "rs_in_flight" is in "sectors" (512 Byte). */
	if (mxb - device->rs_in_flight/8 < number)
		number = mxb - device->rs_in_flight/8;

	return number;
}
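
/*
 * Unit reminder for the pacing math above: "number" counts BM_BLOCK_SIZE
 * (4 KiB) resync requests, i.e. 8 sectors each, which is why rs_in_flight
 * (in 512-byte sectors) is divided by 8 before being compared against mxb
 * (a page/buffer count).
 */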
static int make_resync_request(struct drbd_device *const device, int cancel)
{
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
	const sector_t capacity = drbd_get_capacity(device->this_bdev);
	int number, rollback_i, size;
	int align, requeue = 0;
	int discard_granularity = 0;

	if (unlikely(cancel))
		return 0;

	if (device->rs_total == 0) {
		/* empty resync? */
		drbd_resync_finished(device);
		return 0;
	}

	if (!get_ldev(device)) {
		/* Since we only need to access device->rsync a
		   get_ldev_if_state(device,D_FAILED) would be sufficient, but
		   to continue resync with a broken disk makes no sense at
		   all */
		drbd_err(device, "Disk broke down during resync!\n");
		return 0;
	}

	if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
		rcu_read_lock();
		discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
		rcu_read_unlock();
	}

	max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
	number = drbd_rs_number_requests(device);
	if (number <= 0)
		goto requeue;

	for (i = 0; i < number; i++) {
		/* Stop generating RS requests when half of the send buffer is filled,
		 * but notify TCP that we'd like to have more space. */
		mutex_lock(&connection->data.mutex);
		if (connection->data.socket) {
			struct sock *sk = connection->data.socket->sk;
			int queued = sk->sk_wmem_queued;
			int sndbuf = sk->sk_sndbuf;
			if (queued > sndbuf / 2) {
				requeue = 1;
				if (sk->sk_socket)
					set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
			}
		} else
			requeue = 1;
		mutex_unlock(&connection->data.mutex);
		if (requeue)
			goto requeue;

next_sector:
		size = BM_BLOCK_SIZE;
		bit = drbd_bm_find_next(device, device->bm_resync_fo);

		if (bit == DRBD_END_OF_BITMAP) {
			device->bm_resync_fo = drbd_bm_bits(device);
			put_ldev(device);
			return 0;
		}

		sector = BM_BIT_TO_SECT(bit);

		if (drbd_try_rs_begin_io(device, sector)) {
			device->bm_resync_fo = bit;
			goto requeue;
		}
		device->bm_resync_fo = bit + 1;

		if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
			drbd_rs_complete_io(device, sector);
			goto next_sector;
		}

#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
		/* try to find some adjacent bits.
		 * we stop if we have already the maximum req size.
		 *
		 * Additionally always align bigger requests, in order to
		 * be prepared for all stripe sizes of software RAIDs.
		 */
		align = 1;
		rollback_i = i;
		while (i < number) {
			if (size + BM_BLOCK_SIZE > max_bio_size)
				break;

			/* Be always aligned */
			if (sector & ((1<<(align+3))-1))
				break;

			if (discard_granularity && size == discard_granularity)
				break;

			/* do not cross extent boundaries */
			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
				break;
			/* now, is it actually dirty, after all?
			 * caution, drbd_bm_test_bit is tri-state for some
			 * obscure reason; ( b == 0 ) would get the out-of-band
			 * only accidentally right because of the "oddly sized"
			 * adjustment below */
			if (drbd_bm_test_bit(device, bit+1) != 1)
				break;
			bit++;
			size += BM_BLOCK_SIZE;
			if ((BM_BLOCK_SIZE << align) <= size)
				align++;
			i++;
		}
		/* if we merged some,
		 * reset the offset to start the next drbd_bm_find_next from */
		if (size > BM_BLOCK_SIZE)
			device->bm_resync_fo = bit + 1;
#endif

		/* adjust very last sectors, in case we are oddly sized */
		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;

		if (device->use_csums) {
			switch (read_for_csum(peer_device, sector, size)) {
			case -EIO: /* Disk failure */
				put_ldev(device);
				return -EIO;
			case -EAGAIN: /* allocation failed, or ldev busy */
				drbd_rs_complete_io(device, sector);
				device->bm_resync_fo = BM_SECT_TO_BIT(sector);
				i = rollback_i;
				goto requeue;
			case 0:
				/* everything ok */
				break;
			default:
				BUG();
			}
		} else {
			int err;

			inc_rs_pending(device);
			err = drbd_send_drequest(peer_device,
						 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
						 sector, size, ID_SYNCER);
			if (err) {
				drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
				dec_rs_pending(device);
				put_ldev(device);
				return err;
			}
		}
	}

	if (device->bm_resync_fo >= drbd_bm_bits(device)) {
		/* last syncer _request_ was sent,
		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
		 * next sync group will resume), as soon as we receive the last
		 * resync data block, and the last bit is cleared.
		 * until then resync "work" is "inactive" ...
		 */
		put_ldev(device);
		return 0;
	}

 requeue:
	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
	mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
	put_ldev(device);
	return 0;
}
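
/*
 * In short: each invocation of make_resync_request() asks the controller for
 * a budget, walks the out-of-sync bitmap from bm_resync_fo, merges adjacent
 * dirty bits into larger aligned requests up to max_bio_size (or exactly the
 * discard granularity for thin resync), and either reads the local block for
 * a checksum request (use_csums) or sends a plain P_RS_DATA_REQUEST, before
 * re-arming the resync timer.
 */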
static int make_ov_request(struct drbd_device *device, int cancel)
{
	int number, i, size;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(device->this_bdev);
	bool stop_sector_reached = false;

	if (unlikely(cancel))
		return 1;

	number = drbd_rs_number_requests(device);

	sector = device->ov_position;
	for (i = 0; i < number; i++) {
		if (sector >= capacity)
			return 1;

		/* We check for "finished" only in the reply path:
		 * w_e_end_ov_reply().
		 * We need to send at least one request out. */
		stop_sector_reached = i > 0
			&& verify_can_do_stop_sector(device)
			&& sector >= device->ov_stop_sector;
		if (stop_sector_reached)
			break;

		size = BM_BLOCK_SIZE;

		if (drbd_try_rs_begin_io(device, sector)) {
			device->ov_position = sector;
			goto requeue;
		}

		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;

		inc_rs_pending(device);
		if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
			dec_rs_pending(device);
			return 0;
		}
		sector += BM_SECT_PER_BIT;
	}
	device->ov_position = sector;

 requeue:
	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
	if (i == 0 || !stop_sector_reached)
		mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
	return 1;
}
int w_ov_finished(struct drbd_work *w, int cancel)
{
	struct drbd_device_work *dw =
		container_of(w, struct drbd_device_work, w);
	struct drbd_device *device = dw->device;
	kfree(dw);
	ov_out_of_sync_print(device);
	drbd_resync_finished(device);

	return 0;
}
static int w_resync_finished(struct drbd_work *w, int cancel)
{
	struct drbd_device_work *dw =
		container_of(w, struct drbd_device_work, w);
	struct drbd_device *device = dw->device;
	kfree(dw);

	drbd_resync_finished(device);

	return 0;
}
static void ping_peer(struct drbd_device *device)
{
	struct drbd_connection *connection = first_peer_device(device)->connection;

	clear_bit(GOT_PING_ACK, &connection->flags);
	request_ping(connection);
	wait_event(connection->ping_wait,
		   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
}
int drbd_resync_finished(struct drbd_device *device)
{
	struct drbd_connection *connection = first_peer_device(device)->connection;
	unsigned long db, dt, dbdt;
	unsigned long n_oos;
	union drbd_state os, ns;
	struct drbd_device_work *dw;
	char *khelper_cmd = NULL;
	int verify_done = 0;

	/* Remove all elements from the resync LRU. Since future actions
	 * might set bits in the (main) bitmap, then the entries in the
	 * resync LRU would be wrong. */
	if (drbd_rs_del_all(device)) {
		/* In case this is not possible now, most probably because
		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
		 * queue (or even the read operations for those packets
		 * is not finished by now). Retry in 100ms. */

		schedule_timeout_interruptible(HZ / 10);
		dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
		if (dw) {
			dw->w.cb = w_resync_finished;
			dw->device = device;
			drbd_queue_work(&connection->sender_work, &dw->w);
			return 1;
		}
		drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
	}

	dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
	if (dt <= 0)
		dt = 1;

	db = device->rs_total;
	/* adjust for verify start and stop sectors, respective reached position */
	if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
		db -= device->ov_left;

	dbdt = Bit2KB(db/dt);
	device->rs_paused /= HZ;

	if (!get_ldev(device))
		goto out;

	ping_peer(device);

	spin_lock_irq(&device->resource->req_lock);
	os = drbd_read_state(device);

	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);

	/* This protects us against multiple calls (that can happen in the presence
	   of application IO), and against connectivity loss just before we arrive here. */
	if (os.conn <= C_CONNECTED)
		goto out_unlock;

	ns = os;
	ns.conn = C_CONNECTED;

	drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
	     verify_done ? "Online verify" : "Resync",
	     dt + device->rs_paused, device->rs_paused, dbdt);

	n_oos = drbd_bm_total_weight(device);

	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
		if (n_oos) {
			drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
			      n_oos, Bit2KB(1));
			khelper_cmd = "out-of-sync";
		}
	} else {
		D_ASSERT(device, (n_oos - device->rs_failed) == 0);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
			khelper_cmd = "after-resync-target";

		if (device->use_csums && device->rs_total) {
			const unsigned long s = device->rs_same_csum;
			const unsigned long t = device->rs_total;
			const int ratio =
				(t == 0)     ? 0 :
				(t < 100000) ? ((s*100)/t) : (s/(t/100));
			drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
			     "transferred %luK total %luK\n",
			     ratio,
			     Bit2KB(device->rs_same_csum),
			     Bit2KB(device->rs_total - device->rs_same_csum),
			     Bit2KB(device->rs_total));
		}
	}

	if (device->rs_failed) {
		drbd_info(device, "            %lu failed blocks\n", device->rs_failed);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			ns.disk = D_INCONSISTENT;
			ns.pdsk = D_UP_TO_DATE;
		} else {
			ns.disk = D_UP_TO_DATE;
			ns.pdsk = D_INCONSISTENT;
		}
	} else {
		ns.disk = D_UP_TO_DATE;
		ns.pdsk = D_UP_TO_DATE;

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			if (device->p_uuid) {
				int i;
				for (i = UI_BITMAP; i <= UI_HISTORY_END; i++)
					_drbd_uuid_set(device, i, device->p_uuid[i]);
				drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
				_drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
			} else {
				drbd_err(device, "device->p_uuid is NULL! BUG\n");
			}
		}

		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
			/* for verify runs, we don't update uuids here,
			 * so there would be nothing to report. */
			drbd_uuid_set_bm(device, 0UL);
			drbd_print_uuids(device, "updated UUIDs");
			if (device->p_uuid) {
				/* Now the two UUID sets are equal, update what we
				 * know of the peer. */
				int i;
				for (i = UI_CURRENT; i <= UI_HISTORY_END; i++)
					device->p_uuid[i] = device->ldev->md.uuid[i];
			}
		}
	}

	_drbd_set_state(device, ns, CS_VERBOSE, NULL);
out_unlock:
	spin_unlock_irq(&device->resource->req_lock);

	/* If we have been sync source, and have an effective fencing-policy,
	 * once *all* volumes are back in sync, call "unfence". */
	if (os.conn == C_SYNC_SOURCE) {
		enum drbd_disk_state disk_state = D_MASK;
		enum drbd_disk_state pdsk_state = D_MASK;
		enum drbd_fencing_p fp = FP_DONT_CARE;

		rcu_read_lock();
		fp = rcu_dereference(device->ldev->disk_conf)->fencing;
		if (fp != FP_DONT_CARE) {
			struct drbd_peer_device *peer_device;
			int vnr;
			idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
				struct drbd_device *device = peer_device->device;
				disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
				pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
			}
		}
		rcu_read_unlock();
		if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
			conn_khelper(connection, "unfence-peer");
	}

	put_ldev(device);
out:
	device->rs_total  = 0;
	device->rs_failed = 0;
	device->rs_paused = 0;

	/* reset start sector, if we reached end of device */
	if (verify_done && device->ov_left == 0)
		device->ov_start_sector = 0;

	drbd_md_sync(device);

	if (khelper_cmd)
		drbd_khelper(device, khelper_cmd);

	return 1;
}
static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
{
	if (drbd_peer_req_has_active_page(peer_req)) {
		/* This might happen if sendpage() has not finished */
		int i = (peer_req->i.size + PAGE_SIZE - 1) >> PAGE_SHIFT;
		atomic_add(i, &device->pp_in_use_by_net);
		atomic_sub(i, &device->pp_in_use);
		spin_lock_irq(&device->resource->req_lock);
		list_add_tail(&peer_req->w.list, &device->net_ee);
		spin_unlock_irq(&device->resource->req_lock);
		wake_up(&drbd_pp_wait);
	} else
		drbd_free_peer_req(device, peer_req);
}
/**
 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_data_req(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	int err;

	if (unlikely(cancel)) {
		drbd_free_peer_req(device, peer_req);
		dec_unacked(device);
		return 0;
	}

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_err(device, "Sending NegDReply. sector=%llus.\n",
			    (unsigned long long)peer_req->i.sector);

		err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
	}

	dec_unacked(device);

	move_to_net_ee_or_free(device, peer_req);

	if (err)
		drbd_err(device, "drbd_send_block() failed\n");
	return err;
}
*peer_req
)
1101 struct page
*page
= peer_req
->pages
;
1102 unsigned int len
= peer_req
->i
.size
;
1104 page_chain_for_each(page
) {
1105 unsigned int l
= min_t(unsigned int, len
, PAGE_SIZE
);
1106 unsigned int i
, words
= l
/ sizeof(long);
1109 d
= kmap_atomic(page
);
1110 for (i
= 0; i
< words
; i
++) {
/**
 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	int err;

	if (unlikely(cancel)) {
		drbd_free_peer_req(device, peer_req);
		dec_unacked(device);
		return 0;
	}

	if (get_ldev_if_state(device, D_FAILED)) {
		drbd_rs_complete_io(device, peer_req->i.sector);
		put_ldev(device);
	}

	if (device->state.conn == C_AHEAD) {
		err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		if (likely(device->state.pdsk >= D_INCONSISTENT)) {
			inc_rs_pending(device);
			if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
				err = drbd_send_rs_deallocated(peer_device, peer_req);
			else
				err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
		} else {
			if (__ratelimit(&drbd_ratelimit_state))
				drbd_err(device, "Not sending RSDataReply, "
				    "partner DISKLESS!\n");
			err = 0;
		}
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
			    (unsigned long long)peer_req->i.sector);

		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);

		/* update resync data with failure */
		drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
	}

	dec_unacked(device);

	move_to_net_ee_or_free(device, peer_req);

	if (err)
		drbd_err(device, "drbd_send_block() failed\n");
	return err;
}
int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	struct digest_info *di;
	int digest_size;
	void *digest = NULL;
	int err, eq = 0;

	if (unlikely(cancel)) {
		drbd_free_peer_req(device, peer_req);
		dec_unacked(device);
		return 0;
	}

	if (get_ldev(device)) {
		drbd_rs_complete_io(device, peer_req->i.sector);
		put_ldev(device);
	}

	di = peer_req->digest;

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		/* quick hack to try to avoid a race against reconfiguration.
		 * a real fix would be much more involved,
		 * introducing more locking mechanisms */
		if (peer_device->connection->csums_tfm) {
			digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
			D_ASSERT(device, digest_size == di->digest_size);
			digest = kmalloc(digest_size, GFP_NOIO);
		}
		if (digest) {
			drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}

		if (eq) {
			drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
			/* rs_same_csums unit is BM_BLOCK_SIZE */
			device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
			err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
		} else {
			inc_rs_pending(device);
			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
		}
	} else {
		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
	}

	dec_unacked(device);
	move_to_net_ee_or_free(device, peer_req);

	if (err)
		drbd_err(device, "drbd_send_block/ack() failed\n");
	return err;
}
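
/*
 * This is where checksum-based resync saves bandwidth: when the peer's
 * digest matches the local data (eq), only a small P_RS_IS_IN_SYNC ack goes
 * back and rs_same_csum is bumped for the "equal checksums" statistics
 * printed in drbd_resync_finished(); only on a mismatch is the full block
 * shipped as P_RS_DATA_REPLY.
 */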
int w_e_end_ov_req(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	sector_t sector = peer_req->i.sector;
	unsigned int size = peer_req->i.size;
	int digest_size;
	void *digest;
	int err = 0;

	if (unlikely(cancel))
		goto out;

	digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
	digest = kmalloc(digest_size, GFP_NOIO);
	if (!digest) {
		err = 1;	/* terminate the connection in case the allocation failed */
		goto out;
	}

	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
		drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
	else
		memset(digest, 0, digest_size);

	/* Free e and pages before send.
	 * In case we block on congestion, we could otherwise run into
	 * some distributed deadlock, if the other side blocks on
	 * congestion as well, because our receiver blocks in
	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
	drbd_free_peer_req(device, peer_req);
	peer_req = NULL;
	inc_rs_pending(device);
	err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
	if (err)
		dec_rs_pending(device);
	kfree(digest);

out:
	if (peer_req)
		drbd_free_peer_req(device, peer_req);
	dec_unacked(device);
	return err;
}
void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
{
	if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
		device->ov_last_oos_size += size>>9;
	} else {
		device->ov_last_oos_start = sector;
		device->ov_last_oos_size = size>>9;
	}
	drbd_set_out_of_sync(device, sector, size);
}
int w_e_end_ov_reply(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	struct digest_info *di;
	void *digest;
	sector_t sector = peer_req->i.sector;
	unsigned int size = peer_req->i.size;
	int digest_size;
	int err, eq = 0;
	bool stop_sector_reached = false;

	if (unlikely(cancel)) {
		drbd_free_peer_req(device, peer_req);
		dec_unacked(device);
		return 0;
	}

	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
	 * the resync lru has been cleaned up already */
	if (get_ldev(device)) {
		drbd_rs_complete_io(device, peer_req->i.sector);
		put_ldev(device);
	}

	di = peer_req->digest;

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
		if (digest) {
			drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);

			D_ASSERT(device, digest_size == di->digest_size);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}
	}

	/* Free peer_req and pages before send.
	 * In case we block on congestion, we could otherwise run into
	 * some distributed deadlock, if the other side blocks on
	 * congestion as well, because our receiver blocks in
	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
	drbd_free_peer_req(device, peer_req);
	if (!eq)
		drbd_ov_out_of_sync_found(device, sector, size);
	else
		ov_out_of_sync_print(device);

	err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);

	dec_unacked(device);

	--device->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((device->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(device, device->ov_left);

	stop_sector_reached = verify_can_do_stop_sector(device) &&
		(sector + (size>>9)) >= device->ov_stop_sector;

	if (device->ov_left == 0 || stop_sector_reached) {
		ov_out_of_sync_print(device);
		drbd_resync_finished(device);
	}

	return err;
}
/*
 * We need to track the number of pending barrier acks,
 * and to be able to wait for them.
 * See also comment in drbd_adm_attach before drbd_suspend_io.
 */
static int drbd_send_barrier(struct drbd_connection *connection)
{
	struct p_barrier *p;
	struct drbd_socket *sock;

	sock = &connection->data;
	p = conn_prepare_command(connection, sock);
	if (!p)
		return -EIO;
	p->barrier = connection->send.current_epoch_nr;
	connection->send.current_epoch_writes = 0;
	connection->send.last_sent_barrier_jif = jiffies;

	return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
}
static int pd_send_unplug_remote(struct drbd_peer_device *pd)
{
	struct drbd_socket *sock = &pd->connection->data;
	if (!drbd_prepare_command(pd, sock))
		return -EIO;
	return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
}
int w_send_write_hint(struct drbd_work *w, int cancel)
{
	struct drbd_device *device =
		container_of(w, struct drbd_device, unplug_work);

	if (cancel)
		return 0;
	return pd_send_unplug_remote(first_peer_device(device));
}
static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
{
	if (!connection->send.seen_any_write_yet) {
		connection->send.seen_any_write_yet = true;
		connection->send.current_epoch_nr = epoch;
		connection->send.current_epoch_writes = 0;
		connection->send.last_sent_barrier_jif = jiffies;
	}
}
static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
{
	/* re-init if first write on this connection */
	if (!connection->send.seen_any_write_yet)
		return;
	if (connection->send.current_epoch_nr != epoch) {
		if (connection->send.current_epoch_writes)
			drbd_send_barrier(connection);
		connection->send.current_epoch_nr = epoch;
	}
}
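
/*
 * Epoch/barrier bookkeeping: a P_BARRIER is only sent when we are about to
 * transmit something for a *newer* epoch than the one we last wrote into,
 * and only if that previous epoch actually carried writes.  The w_send_*
 * callbacks below therefore call maybe_send_barrier() before every data or
 * request packet they send.
 */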
int w_send_out_of_sync(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_device *device = req->device;
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *const connection = peer_device->connection;
	int err;

	if (unlikely(cancel)) {
		req_mod(req, SEND_CANCELED);
		return 0;
	}
	req->pre_send_jif = jiffies;

	/* this time, no connection->send.current_epoch_writes++;
	 * If it was sent, it was the closing barrier for the last
	 * replicated epoch, before we went into AHEAD mode.
	 * No more barriers will be sent, until we leave AHEAD mode again. */
	maybe_send_barrier(connection, req->epoch);

	err = drbd_send_out_of_sync(peer_device, req);
	req_mod(req, OOS_HANDED_TO_NETWORK);

	return err;
}
/**
 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
 * @cancel:	The connection will be closed anyways
 */
int w_send_dblock(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_device *device = req->device;
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *connection = peer_device->connection;
	bool do_send_unplug = req->rq_state & RQ_UNPLUG;
	int err;

	if (unlikely(cancel)) {
		req_mod(req, SEND_CANCELED);
		return 0;
	}
	req->pre_send_jif = jiffies;

	re_init_if_first_write(connection, req->epoch);
	maybe_send_barrier(connection, req->epoch);
	connection->send.current_epoch_writes++;

	err = drbd_send_dblock(peer_device, req);
	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);

	if (do_send_unplug && !err)
		pd_send_unplug_remote(peer_device);

	return err;
}
/**
 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
 * @cancel:	The connection will be closed anyways
 */
int w_send_read_req(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_device *device = req->device;
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *connection = peer_device->connection;
	bool do_send_unplug = req->rq_state & RQ_UNPLUG;
	int err;

	if (unlikely(cancel)) {
		req_mod(req, SEND_CANCELED);
		return 0;
	}
	req->pre_send_jif = jiffies;

	/* Even read requests may close a write epoch,
	 * if there was any yet. */
	maybe_send_barrier(connection, req->epoch);

	err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
				 (unsigned long)req);

	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);

	if (do_send_unplug && !err)
		pd_send_unplug_remote(peer_device);

	return err;
}
int w_restart_disk_io(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_device *device = req->device;

	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
		drbd_al_begin_io(device, &req->i);

	drbd_req_make_private_bio(req, req->master_bio);
	bio_set_dev(req->private_bio, device->ldev->backing_bdev);
	generic_make_request(req->private_bio);

	return 0;
}
static int _drbd_may_sync_now(struct drbd_device *device)
{
	struct drbd_device *odev = device;
	int resync_after;

	while (1) {
		if (!odev->ldev || odev->state.disk == D_DISKLESS)
			return 1;
		rcu_read_lock();
		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
		rcu_read_unlock();
		if (resync_after == -1)
			return 1;
		odev = minor_to_device(resync_after);
		if (!odev)
			return 1;
		if ((odev->state.conn >= C_SYNC_SOURCE &&
		     odev->state.conn <= C_PAUSED_SYNC_T) ||
		    odev->state.aftr_isp || odev->state.peer_isp ||
		    odev->state.user_isp)
			return 0;
	}
}
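
/*
 * _drbd_may_sync_now() walks the resync-after dependency chain: this device
 * may resync only if the device named by its resync-after option (and that
 * device's dependency, and so on) is neither resyncing nor paused.  The
 * chain ends at a diskless/missing minor or at resync-after == -1.
 */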
/**
 * drbd_pause_after() - Pause resync on all devices that may not resync now
 * @device:	DRBD device.
 *
 * Called from process context only (admin command and after_state_ch).
 */
static bool drbd_pause_after(struct drbd_device *device)
{
	bool changed = false;
	struct drbd_device *odev;
	int i;

	rcu_read_lock();
	idr_for_each_entry(&drbd_devices, odev, i) {
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (!_drbd_may_sync_now(odev) &&
		    _drbd_set_state(_NS(odev, aftr_isp, 1),
				    CS_HARD, NULL) != SS_NOTHING_TO_DO)
			changed = true;
	}
	rcu_read_unlock();

	return changed;
}
/**
 * drbd_resume_next() - Resume resync on all devices that may resync now
 * @device:	DRBD device.
 *
 * Called from process context only (admin command and worker).
 */
static bool drbd_resume_next(struct drbd_device *device)
{
	bool changed = false;
	struct drbd_device *odev;
	int i;

	rcu_read_lock();
	idr_for_each_entry(&drbd_devices, odev, i) {
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (odev->state.aftr_isp) {
			if (_drbd_may_sync_now(odev) &&
			    _drbd_set_state(_NS(odev, aftr_isp, 0),
					    CS_HARD, NULL) != SS_NOTHING_TO_DO)
				changed = true;
		}
	}
	rcu_read_unlock();
	return changed;
}
void resume_next_sg(struct drbd_device *device)
{
	lock_all_resources();
	drbd_resume_next(device);
	unlock_all_resources();
}
void suspend_other_sg(struct drbd_device *device)
{
	lock_all_resources();
	drbd_pause_after(device);
	unlock_all_resources();
}
/* caller must lock_all_resources() */
enum drbd_ret_code
drbd_resync_after_valid(struct drbd_device *device, int o_minor)
{
	struct drbd_device *odev;
	int resync_after;

	if (o_minor == -1)
		return NO_ERROR;
	if (o_minor < -1 || o_minor > MINORMASK)
		return ERR_RESYNC_AFTER;

	/* check for loops */
	odev = minor_to_device(o_minor);
	while (1) {
		if (odev == device)
			return ERR_RESYNC_AFTER_CYCLE;

		/* You are free to depend on diskless, non-existing,
		 * or not yet/no longer existing minors.
		 * We only reject dependency loops.
		 * We cannot follow the dependency chain beyond a detached or
		 * missing minor.
		 */
		if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
			return NO_ERROR;

		rcu_read_lock();
		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
		rcu_read_unlock();
		/* dependency chain ends here, no cycles. */
		if (resync_after == -1)
			return NO_ERROR;

		/* follow the dependency chain */
		odev = minor_to_device(resync_after);
	}
}
/* caller must lock_all_resources() */
void drbd_resync_after_changed(struct drbd_device *device)
{
	int changed;

	do {
		changed  = drbd_pause_after(device);
		changed |= drbd_resume_next(device);
	} while (changed);
}
void drbd_rs_controller_reset(struct drbd_device *device)
{
	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
	struct fifo_buffer *plan;

	atomic_set(&device->rs_sect_in, 0);
	atomic_set(&device->rs_sect_ev, 0);
	device->rs_in_flight = 0;
	device->rs_last_events = (int)part_stat_read_accum(&disk->part0, sectors);

	/* Updating the RCU protected object in place is necessary since
	   this function gets called from atomic context.
	   It is valid since all other updates also lead to a completely
	   empty fifo */
	rcu_read_lock();
	plan = rcu_dereference(device->rs_plan_s);
	plan->total = 0;
	fifo_set(plan, 0);
	rcu_read_unlock();
}
void start_resync_timer_fn(struct timer_list *t)
{
	struct drbd_device *device = from_timer(device, t, start_resync_timer);

	drbd_device_post_work(device, RS_START);
}
static void do_start_resync(struct drbd_device *device)
{
	if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
		drbd_warn(device, "postponing start_resync ...\n");
		device->start_resync_timer.expires = jiffies + HZ/10;
		add_timer(&device->start_resync_timer);
		return;
	}

	drbd_start_resync(device, C_SYNC_SOURCE);
	clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
}
static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
{
	bool csums_after_crash_only;

	rcu_read_lock();
	csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
	rcu_read_unlock();
	return connection->agreed_pro_version >= 89 &&		/* supported? */
		connection->csums_tfm &&			/* configured? */
		(csums_after_crash_only == false		/* use for each resync? */
		 || test_bit(CRASHED_PRIMARY, &device->flags));	/* or only after Primary crash? */
}
/**
 * drbd_start_resync() - Start the resync process
 * @device:	DRBD device.
 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
 *
 * This function might bring you directly into one of the
 * C_PAUSED_SYNC_* states.
 */
void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
{
	struct drbd_peer_device *peer_device = first_peer_device(device);
	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
	union drbd_state ns;
	int r;

	if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
		drbd_err(device, "Resync already running!\n");
		return;
	}

	if (!connection) {
		drbd_err(device, "No connection to peer, aborting!\n");
		return;
	}

	if (!test_bit(B_RS_H_DONE, &device->flags)) {
		if (side == C_SYNC_TARGET) {
			/* Since application IO was locked out during C_WF_BITMAP_T and
			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
			   we check that we might make the data inconsistent. */
			r = drbd_khelper(device, "before-resync-target");
			r = (r >> 8) & 0xff;
			if (r > 0) {
				drbd_info(device, "before-resync-target handler returned %d, "
					 "dropping connection.\n", r);
				conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
				return;
			}
		} else /* C_SYNC_SOURCE */ {
			r = drbd_khelper(device, "before-resync-source");
			r = (r >> 8) & 0xff;
			if (r > 0) {
				if (r == 3) {
					drbd_info(device, "before-resync-source handler returned %d, "
						 "ignoring. Old userland tools?", r);
				} else {
					drbd_info(device, "before-resync-source handler returned %d, "
						 "dropping connection.\n", r);
					conn_request_state(connection,
							   NS(conn, C_DISCONNECTING), CS_HARD);
					return;
				}
			}
		}
	}

	if (current == connection->worker.task) {
		/* The worker should not sleep waiting for state_mutex,
		   that can take long */
		if (!mutex_trylock(device->state_mutex)) {
			set_bit(B_RS_H_DONE, &device->flags);
			device->start_resync_timer.expires = jiffies + HZ/5;
			add_timer(&device->start_resync_timer);
			return;
		}
	} else {
		mutex_lock(device->state_mutex);
	}

	lock_all_resources();
	clear_bit(B_RS_H_DONE, &device->flags);
	/* Did some connection breakage or IO error race with us? */
	if (device->state.conn < C_CONNECTED
	    || !get_ldev_if_state(device, D_NEGOTIATING)) {
		unlock_all_resources();
		goto out;
	}

	ns = drbd_read_state(device);

	ns.aftr_isp = !_drbd_may_sync_now(device);

	ns.conn = side;

	if (side == C_SYNC_TARGET)
		ns.disk = D_INCONSISTENT;
	else /* side == C_SYNC_SOURCE */
		ns.pdsk = D_INCONSISTENT;

	r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
	ns = drbd_read_state(device);

	if (ns.conn < C_CONNECTED)
		r = SS_UNKNOWN_ERROR;

	if (r == SS_SUCCESS) {
		unsigned long tw = drbd_bm_total_weight(device);
		unsigned long now = jiffies;
		int i;

		device->rs_failed    = 0;
		device->rs_paused    = 0;
		device->rs_same_csum = 0;
		device->rs_last_sect_ev = 0;
		device->rs_total     = tw;
		device->rs_start     = now;
		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
			device->rs_mark_left[i] = tw;
			device->rs_mark_time[i] = now;
		}
		drbd_pause_after(device);
		/* Forget potentially stale cached per resync extent bit-counts.
		 * Open coded drbd_rs_cancel_all(device), we already have IRQs
		 * disabled, and know the disk state is ok. */
		spin_lock(&device->al_lock);
		lc_reset(device->resync);
		device->resync_locked = 0;
		device->resync_wenr = LC_FREE;
		spin_unlock(&device->al_lock);
	}
	unlock_all_resources();

	if (r == SS_SUCCESS) {
		wake_up(&device->al_wait); /* for lc_reset() above */
		/* reset rs_last_bcast when a resync or verify is started,
		 * to deal with potential jiffies wrap. */
		device->rs_last_bcast = jiffies - HZ;

		drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
		     drbd_conn_str(ns.conn),
		     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
		     (unsigned long) device->rs_total);
		if (side == C_SYNC_TARGET) {
			device->bm_resync_fo = 0;
			device->use_csums = use_checksum_based_resync(connection, device);
		} else {
			device->use_csums = false;
		}

		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
		 * with w_send_oos, or the sync target will get confused as to
		 * how many bits to resync.  We cannot do that always, because for an
		 * empty resync and protocol < 95, we need to do it here, as we call
		 * drbd_resync_finished from here in that case.
		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
		 * and from after_state_ch otherwise. */
		if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
			drbd_gen_and_send_sync_uuid(peer_device);

		if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
			/* This still has a race (about when exactly the peers
			 * detect connection loss) that can lead to a full sync
			 * on next handshake. In 8.3.9 we fixed this with explicit
			 * resync-finished notifications, but the fix
			 * introduces a protocol change.  Sleeping for some
			 * time longer than the ping interval + timeout on the
			 * SyncSource, to give the SyncTarget the chance to
			 * detect connection loss, then waiting for a ping
			 * response (implicit in drbd_resync_finished) reduces
			 * the race considerably, but does not solve it. */
			if (side == C_SYNC_SOURCE) {
				struct net_conf *nc;
				int timeo;

				rcu_read_lock();
				nc = rcu_dereference(connection->net_conf);
				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
				rcu_read_unlock();
				schedule_timeout_interruptible(timeo);
			}
			drbd_resync_finished(device);
		}

		drbd_rs_controller_reset(device);
		/* ns.conn may already be != device->state.conn,
		 * we may have been paused in between, or become paused until
		 * the timer triggers.
		 * No matter, that is handled in resync_timer_fn() */
		if (ns.conn == C_SYNC_TARGET)
			mod_timer(&device->resync_timer, jiffies);

		drbd_md_sync(device);
	}
	put_ldev(device);
out:
	mutex_unlock(device->state_mutex);
}
static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
{
	struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
	device->rs_last_bcast = jiffies;

	if (!get_ldev(device))
		return;

	drbd_bm_write_lazy(device, 0);
	if (resync_done && is_sync_state(device->state.conn))
		drbd_resync_finished(device);

	drbd_bcast_event(device, &sib);
	/* update timestamp, in case it took a while to write out stuff */
	device->rs_last_bcast = jiffies;
	put_ldev(device);
}
static void drbd_ldev_destroy(struct drbd_device *device)
{
	lc_destroy(device->resync);
	device->resync = NULL;
	lc_destroy(device->act_log);
	device->act_log = NULL;

	drbd_backing_dev_free(device, device->ldev);
	device->ldev = NULL;

	clear_bit(GOING_DISKLESS, &device->flags);
	wake_up(&device->misc_wait);
}
static void go_diskless(struct drbd_device *device)
{
	D_ASSERT(device, device->state.disk == D_FAILED);
	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
	 * the protected members anymore, though, so once put_ldev reaches zero
	 * again, it will be safe to free them. */

	/* Try to write changed bitmap pages, read errors may have just
	 * set some bits outside the area covered by the activity log.
	 *
	 * If we have an IO error during the bitmap writeout,
	 * we will want a full sync next time, just in case.
	 * (Do we want a specific meta data flag for this?)
	 *
	 * If that does not make it to stable storage either,
	 * we cannot do anything about that anymore.
	 *
	 * We still need to check if both bitmap and ldev are present, we may
	 * end up here after a failed attach, before ldev was even assigned.
	 */
	if (device->bitmap && device->ldev) {
		/* An interrupted resync or similar is allowed to recount bits
		 * while we detach.
		 * Any modifications would not be expected anymore, though.
		 */
		if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
					"detach", BM_LOCKED_TEST_ALLOWED)) {
			if (test_bit(WAS_READ_ERROR, &device->flags)) {
				drbd_md_set_flag(device, MDF_FULL_SYNC);
				drbd_md_sync(device);
			}
		}
	}

	drbd_force_state(device, NS(disk, D_DISKLESS));
}
static int do_md_sync(struct drbd_device *device)
{
	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
	drbd_md_sync(device);
	return 0;
}
/* only called from drbd_worker thread, no locking */
void __update_timing_details(
		struct drbd_thread_timing_details *tdp,
		unsigned int *cb_nr,
		void *cb,
		const char *fn, const unsigned int line)
{
	unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
	struct drbd_thread_timing_details *td = tdp + i;

	td->start_jif = jiffies;

	i = (i+1) % DRBD_THREAD_DETAILS_HIST;
	td = tdp + i;
	memset(td, 0, sizeof(*td));

	++(*cb_nr);
}
static void do_device_work(struct drbd_device *device, const unsigned long todo)
{
	if (test_bit(MD_SYNC, &todo))
		do_md_sync(device);
	if (test_bit(RS_DONE, &todo) ||
	    test_bit(RS_PROGRESS, &todo))
		update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
	if (test_bit(GO_DISKLESS, &todo))
		go_diskless(device);
	if (test_bit(DESTROY_DISK, &todo))
		drbd_ldev_destroy(device);
	if (test_bit(RS_START, &todo))
		do_start_resync(device);
}
#define DRBD_DEVICE_WORK_MASK	\
	((1UL << GO_DISKLESS)	\
	|(1UL << DESTROY_DISK)	\
	|(1UL << MD_SYNC)	\
	|(1UL << RS_START)	\
	|(1UL << RS_PROGRESS)	\
	|(1UL << RS_DONE)	\
	)

static unsigned long get_work_bits(unsigned long *flags)
{
	unsigned long old, new;
	do {
		old = *flags;
		new = old & ~DRBD_DEVICE_WORK_MASK;
	} while (cmpxchg(flags, old, new) != old);
	return old & DRBD_DEVICE_WORK_MASK;
}
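
/*
 * get_work_bits() atomically claims and clears all device-work bits in one
 * cmpxchg loop, so a bit set concurrently by another context is either
 * picked up by this invocation or left in place for the next one, never
 * lost.
 */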
static void do_unqueued_work(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	int vnr;

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		unsigned long todo = get_work_bits(&device->flags);
		if (!todo)
			continue;

		kref_get(&device->kref);
		rcu_read_unlock();
		do_device_work(device, todo);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();
}
*queue
, struct list_head
*work_list
)
2085 spin_lock_irq(&queue
->q_lock
);
2086 list_splice_tail_init(&queue
->q
, work_list
);
2087 spin_unlock_irq(&queue
->q_lock
);
2088 return !list_empty(work_list
);
static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
{
	DEFINE_WAIT(wait);
	struct net_conf *nc;
	int uncork, cork;

	dequeue_work_batch(&connection->sender_work, work_list);
	if (!list_empty(work_list))
		return;

	/* Still nothing to do?
	 * Maybe we still need to close the current epoch,
	 * even if no new requests are queued yet.
	 *
	 * Also, poke TCP, just in case.
	 * Then wait for new work (or signal). */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	uncork = nc ? nc->tcp_cork : 0;
	rcu_read_unlock();
	if (uncork) {
		mutex_lock(&connection->data.mutex);
		if (connection->data.socket)
			drbd_tcp_uncork(connection->data.socket);
		mutex_unlock(&connection->data.mutex);
	}

	for (;;) {
		int send_barrier;
		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
		spin_lock_irq(&connection->resource->req_lock);
		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
		if (!list_empty(&connection->sender_work.q))
			list_splice_tail_init(&connection->sender_work.q, work_list);
		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
		if (!list_empty(work_list) || signal_pending(current)) {
			spin_unlock_irq(&connection->resource->req_lock);
			break;
		}

		/* We found nothing new to do, no to-be-communicated request,
		 * no other work item.  We may still need to close the last
		 * epoch.  Next incoming request epoch will be connection ->
		 * current transfer log epoch number.  If that is different
		 * from the epoch of the last request we communicated, it is
		 * safe to send the epoch separating barrier now.
		 */
		send_barrier =
			atomic_read(&connection->current_tle_nr) !=
			connection->send.current_epoch_nr;
		spin_unlock_irq(&connection->resource->req_lock);

		if (send_barrier)
			maybe_send_barrier(connection,
					connection->send.current_epoch_nr + 1);

		if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
			break;

		/* drbd_send() may have called flush_signals() */
		if (get_t_state(&connection->worker) != RUNNING)
			break;

		schedule();
		/* may be woken up for other things but new work, too,
		 * e.g. if the current epoch got closed.
		 * In which case we send the barrier above. */
	}
	finish_wait(&connection->sender_work.q_wait, &wait);

	/* someone may have changed the config while we have been waiting above. */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	cork = nc ? nc->tcp_cork : 0;
	rcu_read_unlock();
	mutex_lock(&connection->data.mutex);
	if (connection->data.socket) {
		if (cork)
			drbd_tcp_cork(connection->data.socket);
		else if (!uncork)
			drbd_tcp_uncork(connection->data.socket);
	}
	mutex_unlock(&connection->data.mutex);
}
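
/*
 * Note the idle-path barrier above: even with no queued work, the sender may
 * still owe the peer a P_BARRIER for the last replicated epoch; comparing
 * current_tle_nr against send.current_epoch_nr under req_lock decides
 * whether that closing barrier can be sent now.
 */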
int drbd_worker(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	struct drbd_work *w = NULL;
	struct drbd_peer_device *peer_device;
	LIST_HEAD(work_list);
	int vnr;

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);

		if (list_empty(&work_list)) {
			update_worker_timing_details(connection, wait_for_work);
			wait_for_work(connection, &work_list);
		}

		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
			update_worker_timing_details(connection, do_unqueued_work);
			do_unqueued_work(connection);
		}

		if (signal_pending(current)) {
			flush_signals(current);
			if (get_t_state(thi) == RUNNING) {
				drbd_warn(connection, "Worker got an unexpected signal\n");
				continue;
			}
			break;
		}

		if (get_t_state(thi) != RUNNING)
			break;

		if (!list_empty(&work_list)) {
			w = list_first_entry(&work_list, struct drbd_work, list);
			list_del_init(&w->list);
			update_worker_timing_details(connection, w->cb);
			if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
				continue;
			if (connection->cstate >= C_WF_REPORT_PARAMS)
				conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		}
	}

	do {
		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
			update_worker_timing_details(connection, do_unqueued_work);
			do_unqueued_work(connection);
		}
		if (!list_empty(&work_list)) {
			w = list_first_entry(&work_list, struct drbd_work, list);
			list_del_init(&w->list);
			update_worker_timing_details(connection, w->cb);
			w->cb(w, 1);
		} else
			dequeue_work_batch(&connection->sender_work, &work_list);
	} while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_device_cleanup(device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();

	return 0;
}