perf tools: Don't clone maps from parent when synthesizing forks
[linux/fpc-iii.git] / drivers / block / drbd / drbd_worker.c
blob99255d0c9e2ffab9bde0500b14ed446c5bce281d
1 /*
2 drbd_worker.c
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched/signal.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
38 #include "drbd_int.h"
39 #include "drbd_protocol.h"
40 #include "drbd_req.h"
42 static int make_ov_request(struct drbd_device *, int);
43 static int make_resync_request(struct drbd_device *, int);
45 /* endio handlers:
46 * drbd_md_endio (defined here)
47 * drbd_request_endio (defined here)
48 * drbd_peer_request_endio (defined here)
49 * drbd_bm_endio (defined in drbd_bitmap.c)
51 * For all these callbacks, note the following:
52 * The callbacks will be called in irq context by the IDE drivers,
53 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54 * Try to get the locking right :)
58 /* used for synchronous meta data and bitmap IO
59 * submitted by drbd_md_sync_page_io()
61 void drbd_md_endio(struct bio *bio)
63 struct drbd_device *device;
65 device = bio->bi_private;
66 device->md_io.error = blk_status_to_errno(bio->bi_status);
68 /* special case: drbd_md_read() during drbd_adm_attach() */
69 if (device->ldev)
70 put_ldev(device);
71 bio_put(bio);
73 /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
74 * to timeout on the lower level device, and eventually detach from it.
75 * If this io completion runs after that timeout expired, this
76 * drbd_md_put_buffer() may allow us to finally try and re-attach.
77 * During normal operation, this only puts that extra reference
78 * down to 1 again.
79 * Make sure we first drop the reference, and only then signal
80 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
81 * next drbd_md_sync_page_io(), that we trigger the
82 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
84 drbd_md_put_buffer(device);
85 device->md_io.done = 1;
86 wake_up(&device->misc_wait);
89 /* reads on behalf of the partner,
90 * "submitted" by the receiver
92 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
94 unsigned long flags = 0;
95 struct drbd_peer_device *peer_device = peer_req->peer_device;
96 struct drbd_device *device = peer_device->device;
98 spin_lock_irqsave(&device->resource->req_lock, flags);
99 device->read_cnt += peer_req->i.size >> 9;
100 list_del(&peer_req->w.list);
101 if (list_empty(&device->read_ee))
102 wake_up(&device->ee_wait);
103 if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
104 __drbd_chk_io_error(device, DRBD_READ_ERROR);
105 spin_unlock_irqrestore(&device->resource->req_lock, flags);
107 drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
108 put_ldev(device);
111 /* writes on behalf of the partner, or resync writes,
112 * "submitted" by the receiver, final stage. */
113 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
115 unsigned long flags = 0;
116 struct drbd_peer_device *peer_device = peer_req->peer_device;
117 struct drbd_device *device = peer_device->device;
118 struct drbd_connection *connection = peer_device->connection;
119 struct drbd_interval i;
120 int do_wake;
121 u64 block_id;
122 int do_al_complete_io;
124 /* after we moved peer_req to done_ee,
125 * we may no longer access it,
126 * it may be freed/reused already!
127 * (as soon as we release the req_lock) */
128 i = peer_req->i;
129 do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
130 block_id = peer_req->block_id;
131 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
133 if (peer_req->flags & EE_WAS_ERROR) {
134 /* In protocol != C, we usually do not send write acks.
135 * In case of a write error, send the neg ack anyways. */
136 if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
137 inc_unacked(device);
138 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
141 spin_lock_irqsave(&device->resource->req_lock, flags);
142 device->writ_cnt += peer_req->i.size >> 9;
143 list_move_tail(&peer_req->w.list, &device->done_ee);
146 * Do not remove from the write_requests tree here: we did not send the
147 * Ack yet and did not wake possibly waiting conflicting requests.
148 * Removed from the tree from "drbd_process_done_ee" within the
149 * appropriate dw.cb (e_end_block/e_end_resync_block) or from
150 * _drbd_clear_done_ee.
153 do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
155 /* FIXME do we want to detach for failed REQ_OP_DISCARD?
156 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
157 if (peer_req->flags & EE_WAS_ERROR)
158 __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
160 if (connection->cstate >= C_WF_REPORT_PARAMS) {
161 kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
162 if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
163 kref_put(&device->kref, drbd_destroy_device);
165 spin_unlock_irqrestore(&device->resource->req_lock, flags);
167 if (block_id == ID_SYNCER)
168 drbd_rs_complete_io(device, i.sector);
170 if (do_wake)
171 wake_up(&device->ee_wait);
173 if (do_al_complete_io)
174 drbd_al_complete_io(device, &i);
176 put_ldev(device);
179 /* writes on behalf of the partner, or resync writes,
180 * "submitted" by the receiver.
182 void drbd_peer_request_endio(struct bio *bio)
184 struct drbd_peer_request *peer_req = bio->bi_private;
185 struct drbd_device *device = peer_req->peer_device->device;
186 bool is_write = bio_data_dir(bio) == WRITE;
187 bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
188 bio_op(bio) == REQ_OP_DISCARD;
190 if (bio->bi_status && __ratelimit(&drbd_ratelimit_state))
191 drbd_warn(device, "%s: error=%d s=%llus\n",
192 is_write ? (is_discard ? "discard" : "write")
193 : "read", bio->bi_status,
194 (unsigned long long)peer_req->i.sector);
196 if (bio->bi_status)
197 set_bit(__EE_WAS_ERROR, &peer_req->flags);
199 bio_put(bio); /* no need for the bio anymore */
200 if (atomic_dec_and_test(&peer_req->pending_bios)) {
201 if (is_write)
202 drbd_endio_write_sec_final(peer_req);
203 else
204 drbd_endio_read_sec_final(peer_req);
208 static void
209 drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
211 panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
212 device->minor, device->resource->name, device->vnr);
215 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
217 void drbd_request_endio(struct bio *bio)
219 unsigned long flags;
220 struct drbd_request *req = bio->bi_private;
221 struct drbd_device *device = req->device;
222 struct bio_and_error m;
223 enum drbd_req_event what;
225 /* If this request was aborted locally before,
226 * but now was completed "successfully",
227 * chances are that this caused arbitrary data corruption.
229 * "aborting" requests, or force-detaching the disk, is intended for
230 * completely blocked/hung local backing devices which do no longer
231 * complete requests at all, not even do error completions. In this
232 * situation, usually a hard-reset and failover is the only way out.
234 * By "aborting", basically faking a local error-completion,
235 * we allow for a more graceful swichover by cleanly migrating services.
236 * Still the affected node has to be rebooted "soon".
238 * By completing these requests, we allow the upper layers to re-use
239 * the associated data pages.
241 * If later the local backing device "recovers", and now DMAs some data
242 * from disk into the original request pages, in the best case it will
243 * just put random data into unused pages; but typically it will corrupt
244 * meanwhile completely unrelated data, causing all sorts of damage.
246 * Which means delayed successful completion,
247 * especially for READ requests,
248 * is a reason to panic().
250 * We assume that a delayed *error* completion is OK,
251 * though we still will complain noisily about it.
253 if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
254 if (__ratelimit(&drbd_ratelimit_state))
255 drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
257 if (!bio->bi_status)
258 drbd_panic_after_delayed_completion_of_aborted_request(device);
261 /* to avoid recursion in __req_mod */
262 if (unlikely(bio->bi_status)) {
263 switch (bio_op(bio)) {
264 case REQ_OP_WRITE_ZEROES:
265 case REQ_OP_DISCARD:
266 if (bio->bi_status == BLK_STS_NOTSUPP)
267 what = DISCARD_COMPLETED_NOTSUPP;
268 else
269 what = DISCARD_COMPLETED_WITH_ERROR;
270 break;
271 case REQ_OP_READ:
272 if (bio->bi_opf & REQ_RAHEAD)
273 what = READ_AHEAD_COMPLETED_WITH_ERROR;
274 else
275 what = READ_COMPLETED_WITH_ERROR;
276 break;
277 default:
278 what = WRITE_COMPLETED_WITH_ERROR;
279 break;
281 } else {
282 what = COMPLETED_OK;
285 req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
286 bio_put(bio);
288 /* not req_mod(), we need irqsave here! */
289 spin_lock_irqsave(&device->resource->req_lock, flags);
290 __req_mod(req, what, &m);
291 spin_unlock_irqrestore(&device->resource->req_lock, flags);
292 put_ldev(device);
294 if (m.bio)
295 complete_master_bio(device, &m);
298 void drbd_csum_ee(struct crypto_shash *tfm, struct drbd_peer_request *peer_req, void *digest)
300 SHASH_DESC_ON_STACK(desc, tfm);
301 struct page *page = peer_req->pages;
302 struct page *tmp;
303 unsigned len;
304 void *src;
306 desc->tfm = tfm;
307 desc->flags = 0;
309 crypto_shash_init(desc);
311 src = kmap_atomic(page);
312 while ((tmp = page_chain_next(page))) {
313 /* all but the last page will be fully used */
314 crypto_shash_update(desc, src, PAGE_SIZE);
315 kunmap_atomic(src);
316 page = tmp;
317 src = kmap_atomic(page);
319 /* and now the last, possibly only partially used page */
320 len = peer_req->i.size & (PAGE_SIZE - 1);
321 crypto_shash_update(desc, src, len ?: PAGE_SIZE);
322 kunmap_atomic(src);
324 crypto_shash_final(desc, digest);
325 shash_desc_zero(desc);
328 void drbd_csum_bio(struct crypto_shash *tfm, struct bio *bio, void *digest)
330 SHASH_DESC_ON_STACK(desc, tfm);
331 struct bio_vec bvec;
332 struct bvec_iter iter;
334 desc->tfm = tfm;
335 desc->flags = 0;
337 crypto_shash_init(desc);
339 bio_for_each_segment(bvec, bio, iter) {
340 u8 *src;
342 src = kmap_atomic(bvec.bv_page);
343 crypto_shash_update(desc, src + bvec.bv_offset, bvec.bv_len);
344 kunmap_atomic(src);
346 /* REQ_OP_WRITE_SAME has only one segment,
347 * checksum the payload only once. */
348 if (bio_op(bio) == REQ_OP_WRITE_SAME)
349 break;
351 crypto_shash_final(desc, digest);
352 shash_desc_zero(desc);
355 /* MAYBE merge common code with w_e_end_ov_req */
356 static int w_e_send_csum(struct drbd_work *w, int cancel)
358 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
359 struct drbd_peer_device *peer_device = peer_req->peer_device;
360 struct drbd_device *device = peer_device->device;
361 int digest_size;
362 void *digest;
363 int err = 0;
365 if (unlikely(cancel))
366 goto out;
368 if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
369 goto out;
371 digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
372 digest = kmalloc(digest_size, GFP_NOIO);
373 if (digest) {
374 sector_t sector = peer_req->i.sector;
375 unsigned int size = peer_req->i.size;
376 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
377 /* Free peer_req and pages before send.
378 * In case we block on congestion, we could otherwise run into
379 * some distributed deadlock, if the other side blocks on
380 * congestion as well, because our receiver blocks in
381 * drbd_alloc_pages due to pp_in_use > max_buffers. */
382 drbd_free_peer_req(device, peer_req);
383 peer_req = NULL;
384 inc_rs_pending(device);
385 err = drbd_send_drequest_csum(peer_device, sector, size,
386 digest, digest_size,
387 P_CSUM_RS_REQUEST);
388 kfree(digest);
389 } else {
390 drbd_err(device, "kmalloc() of digest failed.\n");
391 err = -ENOMEM;
394 out:
395 if (peer_req)
396 drbd_free_peer_req(device, peer_req);
398 if (unlikely(err))
399 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
400 return err;
403 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
405 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
407 struct drbd_device *device = peer_device->device;
408 struct drbd_peer_request *peer_req;
410 if (!get_ldev(device))
411 return -EIO;
413 /* GFP_TRY, because if there is no memory available right now, this may
414 * be rescheduled for later. It is "only" background resync, after all. */
415 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
416 size, size, GFP_TRY);
417 if (!peer_req)
418 goto defer;
420 peer_req->w.cb = w_e_send_csum;
421 spin_lock_irq(&device->resource->req_lock);
422 list_add_tail(&peer_req->w.list, &device->read_ee);
423 spin_unlock_irq(&device->resource->req_lock);
425 atomic_add(size >> 9, &device->rs_sect_ev);
426 if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
427 DRBD_FAULT_RS_RD) == 0)
428 return 0;
430 /* If it failed because of ENOMEM, retry should help. If it failed
431 * because bio_add_page failed (probably broken lower level driver),
432 * retry may or may not help.
433 * If it does not, you may need to force disconnect. */
434 spin_lock_irq(&device->resource->req_lock);
435 list_del(&peer_req->w.list);
436 spin_unlock_irq(&device->resource->req_lock);
438 drbd_free_peer_req(device, peer_req);
439 defer:
440 put_ldev(device);
441 return -EAGAIN;
444 int w_resync_timer(struct drbd_work *w, int cancel)
446 struct drbd_device *device =
447 container_of(w, struct drbd_device, resync_work);
449 switch (device->state.conn) {
450 case C_VERIFY_S:
451 make_ov_request(device, cancel);
452 break;
453 case C_SYNC_TARGET:
454 make_resync_request(device, cancel);
455 break;
458 return 0;
461 void resync_timer_fn(struct timer_list *t)
463 struct drbd_device *device = from_timer(device, t, resync_timer);
465 drbd_queue_work_if_unqueued(
466 &first_peer_device(device)->connection->sender_work,
467 &device->resync_work);
470 static void fifo_set(struct fifo_buffer *fb, int value)
472 int i;
474 for (i = 0; i < fb->size; i++)
475 fb->values[i] = value;
478 static int fifo_push(struct fifo_buffer *fb, int value)
480 int ov;
482 ov = fb->values[fb->head_index];
483 fb->values[fb->head_index++] = value;
485 if (fb->head_index >= fb->size)
486 fb->head_index = 0;
488 return ov;
491 static void fifo_add_val(struct fifo_buffer *fb, int value)
493 int i;
495 for (i = 0; i < fb->size; i++)
496 fb->values[i] += value;
499 struct fifo_buffer *fifo_alloc(int fifo_size)
501 struct fifo_buffer *fb;
503 fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
504 if (!fb)
505 return NULL;
507 fb->head_index = 0;
508 fb->size = fifo_size;
509 fb->total = 0;
511 return fb;
514 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
516 struct disk_conf *dc;
517 unsigned int want; /* The number of sectors we want in-flight */
518 int req_sect; /* Number of sectors to request in this turn */
519 int correction; /* Number of sectors more we need in-flight */
520 int cps; /* correction per invocation of drbd_rs_controller() */
521 int steps; /* Number of time steps to plan ahead */
522 int curr_corr;
523 int max_sect;
524 struct fifo_buffer *plan;
526 dc = rcu_dereference(device->ldev->disk_conf);
527 plan = rcu_dereference(device->rs_plan_s);
529 steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
531 if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
532 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
533 } else { /* normal path */
534 want = dc->c_fill_target ? dc->c_fill_target :
535 sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
538 correction = want - device->rs_in_flight - plan->total;
540 /* Plan ahead */
541 cps = correction / steps;
542 fifo_add_val(plan, cps);
543 plan->total += cps * steps;
545 /* What we do in this step */
546 curr_corr = fifo_push(plan, 0);
547 plan->total -= curr_corr;
549 req_sect = sect_in + curr_corr;
550 if (req_sect < 0)
551 req_sect = 0;
553 max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
554 if (req_sect > max_sect)
555 req_sect = max_sect;
558 drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
559 sect_in, device->rs_in_flight, want, correction,
560 steps, cps, device->rs_planed, curr_corr, req_sect);
563 return req_sect;
566 static int drbd_rs_number_requests(struct drbd_device *device)
568 unsigned int sect_in; /* Number of sectors that came in since the last turn */
569 int number, mxb;
571 sect_in = atomic_xchg(&device->rs_sect_in, 0);
572 device->rs_in_flight -= sect_in;
574 rcu_read_lock();
575 mxb = drbd_get_max_buffers(device) / 2;
576 if (rcu_dereference(device->rs_plan_s)->size) {
577 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
578 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
579 } else {
580 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
581 number = SLEEP_TIME * device->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
583 rcu_read_unlock();
585 /* Don't have more than "max-buffers"/2 in-flight.
586 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
587 * potentially causing a distributed deadlock on congestion during
588 * online-verify or (checksum-based) resync, if max-buffers,
589 * socket buffer sizes and resync rate settings are mis-configured. */
591 /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
592 * mxb (as used here, and in drbd_alloc_pages on the peer) is
593 * "number of pages" (typically also 4k),
594 * but "rs_in_flight" is in "sectors" (512 Byte). */
595 if (mxb - device->rs_in_flight/8 < number)
596 number = mxb - device->rs_in_flight/8;
598 return number;
601 static int make_resync_request(struct drbd_device *const device, int cancel)
603 struct drbd_peer_device *const peer_device = first_peer_device(device);
604 struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
605 unsigned long bit;
606 sector_t sector;
607 const sector_t capacity = drbd_get_capacity(device->this_bdev);
608 int max_bio_size;
609 int number, rollback_i, size;
610 int align, requeue = 0;
611 int i = 0;
612 int discard_granularity = 0;
614 if (unlikely(cancel))
615 return 0;
617 if (device->rs_total == 0) {
618 /* empty resync? */
619 drbd_resync_finished(device);
620 return 0;
623 if (!get_ldev(device)) {
624 /* Since we only need to access device->rsync a
625 get_ldev_if_state(device,D_FAILED) would be sufficient, but
626 to continue resync with a broken disk makes no sense at
627 all */
628 drbd_err(device, "Disk broke down during resync!\n");
629 return 0;
632 if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
633 rcu_read_lock();
634 discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
635 rcu_read_unlock();
638 max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
639 number = drbd_rs_number_requests(device);
640 if (number <= 0)
641 goto requeue;
643 for (i = 0; i < number; i++) {
644 /* Stop generating RS requests when half of the send buffer is filled,
645 * but notify TCP that we'd like to have more space. */
646 mutex_lock(&connection->data.mutex);
647 if (connection->data.socket) {
648 struct sock *sk = connection->data.socket->sk;
649 int queued = sk->sk_wmem_queued;
650 int sndbuf = sk->sk_sndbuf;
651 if (queued > sndbuf / 2) {
652 requeue = 1;
653 if (sk->sk_socket)
654 set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
656 } else
657 requeue = 1;
658 mutex_unlock(&connection->data.mutex);
659 if (requeue)
660 goto requeue;
662 next_sector:
663 size = BM_BLOCK_SIZE;
664 bit = drbd_bm_find_next(device, device->bm_resync_fo);
666 if (bit == DRBD_END_OF_BITMAP) {
667 device->bm_resync_fo = drbd_bm_bits(device);
668 put_ldev(device);
669 return 0;
672 sector = BM_BIT_TO_SECT(bit);
674 if (drbd_try_rs_begin_io(device, sector)) {
675 device->bm_resync_fo = bit;
676 goto requeue;
678 device->bm_resync_fo = bit + 1;
680 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
681 drbd_rs_complete_io(device, sector);
682 goto next_sector;
685 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
686 /* try to find some adjacent bits.
687 * we stop if we have already the maximum req size.
689 * Additionally always align bigger requests, in order to
690 * be prepared for all stripe sizes of software RAIDs.
692 align = 1;
693 rollback_i = i;
694 while (i < number) {
695 if (size + BM_BLOCK_SIZE > max_bio_size)
696 break;
698 /* Be always aligned */
699 if (sector & ((1<<(align+3))-1))
700 break;
702 if (discard_granularity && size == discard_granularity)
703 break;
705 /* do not cross extent boundaries */
706 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
707 break;
708 /* now, is it actually dirty, after all?
709 * caution, drbd_bm_test_bit is tri-state for some
710 * obscure reason; ( b == 0 ) would get the out-of-band
711 * only accidentally right because of the "oddly sized"
712 * adjustment below */
713 if (drbd_bm_test_bit(device, bit+1) != 1)
714 break;
715 bit++;
716 size += BM_BLOCK_SIZE;
717 if ((BM_BLOCK_SIZE << align) <= size)
718 align++;
719 i++;
721 /* if we merged some,
722 * reset the offset to start the next drbd_bm_find_next from */
723 if (size > BM_BLOCK_SIZE)
724 device->bm_resync_fo = bit + 1;
725 #endif
727 /* adjust very last sectors, in case we are oddly sized */
728 if (sector + (size>>9) > capacity)
729 size = (capacity-sector)<<9;
731 if (device->use_csums) {
732 switch (read_for_csum(peer_device, sector, size)) {
733 case -EIO: /* Disk failure */
734 put_ldev(device);
735 return -EIO;
736 case -EAGAIN: /* allocation failed, or ldev busy */
737 drbd_rs_complete_io(device, sector);
738 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
739 i = rollback_i;
740 goto requeue;
741 case 0:
742 /* everything ok */
743 break;
744 default:
745 BUG();
747 } else {
748 int err;
750 inc_rs_pending(device);
751 err = drbd_send_drequest(peer_device,
752 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
753 sector, size, ID_SYNCER);
754 if (err) {
755 drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
756 dec_rs_pending(device);
757 put_ldev(device);
758 return err;
763 if (device->bm_resync_fo >= drbd_bm_bits(device)) {
764 /* last syncer _request_ was sent,
765 * but the P_RS_DATA_REPLY not yet received. sync will end (and
766 * next sync group will resume), as soon as we receive the last
767 * resync data block, and the last bit is cleared.
768 * until then resync "work" is "inactive" ...
770 put_ldev(device);
771 return 0;
774 requeue:
775 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
776 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
777 put_ldev(device);
778 return 0;
781 static int make_ov_request(struct drbd_device *device, int cancel)
783 int number, i, size;
784 sector_t sector;
785 const sector_t capacity = drbd_get_capacity(device->this_bdev);
786 bool stop_sector_reached = false;
788 if (unlikely(cancel))
789 return 1;
791 number = drbd_rs_number_requests(device);
793 sector = device->ov_position;
794 for (i = 0; i < number; i++) {
795 if (sector >= capacity)
796 return 1;
798 /* We check for "finished" only in the reply path:
799 * w_e_end_ov_reply().
800 * We need to send at least one request out. */
801 stop_sector_reached = i > 0
802 && verify_can_do_stop_sector(device)
803 && sector >= device->ov_stop_sector;
804 if (stop_sector_reached)
805 break;
807 size = BM_BLOCK_SIZE;
809 if (drbd_try_rs_begin_io(device, sector)) {
810 device->ov_position = sector;
811 goto requeue;
814 if (sector + (size>>9) > capacity)
815 size = (capacity-sector)<<9;
817 inc_rs_pending(device);
818 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
819 dec_rs_pending(device);
820 return 0;
822 sector += BM_SECT_PER_BIT;
824 device->ov_position = sector;
826 requeue:
827 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
828 if (i == 0 || !stop_sector_reached)
829 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
830 return 1;
833 int w_ov_finished(struct drbd_work *w, int cancel)
835 struct drbd_device_work *dw =
836 container_of(w, struct drbd_device_work, w);
837 struct drbd_device *device = dw->device;
838 kfree(dw);
839 ov_out_of_sync_print(device);
840 drbd_resync_finished(device);
842 return 0;
845 static int w_resync_finished(struct drbd_work *w, int cancel)
847 struct drbd_device_work *dw =
848 container_of(w, struct drbd_device_work, w);
849 struct drbd_device *device = dw->device;
850 kfree(dw);
852 drbd_resync_finished(device);
854 return 0;
857 static void ping_peer(struct drbd_device *device)
859 struct drbd_connection *connection = first_peer_device(device)->connection;
861 clear_bit(GOT_PING_ACK, &connection->flags);
862 request_ping(connection);
863 wait_event(connection->ping_wait,
864 test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
867 int drbd_resync_finished(struct drbd_device *device)
869 struct drbd_connection *connection = first_peer_device(device)->connection;
870 unsigned long db, dt, dbdt;
871 unsigned long n_oos;
872 union drbd_state os, ns;
873 struct drbd_device_work *dw;
874 char *khelper_cmd = NULL;
875 int verify_done = 0;
877 /* Remove all elements from the resync LRU. Since future actions
878 * might set bits in the (main) bitmap, then the entries in the
879 * resync LRU would be wrong. */
880 if (drbd_rs_del_all(device)) {
881 /* In case this is not possible now, most probably because
882 * there are P_RS_DATA_REPLY Packets lingering on the worker's
883 * queue (or even the read operations for those packets
884 * is not finished by now). Retry in 100ms. */
886 schedule_timeout_interruptible(HZ / 10);
887 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
888 if (dw) {
889 dw->w.cb = w_resync_finished;
890 dw->device = device;
891 drbd_queue_work(&connection->sender_work, &dw->w);
892 return 1;
894 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
897 dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
898 if (dt <= 0)
899 dt = 1;
901 db = device->rs_total;
902 /* adjust for verify start and stop sectors, respective reached position */
903 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
904 db -= device->ov_left;
906 dbdt = Bit2KB(db/dt);
907 device->rs_paused /= HZ;
909 if (!get_ldev(device))
910 goto out;
912 ping_peer(device);
914 spin_lock_irq(&device->resource->req_lock);
915 os = drbd_read_state(device);
917 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
919 /* This protects us against multiple calls (that can happen in the presence
920 of application IO), and against connectivity loss just before we arrive here. */
921 if (os.conn <= C_CONNECTED)
922 goto out_unlock;
924 ns = os;
925 ns.conn = C_CONNECTED;
927 drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
928 verify_done ? "Online verify" : "Resync",
929 dt + device->rs_paused, device->rs_paused, dbdt);
931 n_oos = drbd_bm_total_weight(device);
933 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
934 if (n_oos) {
935 drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
936 n_oos, Bit2KB(1));
937 khelper_cmd = "out-of-sync";
939 } else {
940 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
942 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
943 khelper_cmd = "after-resync-target";
945 if (device->use_csums && device->rs_total) {
946 const unsigned long s = device->rs_same_csum;
947 const unsigned long t = device->rs_total;
948 const int ratio =
949 (t == 0) ? 0 :
950 (t < 100000) ? ((s*100)/t) : (s/(t/100));
951 drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
952 "transferred %luK total %luK\n",
953 ratio,
954 Bit2KB(device->rs_same_csum),
955 Bit2KB(device->rs_total - device->rs_same_csum),
956 Bit2KB(device->rs_total));
960 if (device->rs_failed) {
961 drbd_info(device, " %lu failed blocks\n", device->rs_failed);
963 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
964 ns.disk = D_INCONSISTENT;
965 ns.pdsk = D_UP_TO_DATE;
966 } else {
967 ns.disk = D_UP_TO_DATE;
968 ns.pdsk = D_INCONSISTENT;
970 } else {
971 ns.disk = D_UP_TO_DATE;
972 ns.pdsk = D_UP_TO_DATE;
974 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
975 if (device->p_uuid) {
976 int i;
977 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
978 _drbd_uuid_set(device, i, device->p_uuid[i]);
979 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
980 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
981 } else {
982 drbd_err(device, "device->p_uuid is NULL! BUG\n");
986 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
987 /* for verify runs, we don't update uuids here,
988 * so there would be nothing to report. */
989 drbd_uuid_set_bm(device, 0UL);
990 drbd_print_uuids(device, "updated UUIDs");
991 if (device->p_uuid) {
992 /* Now the two UUID sets are equal, update what we
993 * know of the peer. */
994 int i;
995 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
996 device->p_uuid[i] = device->ldev->md.uuid[i];
1001 _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1002 out_unlock:
1003 spin_unlock_irq(&device->resource->req_lock);
1005 /* If we have been sync source, and have an effective fencing-policy,
1006 * once *all* volumes are back in sync, call "unfence". */
1007 if (os.conn == C_SYNC_SOURCE) {
1008 enum drbd_disk_state disk_state = D_MASK;
1009 enum drbd_disk_state pdsk_state = D_MASK;
1010 enum drbd_fencing_p fp = FP_DONT_CARE;
1012 rcu_read_lock();
1013 fp = rcu_dereference(device->ldev->disk_conf)->fencing;
1014 if (fp != FP_DONT_CARE) {
1015 struct drbd_peer_device *peer_device;
1016 int vnr;
1017 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1018 struct drbd_device *device = peer_device->device;
1019 disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1020 pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1023 rcu_read_unlock();
1024 if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1025 conn_khelper(connection, "unfence-peer");
1028 put_ldev(device);
1029 out:
1030 device->rs_total = 0;
1031 device->rs_failed = 0;
1032 device->rs_paused = 0;
1034 /* reset start sector, if we reached end of device */
1035 if (verify_done && device->ov_left == 0)
1036 device->ov_start_sector = 0;
1038 drbd_md_sync(device);
1040 if (khelper_cmd)
1041 drbd_khelper(device, khelper_cmd);
1043 return 1;
1046 /* helper */
1047 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1049 if (drbd_peer_req_has_active_page(peer_req)) {
1050 /* This might happen if sendpage() has not finished */
1051 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1052 atomic_add(i, &device->pp_in_use_by_net);
1053 atomic_sub(i, &device->pp_in_use);
1054 spin_lock_irq(&device->resource->req_lock);
1055 list_add_tail(&peer_req->w.list, &device->net_ee);
1056 spin_unlock_irq(&device->resource->req_lock);
1057 wake_up(&drbd_pp_wait);
1058 } else
1059 drbd_free_peer_req(device, peer_req);
1063 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1064 * @w: work object.
1065 * @cancel: The connection will be closed anyways
1067 int w_e_end_data_req(struct drbd_work *w, int cancel)
1069 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1070 struct drbd_peer_device *peer_device = peer_req->peer_device;
1071 struct drbd_device *device = peer_device->device;
1072 int err;
1074 if (unlikely(cancel)) {
1075 drbd_free_peer_req(device, peer_req);
1076 dec_unacked(device);
1077 return 0;
1080 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1081 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1082 } else {
1083 if (__ratelimit(&drbd_ratelimit_state))
1084 drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1085 (unsigned long long)peer_req->i.sector);
1087 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1090 dec_unacked(device);
1092 move_to_net_ee_or_free(device, peer_req);
1094 if (unlikely(err))
1095 drbd_err(device, "drbd_send_block() failed\n");
1096 return err;
1099 static bool all_zero(struct drbd_peer_request *peer_req)
1101 struct page *page = peer_req->pages;
1102 unsigned int len = peer_req->i.size;
1104 page_chain_for_each(page) {
1105 unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1106 unsigned int i, words = l / sizeof(long);
1107 unsigned long *d;
1109 d = kmap_atomic(page);
1110 for (i = 0; i < words; i++) {
1111 if (d[i]) {
1112 kunmap_atomic(d);
1113 return false;
1116 kunmap_atomic(d);
1117 len -= l;
1120 return true;
1124 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1125 * @w: work object.
1126 * @cancel: The connection will be closed anyways
1128 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1130 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1131 struct drbd_peer_device *peer_device = peer_req->peer_device;
1132 struct drbd_device *device = peer_device->device;
1133 int err;
1135 if (unlikely(cancel)) {
1136 drbd_free_peer_req(device, peer_req);
1137 dec_unacked(device);
1138 return 0;
1141 if (get_ldev_if_state(device, D_FAILED)) {
1142 drbd_rs_complete_io(device, peer_req->i.sector);
1143 put_ldev(device);
1146 if (device->state.conn == C_AHEAD) {
1147 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1148 } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1149 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1150 inc_rs_pending(device);
1151 if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1152 err = drbd_send_rs_deallocated(peer_device, peer_req);
1153 else
1154 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1155 } else {
1156 if (__ratelimit(&drbd_ratelimit_state))
1157 drbd_err(device, "Not sending RSDataReply, "
1158 "partner DISKLESS!\n");
1159 err = 0;
1161 } else {
1162 if (__ratelimit(&drbd_ratelimit_state))
1163 drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1164 (unsigned long long)peer_req->i.sector);
1166 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1168 /* update resync data with failure */
1169 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1172 dec_unacked(device);
1174 move_to_net_ee_or_free(device, peer_req);
1176 if (unlikely(err))
1177 drbd_err(device, "drbd_send_block() failed\n");
1178 return err;
1181 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1183 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1184 struct drbd_peer_device *peer_device = peer_req->peer_device;
1185 struct drbd_device *device = peer_device->device;
1186 struct digest_info *di;
1187 int digest_size;
1188 void *digest = NULL;
1189 int err, eq = 0;
1191 if (unlikely(cancel)) {
1192 drbd_free_peer_req(device, peer_req);
1193 dec_unacked(device);
1194 return 0;
1197 if (get_ldev(device)) {
1198 drbd_rs_complete_io(device, peer_req->i.sector);
1199 put_ldev(device);
1202 di = peer_req->digest;
1204 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1205 /* quick hack to try to avoid a race against reconfiguration.
1206 * a real fix would be much more involved,
1207 * introducing more locking mechanisms */
1208 if (peer_device->connection->csums_tfm) {
1209 digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
1210 D_ASSERT(device, digest_size == di->digest_size);
1211 digest = kmalloc(digest_size, GFP_NOIO);
1213 if (digest) {
1214 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1215 eq = !memcmp(digest, di->digest, digest_size);
1216 kfree(digest);
1219 if (eq) {
1220 drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1221 /* rs_same_csums unit is BM_BLOCK_SIZE */
1222 device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1223 err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1224 } else {
1225 inc_rs_pending(device);
1226 peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1227 peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1228 kfree(di);
1229 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1231 } else {
1232 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1233 if (__ratelimit(&drbd_ratelimit_state))
1234 drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1237 dec_unacked(device);
1238 move_to_net_ee_or_free(device, peer_req);
1240 if (unlikely(err))
1241 drbd_err(device, "drbd_send_block/ack() failed\n");
1242 return err;
1245 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1247 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1248 struct drbd_peer_device *peer_device = peer_req->peer_device;
1249 struct drbd_device *device = peer_device->device;
1250 sector_t sector = peer_req->i.sector;
1251 unsigned int size = peer_req->i.size;
1252 int digest_size;
1253 void *digest;
1254 int err = 0;
1256 if (unlikely(cancel))
1257 goto out;
1259 digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1260 digest = kmalloc(digest_size, GFP_NOIO);
1261 if (!digest) {
1262 err = 1; /* terminate the connection in case the allocation failed */
1263 goto out;
1266 if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1267 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1268 else
1269 memset(digest, 0, digest_size);
1271 /* Free e and pages before send.
1272 * In case we block on congestion, we could otherwise run into
1273 * some distributed deadlock, if the other side blocks on
1274 * congestion as well, because our receiver blocks in
1275 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1276 drbd_free_peer_req(device, peer_req);
1277 peer_req = NULL;
1278 inc_rs_pending(device);
1279 err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1280 if (err)
1281 dec_rs_pending(device);
1282 kfree(digest);
1284 out:
1285 if (peer_req)
1286 drbd_free_peer_req(device, peer_req);
1287 dec_unacked(device);
1288 return err;
1291 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1293 if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1294 device->ov_last_oos_size += size>>9;
1295 } else {
1296 device->ov_last_oos_start = sector;
1297 device->ov_last_oos_size = size>>9;
1299 drbd_set_out_of_sync(device, sector, size);
1302 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1304 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1305 struct drbd_peer_device *peer_device = peer_req->peer_device;
1306 struct drbd_device *device = peer_device->device;
1307 struct digest_info *di;
1308 void *digest;
1309 sector_t sector = peer_req->i.sector;
1310 unsigned int size = peer_req->i.size;
1311 int digest_size;
1312 int err, eq = 0;
1313 bool stop_sector_reached = false;
1315 if (unlikely(cancel)) {
1316 drbd_free_peer_req(device, peer_req);
1317 dec_unacked(device);
1318 return 0;
1321 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1322 * the resync lru has been cleaned up already */
1323 if (get_ldev(device)) {
1324 drbd_rs_complete_io(device, peer_req->i.sector);
1325 put_ldev(device);
1328 di = peer_req->digest;
1330 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1331 digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1332 digest = kmalloc(digest_size, GFP_NOIO);
1333 if (digest) {
1334 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1336 D_ASSERT(device, digest_size == di->digest_size);
1337 eq = !memcmp(digest, di->digest, digest_size);
1338 kfree(digest);
1342 /* Free peer_req and pages before send.
1343 * In case we block on congestion, we could otherwise run into
1344 * some distributed deadlock, if the other side blocks on
1345 * congestion as well, because our receiver blocks in
1346 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1347 drbd_free_peer_req(device, peer_req);
1348 if (!eq)
1349 drbd_ov_out_of_sync_found(device, sector, size);
1350 else
1351 ov_out_of_sync_print(device);
1353 err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1354 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1356 dec_unacked(device);
1358 --device->ov_left;
1360 /* let's advance progress step marks only for every other megabyte */
1361 if ((device->ov_left & 0x200) == 0x200)
1362 drbd_advance_rs_marks(device, device->ov_left);
1364 stop_sector_reached = verify_can_do_stop_sector(device) &&
1365 (sector + (size>>9)) >= device->ov_stop_sector;
1367 if (device->ov_left == 0 || stop_sector_reached) {
1368 ov_out_of_sync_print(device);
1369 drbd_resync_finished(device);
1372 return err;
1375 /* FIXME
1376 * We need to track the number of pending barrier acks,
1377 * and to be able to wait for them.
1378 * See also comment in drbd_adm_attach before drbd_suspend_io.
1380 static int drbd_send_barrier(struct drbd_connection *connection)
1382 struct p_barrier *p;
1383 struct drbd_socket *sock;
1385 sock = &connection->data;
1386 p = conn_prepare_command(connection, sock);
1387 if (!p)
1388 return -EIO;
1389 p->barrier = connection->send.current_epoch_nr;
1390 p->pad = 0;
1391 connection->send.current_epoch_writes = 0;
1392 connection->send.last_sent_barrier_jif = jiffies;
1394 return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1397 static int pd_send_unplug_remote(struct drbd_peer_device *pd)
1399 struct drbd_socket *sock = &pd->connection->data;
1400 if (!drbd_prepare_command(pd, sock))
1401 return -EIO;
1402 return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1405 int w_send_write_hint(struct drbd_work *w, int cancel)
1407 struct drbd_device *device =
1408 container_of(w, struct drbd_device, unplug_work);
1410 if (cancel)
1411 return 0;
1412 return pd_send_unplug_remote(first_peer_device(device));
1415 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1417 if (!connection->send.seen_any_write_yet) {
1418 connection->send.seen_any_write_yet = true;
1419 connection->send.current_epoch_nr = epoch;
1420 connection->send.current_epoch_writes = 0;
1421 connection->send.last_sent_barrier_jif = jiffies;
1425 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1427 /* re-init if first write on this connection */
1428 if (!connection->send.seen_any_write_yet)
1429 return;
1430 if (connection->send.current_epoch_nr != epoch) {
1431 if (connection->send.current_epoch_writes)
1432 drbd_send_barrier(connection);
1433 connection->send.current_epoch_nr = epoch;
1437 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1439 struct drbd_request *req = container_of(w, struct drbd_request, w);
1440 struct drbd_device *device = req->device;
1441 struct drbd_peer_device *const peer_device = first_peer_device(device);
1442 struct drbd_connection *const connection = peer_device->connection;
1443 int err;
1445 if (unlikely(cancel)) {
1446 req_mod(req, SEND_CANCELED);
1447 return 0;
1449 req->pre_send_jif = jiffies;
1451 /* this time, no connection->send.current_epoch_writes++;
1452 * If it was sent, it was the closing barrier for the last
1453 * replicated epoch, before we went into AHEAD mode.
1454 * No more barriers will be sent, until we leave AHEAD mode again. */
1455 maybe_send_barrier(connection, req->epoch);
1457 err = drbd_send_out_of_sync(peer_device, req);
1458 req_mod(req, OOS_HANDED_TO_NETWORK);
1460 return err;
1464 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1465 * @w: work object.
1466 * @cancel: The connection will be closed anyways
1468 int w_send_dblock(struct drbd_work *w, int cancel)
1470 struct drbd_request *req = container_of(w, struct drbd_request, w);
1471 struct drbd_device *device = req->device;
1472 struct drbd_peer_device *const peer_device = first_peer_device(device);
1473 struct drbd_connection *connection = peer_device->connection;
1474 bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1475 int err;
1477 if (unlikely(cancel)) {
1478 req_mod(req, SEND_CANCELED);
1479 return 0;
1481 req->pre_send_jif = jiffies;
1483 re_init_if_first_write(connection, req->epoch);
1484 maybe_send_barrier(connection, req->epoch);
1485 connection->send.current_epoch_writes++;
1487 err = drbd_send_dblock(peer_device, req);
1488 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1490 if (do_send_unplug && !err)
1491 pd_send_unplug_remote(peer_device);
1493 return err;
1497 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1498 * @w: work object.
1499 * @cancel: The connection will be closed anyways
1501 int w_send_read_req(struct drbd_work *w, int cancel)
1503 struct drbd_request *req = container_of(w, struct drbd_request, w);
1504 struct drbd_device *device = req->device;
1505 struct drbd_peer_device *const peer_device = first_peer_device(device);
1506 struct drbd_connection *connection = peer_device->connection;
1507 bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1508 int err;
1510 if (unlikely(cancel)) {
1511 req_mod(req, SEND_CANCELED);
1512 return 0;
1514 req->pre_send_jif = jiffies;
1516 /* Even read requests may close a write epoch,
1517 * if there was any yet. */
1518 maybe_send_barrier(connection, req->epoch);
1520 err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1521 (unsigned long)req);
1523 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1525 if (do_send_unplug && !err)
1526 pd_send_unplug_remote(peer_device);
1528 return err;
1531 int w_restart_disk_io(struct drbd_work *w, int cancel)
1533 struct drbd_request *req = container_of(w, struct drbd_request, w);
1534 struct drbd_device *device = req->device;
1536 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1537 drbd_al_begin_io(device, &req->i);
1539 drbd_req_make_private_bio(req, req->master_bio);
1540 bio_set_dev(req->private_bio, device->ldev->backing_bdev);
1541 generic_make_request(req->private_bio);
1543 return 0;
1546 static int _drbd_may_sync_now(struct drbd_device *device)
1548 struct drbd_device *odev = device;
1549 int resync_after;
1551 while (1) {
1552 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1553 return 1;
1554 rcu_read_lock();
1555 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1556 rcu_read_unlock();
1557 if (resync_after == -1)
1558 return 1;
1559 odev = minor_to_device(resync_after);
1560 if (!odev)
1561 return 1;
1562 if ((odev->state.conn >= C_SYNC_SOURCE &&
1563 odev->state.conn <= C_PAUSED_SYNC_T) ||
1564 odev->state.aftr_isp || odev->state.peer_isp ||
1565 odev->state.user_isp)
1566 return 0;
1571 * drbd_pause_after() - Pause resync on all devices that may not resync now
1572 * @device: DRBD device.
1574 * Called from process context only (admin command and after_state_ch).
1576 static bool drbd_pause_after(struct drbd_device *device)
1578 bool changed = false;
1579 struct drbd_device *odev;
1580 int i;
1582 rcu_read_lock();
1583 idr_for_each_entry(&drbd_devices, odev, i) {
1584 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1585 continue;
1586 if (!_drbd_may_sync_now(odev) &&
1587 _drbd_set_state(_NS(odev, aftr_isp, 1),
1588 CS_HARD, NULL) != SS_NOTHING_TO_DO)
1589 changed = true;
1591 rcu_read_unlock();
1593 return changed;
1597 * drbd_resume_next() - Resume resync on all devices that may resync now
1598 * @device: DRBD device.
1600 * Called from process context only (admin command and worker).
1602 static bool drbd_resume_next(struct drbd_device *device)
1604 bool changed = false;
1605 struct drbd_device *odev;
1606 int i;
1608 rcu_read_lock();
1609 idr_for_each_entry(&drbd_devices, odev, i) {
1610 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1611 continue;
1612 if (odev->state.aftr_isp) {
1613 if (_drbd_may_sync_now(odev) &&
1614 _drbd_set_state(_NS(odev, aftr_isp, 0),
1615 CS_HARD, NULL) != SS_NOTHING_TO_DO)
1616 changed = true;
1619 rcu_read_unlock();
1620 return changed;
/*
 * Run drbd_resume_next() with all resources locked.
 * The boolean result of drbd_resume_next() is discarded.
 */
void resume_next_sg(struct drbd_device *device)
{
	lock_all_resources();
	(void)drbd_resume_next(device);
	unlock_all_resources();
}
/*
 * Run drbd_pause_after() with all resources locked.
 * The boolean result of drbd_pause_after() is discarded.
 */
void suspend_other_sg(struct drbd_device *device)
{
	lock_all_resources();
	(void)drbd_pause_after(device);
	unlock_all_resources();
}
1637 /* caller must lock_all_resources() */
1638 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1640 struct drbd_device *odev;
1641 int resync_after;
1643 if (o_minor == -1)
1644 return NO_ERROR;
1645 if (o_minor < -1 || o_minor > MINORMASK)
1646 return ERR_RESYNC_AFTER;
1648 /* check for loops */
1649 odev = minor_to_device(o_minor);
1650 while (1) {
1651 if (odev == device)
1652 return ERR_RESYNC_AFTER_CYCLE;
1654 /* You are free to depend on diskless, non-existing,
1655 * or not yet/no longer existing minors.
1656 * We only reject dependency loops.
1657 * We cannot follow the dependency chain beyond a detached or
1658 * missing minor.
1660 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1661 return NO_ERROR;
1663 rcu_read_lock();
1664 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1665 rcu_read_unlock();
1666 /* dependency chain ends here, no cycles. */
1667 if (resync_after == -1)
1668 return NO_ERROR;
1670 /* follow the dependency chain */
1671 odev = minor_to_device(resync_after);
1675 /* caller must lock_all_resources() */
1676 void drbd_resync_after_changed(struct drbd_device *device)
1678 int changed;
1680 do {
1681 changed = drbd_pause_after(device);
1682 changed |= drbd_resume_next(device);
1683 } while (changed);
1686 void drbd_rs_controller_reset(struct drbd_device *device)
1688 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1689 struct fifo_buffer *plan;
1691 atomic_set(&device->rs_sect_in, 0);
1692 atomic_set(&device->rs_sect_ev, 0);
1693 device->rs_in_flight = 0;
1694 device->rs_last_events = (int)part_stat_read_accum(&disk->part0, sectors);
1696 /* Updating the RCU protected object in place is necessary since
1697 this function gets called from atomic context.
1698 It is valid since all other updates also lead to an completely
1699 empty fifo */
1700 rcu_read_lock();
1701 plan = rcu_dereference(device->rs_plan_s);
1702 plan->total = 0;
1703 fifo_set(plan, 0);
1704 rcu_read_unlock();
1707 void start_resync_timer_fn(struct timer_list *t)
1709 struct drbd_device *device = from_timer(device, t, start_resync_timer);
1710 drbd_device_post_work(device, RS_START);
1713 static void do_start_resync(struct drbd_device *device)
1715 if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1716 drbd_warn(device, "postponing start_resync ...\n");
1717 device->start_resync_timer.expires = jiffies + HZ/10;
1718 add_timer(&device->start_resync_timer);
1719 return;
1722 drbd_start_resync(device, C_SYNC_SOURCE);
1723 clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1726 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1728 bool csums_after_crash_only;
1729 rcu_read_lock();
1730 csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1731 rcu_read_unlock();
1732 return connection->agreed_pro_version >= 89 && /* supported? */
1733 connection->csums_tfm && /* configured? */
1734 (csums_after_crash_only == false /* use for each resync? */
1735 || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1739 * drbd_start_resync() - Start the resync process
1740 * @device: DRBD device.
1741 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1743 * This function might bring you directly into one of the
1744 * C_PAUSED_SYNC_* states.
1746 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1748 struct drbd_peer_device *peer_device = first_peer_device(device);
1749 struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1750 union drbd_state ns;
1751 int r;
1753 if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1754 drbd_err(device, "Resync already running!\n");
1755 return;
1758 if (!connection) {
1759 drbd_err(device, "No connection to peer, aborting!\n");
1760 return;
1763 if (!test_bit(B_RS_H_DONE, &device->flags)) {
1764 if (side == C_SYNC_TARGET) {
1765 /* Since application IO was locked out during C_WF_BITMAP_T and
1766 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1767 we check that we might make the data inconsistent. */
1768 r = drbd_khelper(device, "before-resync-target");
1769 r = (r >> 8) & 0xff;
1770 if (r > 0) {
1771 drbd_info(device, "before-resync-target handler returned %d, "
1772 "dropping connection.\n", r);
1773 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1774 return;
1776 } else /* C_SYNC_SOURCE */ {
1777 r = drbd_khelper(device, "before-resync-source");
1778 r = (r >> 8) & 0xff;
1779 if (r > 0) {
1780 if (r == 3) {
1781 drbd_info(device, "before-resync-source handler returned %d, "
1782 "ignoring. Old userland tools?", r);
1783 } else {
1784 drbd_info(device, "before-resync-source handler returned %d, "
1785 "dropping connection.\n", r);
1786 conn_request_state(connection,
1787 NS(conn, C_DISCONNECTING), CS_HARD);
1788 return;
1794 if (current == connection->worker.task) {
1795 /* The worker should not sleep waiting for state_mutex,
1796 that can take long */
1797 if (!mutex_trylock(device->state_mutex)) {
1798 set_bit(B_RS_H_DONE, &device->flags);
1799 device->start_resync_timer.expires = jiffies + HZ/5;
1800 add_timer(&device->start_resync_timer);
1801 return;
1803 } else {
1804 mutex_lock(device->state_mutex);
1807 lock_all_resources();
1808 clear_bit(B_RS_H_DONE, &device->flags);
1809 /* Did some connection breakage or IO error race with us? */
1810 if (device->state.conn < C_CONNECTED
1811 || !get_ldev_if_state(device, D_NEGOTIATING)) {
1812 unlock_all_resources();
1813 goto out;
1816 ns = drbd_read_state(device);
1818 ns.aftr_isp = !_drbd_may_sync_now(device);
1820 ns.conn = side;
1822 if (side == C_SYNC_TARGET)
1823 ns.disk = D_INCONSISTENT;
1824 else /* side == C_SYNC_SOURCE */
1825 ns.pdsk = D_INCONSISTENT;
1827 r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1828 ns = drbd_read_state(device);
1830 if (ns.conn < C_CONNECTED)
1831 r = SS_UNKNOWN_ERROR;
1833 if (r == SS_SUCCESS) {
1834 unsigned long tw = drbd_bm_total_weight(device);
1835 unsigned long now = jiffies;
1836 int i;
1838 device->rs_failed = 0;
1839 device->rs_paused = 0;
1840 device->rs_same_csum = 0;
1841 device->rs_last_sect_ev = 0;
1842 device->rs_total = tw;
1843 device->rs_start = now;
1844 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1845 device->rs_mark_left[i] = tw;
1846 device->rs_mark_time[i] = now;
1848 drbd_pause_after(device);
1849 /* Forget potentially stale cached per resync extent bit-counts.
1850 * Open coded drbd_rs_cancel_all(device), we already have IRQs
1851 * disabled, and know the disk state is ok. */
1852 spin_lock(&device->al_lock);
1853 lc_reset(device->resync);
1854 device->resync_locked = 0;
1855 device->resync_wenr = LC_FREE;
1856 spin_unlock(&device->al_lock);
1858 unlock_all_resources();
1860 if (r == SS_SUCCESS) {
1861 wake_up(&device->al_wait); /* for lc_reset() above */
1862 /* reset rs_last_bcast when a resync or verify is started,
1863 * to deal with potential jiffies wrap. */
1864 device->rs_last_bcast = jiffies - HZ;
1866 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1867 drbd_conn_str(ns.conn),
1868 (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1869 (unsigned long) device->rs_total);
1870 if (side == C_SYNC_TARGET) {
1871 device->bm_resync_fo = 0;
1872 device->use_csums = use_checksum_based_resync(connection, device);
1873 } else {
1874 device->use_csums = false;
1877 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1878 * with w_send_oos, or the sync target will get confused as to
1879 * how much bits to resync. We cannot do that always, because for an
1880 * empty resync and protocol < 95, we need to do it here, as we call
1881 * drbd_resync_finished from here in that case.
1882 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1883 * and from after_state_ch otherwise. */
1884 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1885 drbd_gen_and_send_sync_uuid(peer_device);
1887 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1888 /* This still has a race (about when exactly the peers
1889 * detect connection loss) that can lead to a full sync
1890 * on next handshake. In 8.3.9 we fixed this with explicit
1891 * resync-finished notifications, but the fix
1892 * introduces a protocol change. Sleeping for some
1893 * time longer than the ping interval + timeout on the
1894 * SyncSource, to give the SyncTarget the chance to
1895 * detect connection loss, then waiting for a ping
1896 * response (implicit in drbd_resync_finished) reduces
1897 * the race considerably, but does not solve it. */
1898 if (side == C_SYNC_SOURCE) {
1899 struct net_conf *nc;
1900 int timeo;
1902 rcu_read_lock();
1903 nc = rcu_dereference(connection->net_conf);
1904 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1905 rcu_read_unlock();
1906 schedule_timeout_interruptible(timeo);
1908 drbd_resync_finished(device);
1911 drbd_rs_controller_reset(device);
1912 /* ns.conn may already be != device->state.conn,
1913 * we may have been paused in between, or become paused until
1914 * the timer triggers.
1915 * No matter, that is handled in resync_timer_fn() */
1916 if (ns.conn == C_SYNC_TARGET)
1917 mod_timer(&device->resync_timer, jiffies);
1919 drbd_md_sync(device);
1921 put_ldev(device);
1922 out:
1923 mutex_unlock(device->state_mutex);
1926 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1928 struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1929 device->rs_last_bcast = jiffies;
1931 if (!get_ldev(device))
1932 return;
1934 drbd_bm_write_lazy(device, 0);
1935 if (resync_done && is_sync_state(device->state.conn))
1936 drbd_resync_finished(device);
1938 drbd_bcast_event(device, &sib);
1939 /* update timestamp, in case it took a while to write out stuff */
1940 device->rs_last_bcast = jiffies;
1941 put_ldev(device);
1944 static void drbd_ldev_destroy(struct drbd_device *device)
1946 lc_destroy(device->resync);
1947 device->resync = NULL;
1948 lc_destroy(device->act_log);
1949 device->act_log = NULL;
1951 __acquire(local);
1952 drbd_backing_dev_free(device, device->ldev);
1953 device->ldev = NULL;
1954 __release(local);
1956 clear_bit(GOING_DISKLESS, &device->flags);
1957 wake_up(&device->misc_wait);
1960 static void go_diskless(struct drbd_device *device)
1962 D_ASSERT(device, device->state.disk == D_FAILED);
1963 /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1964 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1965 * the protected members anymore, though, so once put_ldev reaches zero
1966 * again, it will be safe to free them. */
1968 /* Try to write changed bitmap pages, read errors may have just
1969 * set some bits outside the area covered by the activity log.
1971 * If we have an IO error during the bitmap writeout,
1972 * we will want a full sync next time, just in case.
1973 * (Do we want a specific meta data flag for this?)
1975 * If that does not make it to stable storage either,
1976 * we cannot do anything about that anymore.
1978 * We still need to check if both bitmap and ldev are present, we may
1979 * end up here after a failed attach, before ldev was even assigned.
1981 if (device->bitmap && device->ldev) {
1982 /* An interrupted resync or similar is allowed to recounts bits
1983 * while we detach.
1984 * Any modifications would not be expected anymore, though.
1986 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1987 "detach", BM_LOCKED_TEST_ALLOWED)) {
1988 if (test_bit(WAS_READ_ERROR, &device->flags)) {
1989 drbd_md_set_flag(device, MDF_FULL_SYNC);
1990 drbd_md_sync(device);
1995 drbd_force_state(device, NS(disk, D_DISKLESS));
/*
 * Device work for MD_SYNC: warn that the md_sync_timer expired and sync
 * the meta data from the worker.  Always returns 0.
 */
static int do_md_sync(struct drbd_device *device)
{
	const int rv = 0;

	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
	drbd_md_sync(device);

	return rv;
}
2005 /* only called from drbd_worker thread, no locking */
2006 void __update_timing_details(
2007 struct drbd_thread_timing_details *tdp,
2008 unsigned int *cb_nr,
2009 void *cb,
2010 const char *fn, const unsigned int line)
2012 unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
2013 struct drbd_thread_timing_details *td = tdp + i;
2015 td->start_jif = jiffies;
2016 td->cb_addr = cb;
2017 td->caller_fn = fn;
2018 td->line = line;
2019 td->cb_nr = *cb_nr;
2021 i = (i+1) % DRBD_THREAD_DETAILS_HIST;
2022 td = tdp + i;
2023 memset(td, 0, sizeof(*td));
2025 ++(*cb_nr);
2028 static void do_device_work(struct drbd_device *device, const unsigned long todo)
2030 if (test_bit(MD_SYNC, &todo))
2031 do_md_sync(device);
2032 if (test_bit(RS_DONE, &todo) ||
2033 test_bit(RS_PROGRESS, &todo))
2034 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
2035 if (test_bit(GO_DISKLESS, &todo))
2036 go_diskless(device);
2037 if (test_bit(DESTROY_DISK, &todo))
2038 drbd_ldev_destroy(device);
2039 if (test_bit(RS_START, &todo))
2040 do_start_resync(device);
/* All device->flags bits that stand for work handled by do_device_work() */
#define DRBD_DEVICE_WORK_MASK			\
	(	(1UL << GO_DISKLESS)		\
	|	(1UL << DESTROY_DISK)		\
	|	(1UL << MD_SYNC)		\
	|	(1UL << RS_START)		\
	|	(1UL << RS_PROGRESS)		\
	|	(1UL << RS_DONE)		\
	)
2052 static unsigned long get_work_bits(unsigned long *flags)
2054 unsigned long old, new;
2055 do {
2056 old = *flags;
2057 new = old & ~DRBD_DEVICE_WORK_MASK;
2058 } while (cmpxchg(flags, old, new) != old);
2059 return old & DRBD_DEVICE_WORK_MASK;
2062 static void do_unqueued_work(struct drbd_connection *connection)
2064 struct drbd_peer_device *peer_device;
2065 int vnr;
2067 rcu_read_lock();
2068 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2069 struct drbd_device *device = peer_device->device;
2070 unsigned long todo = get_work_bits(&device->flags);
2071 if (!todo)
2072 continue;
2074 kref_get(&device->kref);
2075 rcu_read_unlock();
2076 do_device_work(device, todo);
2077 kref_put(&device->kref, drbd_destroy_device);
2078 rcu_read_lock();
2080 rcu_read_unlock();
2083 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2085 spin_lock_irq(&queue->q_lock);
2086 list_splice_tail_init(&queue->q, work_list);
2087 spin_unlock_irq(&queue->q_lock);
2088 return !list_empty(work_list);
2091 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2093 DEFINE_WAIT(wait);
2094 struct net_conf *nc;
2095 int uncork, cork;
2097 dequeue_work_batch(&connection->sender_work, work_list);
2098 if (!list_empty(work_list))
2099 return;
2101 /* Still nothing to do?
2102 * Maybe we still need to close the current epoch,
2103 * even if no new requests are queued yet.
2105 * Also, poke TCP, just in case.
2106 * Then wait for new work (or signal). */
2107 rcu_read_lock();
2108 nc = rcu_dereference(connection->net_conf);
2109 uncork = nc ? nc->tcp_cork : 0;
2110 rcu_read_unlock();
2111 if (uncork) {
2112 mutex_lock(&connection->data.mutex);
2113 if (connection->data.socket)
2114 drbd_tcp_uncork(connection->data.socket);
2115 mutex_unlock(&connection->data.mutex);
2118 for (;;) {
2119 int send_barrier;
2120 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2121 spin_lock_irq(&connection->resource->req_lock);
2122 spin_lock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
2123 if (!list_empty(&connection->sender_work.q))
2124 list_splice_tail_init(&connection->sender_work.q, work_list);
2125 spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
2126 if (!list_empty(work_list) || signal_pending(current)) {
2127 spin_unlock_irq(&connection->resource->req_lock);
2128 break;
2131 /* We found nothing new to do, no to-be-communicated request,
2132 * no other work item. We may still need to close the last
2133 * epoch. Next incoming request epoch will be connection ->
2134 * current transfer log epoch number. If that is different
2135 * from the epoch of the last request we communicated, it is
2136 * safe to send the epoch separating barrier now.
2138 send_barrier =
2139 atomic_read(&connection->current_tle_nr) !=
2140 connection->send.current_epoch_nr;
2141 spin_unlock_irq(&connection->resource->req_lock);
2143 if (send_barrier)
2144 maybe_send_barrier(connection,
2145 connection->send.current_epoch_nr + 1);
2147 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2148 break;
2150 /* drbd_send() may have called flush_signals() */
2151 if (get_t_state(&connection->worker) != RUNNING)
2152 break;
2154 schedule();
2155 /* may be woken up for other things but new work, too,
2156 * e.g. if the current epoch got closed.
2157 * In which case we send the barrier above. */
2159 finish_wait(&connection->sender_work.q_wait, &wait);
2161 /* someone may have changed the config while we have been waiting above. */
2162 rcu_read_lock();
2163 nc = rcu_dereference(connection->net_conf);
2164 cork = nc ? nc->tcp_cork : 0;
2165 rcu_read_unlock();
2166 mutex_lock(&connection->data.mutex);
2167 if (connection->data.socket) {
2168 if (cork)
2169 drbd_tcp_cork(connection->data.socket);
2170 else if (!uncork)
2171 drbd_tcp_uncork(connection->data.socket);
2173 mutex_unlock(&connection->data.mutex);
2176 int drbd_worker(struct drbd_thread *thi)
2178 struct drbd_connection *connection = thi->connection;
2179 struct drbd_work *w = NULL;
2180 struct drbd_peer_device *peer_device;
2181 LIST_HEAD(work_list);
2182 int vnr;
2184 while (get_t_state(thi) == RUNNING) {
2185 drbd_thread_current_set_cpu(thi);
2187 if (list_empty(&work_list)) {
2188 update_worker_timing_details(connection, wait_for_work);
2189 wait_for_work(connection, &work_list);
2192 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2193 update_worker_timing_details(connection, do_unqueued_work);
2194 do_unqueued_work(connection);
2197 if (signal_pending(current)) {
2198 flush_signals(current);
2199 if (get_t_state(thi) == RUNNING) {
2200 drbd_warn(connection, "Worker got an unexpected signal\n");
2201 continue;
2203 break;
2206 if (get_t_state(thi) != RUNNING)
2207 break;
2209 if (!list_empty(&work_list)) {
2210 w = list_first_entry(&work_list, struct drbd_work, list);
2211 list_del_init(&w->list);
2212 update_worker_timing_details(connection, w->cb);
2213 if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2214 continue;
2215 if (connection->cstate >= C_WF_REPORT_PARAMS)
2216 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2220 do {
2221 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2222 update_worker_timing_details(connection, do_unqueued_work);
2223 do_unqueued_work(connection);
2225 if (!list_empty(&work_list)) {
2226 w = list_first_entry(&work_list, struct drbd_work, list);
2227 list_del_init(&w->list);
2228 update_worker_timing_details(connection, w->cb);
2229 w->cb(w, 1);
2230 } else
2231 dequeue_work_batch(&connection->sender_work, &work_list);
2232 } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2234 rcu_read_lock();
2235 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2236 struct drbd_device *device = peer_device->device;
2237 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2238 kref_get(&device->kref);
2239 rcu_read_unlock();
2240 drbd_device_cleanup(device);
2241 kref_put(&device->kref, drbd_destroy_device);
2242 rcu_read_lock();
2244 rcu_read_unlock();
2246 return 0;