Linux 4.19.133
[linux/fpc-iii.git] / drivers / block / drbd / drbd_worker.c
blobb8f77e83d456292abac70d8f89360a3697e8e700
/*
   drbd_worker.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

*/
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched/signal.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
38 #include "drbd_int.h"
39 #include "drbd_protocol.h"
40 #include "drbd_req.h"
/* Forward declarations: both request generators are called from
 * w_resync_timer() before their definitions appear in this file. */
static int make_ov_request(struct drbd_device *device, int cancel);
static int make_resync_request(struct drbd_device *device, int cancel);
/* endio handlers:
 *   drbd_md_endio (defined here)
 *   drbd_request_endio (defined here)
 *   drbd_peer_request_endio (defined here)
 *   drbd_bm_endio (defined in drbd_bitmap.c)
 *
 * For all these callbacks, note the following:
 * The callbacks will be called in irq context by the IDE drivers,
 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
 * Try to get the locking right :)
 *
 */
58 /* used for synchronous meta data and bitmap IO
59 * submitted by drbd_md_sync_page_io()
61 void drbd_md_endio(struct bio *bio)
63 struct drbd_device *device;
65 device = bio->bi_private;
66 device->md_io.error = blk_status_to_errno(bio->bi_status);
68 /* special case: drbd_md_read() during drbd_adm_attach() */
69 if (device->ldev)
70 put_ldev(device);
71 bio_put(bio);
73 /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
74 * to timeout on the lower level device, and eventually detach from it.
75 * If this io completion runs after that timeout expired, this
76 * drbd_md_put_buffer() may allow us to finally try and re-attach.
77 * During normal operation, this only puts that extra reference
78 * down to 1 again.
79 * Make sure we first drop the reference, and only then signal
80 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
81 * next drbd_md_sync_page_io(), that we trigger the
82 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
84 drbd_md_put_buffer(device);
85 device->md_io.done = 1;
86 wake_up(&device->misc_wait);
89 /* reads on behalf of the partner,
90 * "submitted" by the receiver
92 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
94 unsigned long flags = 0;
95 struct drbd_peer_device *peer_device = peer_req->peer_device;
96 struct drbd_device *device = peer_device->device;
98 spin_lock_irqsave(&device->resource->req_lock, flags);
99 device->read_cnt += peer_req->i.size >> 9;
100 list_del(&peer_req->w.list);
101 if (list_empty(&device->read_ee))
102 wake_up(&device->ee_wait);
103 if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
104 __drbd_chk_io_error(device, DRBD_READ_ERROR);
105 spin_unlock_irqrestore(&device->resource->req_lock, flags);
107 drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
108 put_ldev(device);
111 /* writes on behalf of the partner, or resync writes,
112 * "submitted" by the receiver, final stage. */
113 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
115 unsigned long flags = 0;
116 struct drbd_peer_device *peer_device = peer_req->peer_device;
117 struct drbd_device *device = peer_device->device;
118 struct drbd_connection *connection = peer_device->connection;
119 struct drbd_interval i;
120 int do_wake;
121 u64 block_id;
122 int do_al_complete_io;
124 /* after we moved peer_req to done_ee,
125 * we may no longer access it,
126 * it may be freed/reused already!
127 * (as soon as we release the req_lock) */
128 i = peer_req->i;
129 do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
130 block_id = peer_req->block_id;
131 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
133 if (peer_req->flags & EE_WAS_ERROR) {
134 /* In protocol != C, we usually do not send write acks.
135 * In case of a write error, send the neg ack anyways. */
136 if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
137 inc_unacked(device);
138 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
141 spin_lock_irqsave(&device->resource->req_lock, flags);
142 device->writ_cnt += peer_req->i.size >> 9;
143 list_move_tail(&peer_req->w.list, &device->done_ee);
146 * Do not remove from the write_requests tree here: we did not send the
147 * Ack yet and did not wake possibly waiting conflicting requests.
148 * Removed from the tree from "drbd_process_done_ee" within the
149 * appropriate dw.cb (e_end_block/e_end_resync_block) or from
150 * _drbd_clear_done_ee.
153 do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
155 /* FIXME do we want to detach for failed REQ_DISCARD?
156 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
157 if (peer_req->flags & EE_WAS_ERROR)
158 __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
160 if (connection->cstate >= C_WF_REPORT_PARAMS) {
161 kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
162 if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
163 kref_put(&device->kref, drbd_destroy_device);
165 spin_unlock_irqrestore(&device->resource->req_lock, flags);
167 if (block_id == ID_SYNCER)
168 drbd_rs_complete_io(device, i.sector);
170 if (do_wake)
171 wake_up(&device->ee_wait);
173 if (do_al_complete_io)
174 drbd_al_complete_io(device, &i);
176 put_ldev(device);
179 /* writes on behalf of the partner, or resync writes,
180 * "submitted" by the receiver.
182 void drbd_peer_request_endio(struct bio *bio)
184 struct drbd_peer_request *peer_req = bio->bi_private;
185 struct drbd_device *device = peer_req->peer_device->device;
186 bool is_write = bio_data_dir(bio) == WRITE;
187 bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
188 bio_op(bio) == REQ_OP_DISCARD;
190 if (bio->bi_status && __ratelimit(&drbd_ratelimit_state))
191 drbd_warn(device, "%s: error=%d s=%llus\n",
192 is_write ? (is_discard ? "discard" : "write")
193 : "read", bio->bi_status,
194 (unsigned long long)peer_req->i.sector);
196 if (bio->bi_status)
197 set_bit(__EE_WAS_ERROR, &peer_req->flags);
199 bio_put(bio); /* no need for the bio anymore */
200 if (atomic_dec_and_test(&peer_req->pending_bios)) {
201 if (is_write)
202 drbd_endio_write_sec_final(peer_req);
203 else
204 drbd_endio_read_sec_final(peer_req);
208 static void
209 drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
211 panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
212 device->minor, device->resource->name, device->vnr);
215 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
217 void drbd_request_endio(struct bio *bio)
219 unsigned long flags;
220 struct drbd_request *req = bio->bi_private;
221 struct drbd_device *device = req->device;
222 struct bio_and_error m;
223 enum drbd_req_event what;
225 /* If this request was aborted locally before,
226 * but now was completed "successfully",
227 * chances are that this caused arbitrary data corruption.
229 * "aborting" requests, or force-detaching the disk, is intended for
230 * completely blocked/hung local backing devices which do no longer
231 * complete requests at all, not even do error completions. In this
232 * situation, usually a hard-reset and failover is the only way out.
234 * By "aborting", basically faking a local error-completion,
235 * we allow for a more graceful swichover by cleanly migrating services.
236 * Still the affected node has to be rebooted "soon".
238 * By completing these requests, we allow the upper layers to re-use
239 * the associated data pages.
241 * If later the local backing device "recovers", and now DMAs some data
242 * from disk into the original request pages, in the best case it will
243 * just put random data into unused pages; but typically it will corrupt
244 * meanwhile completely unrelated data, causing all sorts of damage.
246 * Which means delayed successful completion,
247 * especially for READ requests,
248 * is a reason to panic().
250 * We assume that a delayed *error* completion is OK,
251 * though we still will complain noisily about it.
253 if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
254 if (__ratelimit(&drbd_ratelimit_state))
255 drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
257 if (!bio->bi_status)
258 drbd_panic_after_delayed_completion_of_aborted_request(device);
261 /* to avoid recursion in __req_mod */
262 if (unlikely(bio->bi_status)) {
263 switch (bio_op(bio)) {
264 case REQ_OP_WRITE_ZEROES:
265 case REQ_OP_DISCARD:
266 if (bio->bi_status == BLK_STS_NOTSUPP)
267 what = DISCARD_COMPLETED_NOTSUPP;
268 else
269 what = DISCARD_COMPLETED_WITH_ERROR;
270 break;
271 case REQ_OP_READ:
272 if (bio->bi_opf & REQ_RAHEAD)
273 what = READ_AHEAD_COMPLETED_WITH_ERROR;
274 else
275 what = READ_COMPLETED_WITH_ERROR;
276 break;
277 default:
278 what = WRITE_COMPLETED_WITH_ERROR;
279 break;
281 } else {
282 what = COMPLETED_OK;
285 req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
286 bio_put(bio);
288 /* not req_mod(), we need irqsave here! */
289 spin_lock_irqsave(&device->resource->req_lock, flags);
290 __req_mod(req, what, &m);
291 spin_unlock_irqrestore(&device->resource->req_lock, flags);
292 put_ldev(device);
294 if (m.bio)
295 complete_master_bio(device, &m);
298 void drbd_csum_ee(struct crypto_ahash *tfm, struct drbd_peer_request *peer_req, void *digest)
300 AHASH_REQUEST_ON_STACK(req, tfm);
301 struct scatterlist sg;
302 struct page *page = peer_req->pages;
303 struct page *tmp;
304 unsigned len;
306 ahash_request_set_tfm(req, tfm);
307 ahash_request_set_callback(req, 0, NULL, NULL);
309 sg_init_table(&sg, 1);
310 crypto_ahash_init(req);
312 while ((tmp = page_chain_next(page))) {
313 /* all but the last page will be fully used */
314 sg_set_page(&sg, page, PAGE_SIZE, 0);
315 ahash_request_set_crypt(req, &sg, NULL, sg.length);
316 crypto_ahash_update(req);
317 page = tmp;
319 /* and now the last, possibly only partially used page */
320 len = peer_req->i.size & (PAGE_SIZE - 1);
321 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
322 ahash_request_set_crypt(req, &sg, digest, sg.length);
323 crypto_ahash_finup(req);
324 ahash_request_zero(req);
327 void drbd_csum_bio(struct crypto_ahash *tfm, struct bio *bio, void *digest)
329 AHASH_REQUEST_ON_STACK(req, tfm);
330 struct scatterlist sg;
331 struct bio_vec bvec;
332 struct bvec_iter iter;
334 ahash_request_set_tfm(req, tfm);
335 ahash_request_set_callback(req, 0, NULL, NULL);
337 sg_init_table(&sg, 1);
338 crypto_ahash_init(req);
340 bio_for_each_segment(bvec, bio, iter) {
341 sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
342 ahash_request_set_crypt(req, &sg, NULL, sg.length);
343 crypto_ahash_update(req);
344 /* REQ_OP_WRITE_SAME has only one segment,
345 * checksum the payload only once. */
346 if (bio_op(bio) == REQ_OP_WRITE_SAME)
347 break;
349 ahash_request_set_crypt(req, NULL, digest, 0);
350 crypto_ahash_final(req);
351 ahash_request_zero(req);
354 /* MAYBE merge common code with w_e_end_ov_req */
355 static int w_e_send_csum(struct drbd_work *w, int cancel)
357 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
358 struct drbd_peer_device *peer_device = peer_req->peer_device;
359 struct drbd_device *device = peer_device->device;
360 int digest_size;
361 void *digest;
362 int err = 0;
364 if (unlikely(cancel))
365 goto out;
367 if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
368 goto out;
370 digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
371 digest = kmalloc(digest_size, GFP_NOIO);
372 if (digest) {
373 sector_t sector = peer_req->i.sector;
374 unsigned int size = peer_req->i.size;
375 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
376 /* Free peer_req and pages before send.
377 * In case we block on congestion, we could otherwise run into
378 * some distributed deadlock, if the other side blocks on
379 * congestion as well, because our receiver blocks in
380 * drbd_alloc_pages due to pp_in_use > max_buffers. */
381 drbd_free_peer_req(device, peer_req);
382 peer_req = NULL;
383 inc_rs_pending(device);
384 err = drbd_send_drequest_csum(peer_device, sector, size,
385 digest, digest_size,
386 P_CSUM_RS_REQUEST);
387 kfree(digest);
388 } else {
389 drbd_err(device, "kmalloc() of digest failed.\n");
390 err = -ENOMEM;
393 out:
394 if (peer_req)
395 drbd_free_peer_req(device, peer_req);
397 if (unlikely(err))
398 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
399 return err;
402 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
404 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
406 struct drbd_device *device = peer_device->device;
407 struct drbd_peer_request *peer_req;
409 if (!get_ldev(device))
410 return -EIO;
412 /* GFP_TRY, because if there is no memory available right now, this may
413 * be rescheduled for later. It is "only" background resync, after all. */
414 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
415 size, size, GFP_TRY);
416 if (!peer_req)
417 goto defer;
419 peer_req->w.cb = w_e_send_csum;
420 spin_lock_irq(&device->resource->req_lock);
421 list_add_tail(&peer_req->w.list, &device->read_ee);
422 spin_unlock_irq(&device->resource->req_lock);
424 atomic_add(size >> 9, &device->rs_sect_ev);
425 if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
426 DRBD_FAULT_RS_RD) == 0)
427 return 0;
429 /* If it failed because of ENOMEM, retry should help. If it failed
430 * because bio_add_page failed (probably broken lower level driver),
431 * retry may or may not help.
432 * If it does not, you may need to force disconnect. */
433 spin_lock_irq(&device->resource->req_lock);
434 list_del(&peer_req->w.list);
435 spin_unlock_irq(&device->resource->req_lock);
437 drbd_free_peer_req(device, peer_req);
438 defer:
439 put_ldev(device);
440 return -EAGAIN;
443 int w_resync_timer(struct drbd_work *w, int cancel)
445 struct drbd_device *device =
446 container_of(w, struct drbd_device, resync_work);
448 switch (device->state.conn) {
449 case C_VERIFY_S:
450 make_ov_request(device, cancel);
451 break;
452 case C_SYNC_TARGET:
453 make_resync_request(device, cancel);
454 break;
457 return 0;
460 void resync_timer_fn(struct timer_list *t)
462 struct drbd_device *device = from_timer(device, t, resync_timer);
464 drbd_queue_work_if_unqueued(
465 &first_peer_device(device)->connection->sender_work,
466 &device->resync_work);
469 static void fifo_set(struct fifo_buffer *fb, int value)
471 int i;
473 for (i = 0; i < fb->size; i++)
474 fb->values[i] = value;
477 static int fifo_push(struct fifo_buffer *fb, int value)
479 int ov;
481 ov = fb->values[fb->head_index];
482 fb->values[fb->head_index++] = value;
484 if (fb->head_index >= fb->size)
485 fb->head_index = 0;
487 return ov;
490 static void fifo_add_val(struct fifo_buffer *fb, int value)
492 int i;
494 for (i = 0; i < fb->size; i++)
495 fb->values[i] += value;
498 struct fifo_buffer *fifo_alloc(int fifo_size)
500 struct fifo_buffer *fb;
502 fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
503 if (!fb)
504 return NULL;
506 fb->head_index = 0;
507 fb->size = fifo_size;
508 fb->total = 0;
510 return fb;
513 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
515 struct disk_conf *dc;
516 unsigned int want; /* The number of sectors we want in-flight */
517 int req_sect; /* Number of sectors to request in this turn */
518 int correction; /* Number of sectors more we need in-flight */
519 int cps; /* correction per invocation of drbd_rs_controller() */
520 int steps; /* Number of time steps to plan ahead */
521 int curr_corr;
522 int max_sect;
523 struct fifo_buffer *plan;
525 dc = rcu_dereference(device->ldev->disk_conf);
526 plan = rcu_dereference(device->rs_plan_s);
528 steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
530 if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
531 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
532 } else { /* normal path */
533 want = dc->c_fill_target ? dc->c_fill_target :
534 sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
537 correction = want - device->rs_in_flight - plan->total;
539 /* Plan ahead */
540 cps = correction / steps;
541 fifo_add_val(plan, cps);
542 plan->total += cps * steps;
544 /* What we do in this step */
545 curr_corr = fifo_push(plan, 0);
546 plan->total -= curr_corr;
548 req_sect = sect_in + curr_corr;
549 if (req_sect < 0)
550 req_sect = 0;
552 max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
553 if (req_sect > max_sect)
554 req_sect = max_sect;
557 drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
558 sect_in, device->rs_in_flight, want, correction,
559 steps, cps, device->rs_planed, curr_corr, req_sect);
562 return req_sect;
565 static int drbd_rs_number_requests(struct drbd_device *device)
567 unsigned int sect_in; /* Number of sectors that came in since the last turn */
568 int number, mxb;
570 sect_in = atomic_xchg(&device->rs_sect_in, 0);
571 device->rs_in_flight -= sect_in;
573 rcu_read_lock();
574 mxb = drbd_get_max_buffers(device) / 2;
575 if (rcu_dereference(device->rs_plan_s)->size) {
576 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
577 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
578 } else {
579 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
580 number = SLEEP_TIME * device->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
582 rcu_read_unlock();
584 /* Don't have more than "max-buffers"/2 in-flight.
585 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
586 * potentially causing a distributed deadlock on congestion during
587 * online-verify or (checksum-based) resync, if max-buffers,
588 * socket buffer sizes and resync rate settings are mis-configured. */
590 /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
591 * mxb (as used here, and in drbd_alloc_pages on the peer) is
592 * "number of pages" (typically also 4k),
593 * but "rs_in_flight" is in "sectors" (512 Byte). */
594 if (mxb - device->rs_in_flight/8 < number)
595 number = mxb - device->rs_in_flight/8;
597 return number;
600 static int make_resync_request(struct drbd_device *const device, int cancel)
602 struct drbd_peer_device *const peer_device = first_peer_device(device);
603 struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
604 unsigned long bit;
605 sector_t sector;
606 const sector_t capacity = drbd_get_capacity(device->this_bdev);
607 int max_bio_size;
608 int number, rollback_i, size;
609 int align, requeue = 0;
610 int i = 0;
611 int discard_granularity = 0;
613 if (unlikely(cancel))
614 return 0;
616 if (device->rs_total == 0) {
617 /* empty resync? */
618 drbd_resync_finished(device);
619 return 0;
622 if (!get_ldev(device)) {
623 /* Since we only need to access device->rsync a
624 get_ldev_if_state(device,D_FAILED) would be sufficient, but
625 to continue resync with a broken disk makes no sense at
626 all */
627 drbd_err(device, "Disk broke down during resync!\n");
628 return 0;
631 if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
632 rcu_read_lock();
633 discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
634 rcu_read_unlock();
637 max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
638 number = drbd_rs_number_requests(device);
639 if (number <= 0)
640 goto requeue;
642 for (i = 0; i < number; i++) {
643 /* Stop generating RS requests when half of the send buffer is filled,
644 * but notify TCP that we'd like to have more space. */
645 mutex_lock(&connection->data.mutex);
646 if (connection->data.socket) {
647 struct sock *sk = connection->data.socket->sk;
648 int queued = sk->sk_wmem_queued;
649 int sndbuf = sk->sk_sndbuf;
650 if (queued > sndbuf / 2) {
651 requeue = 1;
652 if (sk->sk_socket)
653 set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
655 } else
656 requeue = 1;
657 mutex_unlock(&connection->data.mutex);
658 if (requeue)
659 goto requeue;
661 next_sector:
662 size = BM_BLOCK_SIZE;
663 bit = drbd_bm_find_next(device, device->bm_resync_fo);
665 if (bit == DRBD_END_OF_BITMAP) {
666 device->bm_resync_fo = drbd_bm_bits(device);
667 put_ldev(device);
668 return 0;
671 sector = BM_BIT_TO_SECT(bit);
673 if (drbd_try_rs_begin_io(device, sector)) {
674 device->bm_resync_fo = bit;
675 goto requeue;
677 device->bm_resync_fo = bit + 1;
679 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
680 drbd_rs_complete_io(device, sector);
681 goto next_sector;
684 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
685 /* try to find some adjacent bits.
686 * we stop if we have already the maximum req size.
688 * Additionally always align bigger requests, in order to
689 * be prepared for all stripe sizes of software RAIDs.
691 align = 1;
692 rollback_i = i;
693 while (i < number) {
694 if (size + BM_BLOCK_SIZE > max_bio_size)
695 break;
697 /* Be always aligned */
698 if (sector & ((1<<(align+3))-1))
699 break;
701 if (discard_granularity && size == discard_granularity)
702 break;
704 /* do not cross extent boundaries */
705 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
706 break;
707 /* now, is it actually dirty, after all?
708 * caution, drbd_bm_test_bit is tri-state for some
709 * obscure reason; ( b == 0 ) would get the out-of-band
710 * only accidentally right because of the "oddly sized"
711 * adjustment below */
712 if (drbd_bm_test_bit(device, bit+1) != 1)
713 break;
714 bit++;
715 size += BM_BLOCK_SIZE;
716 if ((BM_BLOCK_SIZE << align) <= size)
717 align++;
718 i++;
720 /* if we merged some,
721 * reset the offset to start the next drbd_bm_find_next from */
722 if (size > BM_BLOCK_SIZE)
723 device->bm_resync_fo = bit + 1;
724 #endif
726 /* adjust very last sectors, in case we are oddly sized */
727 if (sector + (size>>9) > capacity)
728 size = (capacity-sector)<<9;
730 if (device->use_csums) {
731 switch (read_for_csum(peer_device, sector, size)) {
732 case -EIO: /* Disk failure */
733 put_ldev(device);
734 return -EIO;
735 case -EAGAIN: /* allocation failed, or ldev busy */
736 drbd_rs_complete_io(device, sector);
737 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
738 i = rollback_i;
739 goto requeue;
740 case 0:
741 /* everything ok */
742 break;
743 default:
744 BUG();
746 } else {
747 int err;
749 inc_rs_pending(device);
750 err = drbd_send_drequest(peer_device,
751 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
752 sector, size, ID_SYNCER);
753 if (err) {
754 drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
755 dec_rs_pending(device);
756 put_ldev(device);
757 return err;
762 if (device->bm_resync_fo >= drbd_bm_bits(device)) {
763 /* last syncer _request_ was sent,
764 * but the P_RS_DATA_REPLY not yet received. sync will end (and
765 * next sync group will resume), as soon as we receive the last
766 * resync data block, and the last bit is cleared.
767 * until then resync "work" is "inactive" ...
769 put_ldev(device);
770 return 0;
773 requeue:
774 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
775 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
776 put_ldev(device);
777 return 0;
780 static int make_ov_request(struct drbd_device *device, int cancel)
782 int number, i, size;
783 sector_t sector;
784 const sector_t capacity = drbd_get_capacity(device->this_bdev);
785 bool stop_sector_reached = false;
787 if (unlikely(cancel))
788 return 1;
790 number = drbd_rs_number_requests(device);
792 sector = device->ov_position;
793 for (i = 0; i < number; i++) {
794 if (sector >= capacity)
795 return 1;
797 /* We check for "finished" only in the reply path:
798 * w_e_end_ov_reply().
799 * We need to send at least one request out. */
800 stop_sector_reached = i > 0
801 && verify_can_do_stop_sector(device)
802 && sector >= device->ov_stop_sector;
803 if (stop_sector_reached)
804 break;
806 size = BM_BLOCK_SIZE;
808 if (drbd_try_rs_begin_io(device, sector)) {
809 device->ov_position = sector;
810 goto requeue;
813 if (sector + (size>>9) > capacity)
814 size = (capacity-sector)<<9;
816 inc_rs_pending(device);
817 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
818 dec_rs_pending(device);
819 return 0;
821 sector += BM_SECT_PER_BIT;
823 device->ov_position = sector;
825 requeue:
826 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
827 if (i == 0 || !stop_sector_reached)
828 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
829 return 1;
832 int w_ov_finished(struct drbd_work *w, int cancel)
834 struct drbd_device_work *dw =
835 container_of(w, struct drbd_device_work, w);
836 struct drbd_device *device = dw->device;
837 kfree(dw);
838 ov_out_of_sync_print(device);
839 drbd_resync_finished(device);
841 return 0;
844 static int w_resync_finished(struct drbd_work *w, int cancel)
846 struct drbd_device_work *dw =
847 container_of(w, struct drbd_device_work, w);
848 struct drbd_device *device = dw->device;
849 kfree(dw);
851 drbd_resync_finished(device);
853 return 0;
856 static void ping_peer(struct drbd_device *device)
858 struct drbd_connection *connection = first_peer_device(device)->connection;
860 clear_bit(GOT_PING_ACK, &connection->flags);
861 request_ping(connection);
862 wait_event(connection->ping_wait,
863 test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
866 int drbd_resync_finished(struct drbd_device *device)
868 struct drbd_connection *connection = first_peer_device(device)->connection;
869 unsigned long db, dt, dbdt;
870 unsigned long n_oos;
871 union drbd_state os, ns;
872 struct drbd_device_work *dw;
873 char *khelper_cmd = NULL;
874 int verify_done = 0;
876 /* Remove all elements from the resync LRU. Since future actions
877 * might set bits in the (main) bitmap, then the entries in the
878 * resync LRU would be wrong. */
879 if (drbd_rs_del_all(device)) {
880 /* In case this is not possible now, most probably because
881 * there are P_RS_DATA_REPLY Packets lingering on the worker's
882 * queue (or even the read operations for those packets
883 * is not finished by now). Retry in 100ms. */
885 schedule_timeout_interruptible(HZ / 10);
886 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
887 if (dw) {
888 dw->w.cb = w_resync_finished;
889 dw->device = device;
890 drbd_queue_work(&connection->sender_work, &dw->w);
891 return 1;
893 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
896 dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
897 if (dt <= 0)
898 dt = 1;
900 db = device->rs_total;
901 /* adjust for verify start and stop sectors, respective reached position */
902 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
903 db -= device->ov_left;
905 dbdt = Bit2KB(db/dt);
906 device->rs_paused /= HZ;
908 if (!get_ldev(device))
909 goto out;
911 ping_peer(device);
913 spin_lock_irq(&device->resource->req_lock);
914 os = drbd_read_state(device);
916 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
918 /* This protects us against multiple calls (that can happen in the presence
919 of application IO), and against connectivity loss just before we arrive here. */
920 if (os.conn <= C_CONNECTED)
921 goto out_unlock;
923 ns = os;
924 ns.conn = C_CONNECTED;
926 drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
927 verify_done ? "Online verify" : "Resync",
928 dt + device->rs_paused, device->rs_paused, dbdt);
930 n_oos = drbd_bm_total_weight(device);
932 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
933 if (n_oos) {
934 drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
935 n_oos, Bit2KB(1));
936 khelper_cmd = "out-of-sync";
938 } else {
939 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
941 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
942 khelper_cmd = "after-resync-target";
944 if (device->use_csums && device->rs_total) {
945 const unsigned long s = device->rs_same_csum;
946 const unsigned long t = device->rs_total;
947 const int ratio =
948 (t == 0) ? 0 :
949 (t < 100000) ? ((s*100)/t) : (s/(t/100));
950 drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
951 "transferred %luK total %luK\n",
952 ratio,
953 Bit2KB(device->rs_same_csum),
954 Bit2KB(device->rs_total - device->rs_same_csum),
955 Bit2KB(device->rs_total));
959 if (device->rs_failed) {
960 drbd_info(device, " %lu failed blocks\n", device->rs_failed);
962 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
963 ns.disk = D_INCONSISTENT;
964 ns.pdsk = D_UP_TO_DATE;
965 } else {
966 ns.disk = D_UP_TO_DATE;
967 ns.pdsk = D_INCONSISTENT;
969 } else {
970 ns.disk = D_UP_TO_DATE;
971 ns.pdsk = D_UP_TO_DATE;
973 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
974 if (device->p_uuid) {
975 int i;
976 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
977 _drbd_uuid_set(device, i, device->p_uuid[i]);
978 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
979 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
980 } else {
981 drbd_err(device, "device->p_uuid is NULL! BUG\n");
985 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
986 /* for verify runs, we don't update uuids here,
987 * so there would be nothing to report. */
988 drbd_uuid_set_bm(device, 0UL);
989 drbd_print_uuids(device, "updated UUIDs");
990 if (device->p_uuid) {
991 /* Now the two UUID sets are equal, update what we
992 * know of the peer. */
993 int i;
994 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
995 device->p_uuid[i] = device->ldev->md.uuid[i];
1000 _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1001 out_unlock:
1002 spin_unlock_irq(&device->resource->req_lock);
1004 /* If we have been sync source, and have an effective fencing-policy,
1005 * once *all* volumes are back in sync, call "unfence". */
1006 if (os.conn == C_SYNC_SOURCE) {
1007 enum drbd_disk_state disk_state = D_MASK;
1008 enum drbd_disk_state pdsk_state = D_MASK;
1009 enum drbd_fencing_p fp = FP_DONT_CARE;
1011 rcu_read_lock();
1012 fp = rcu_dereference(device->ldev->disk_conf)->fencing;
1013 if (fp != FP_DONT_CARE) {
1014 struct drbd_peer_device *peer_device;
1015 int vnr;
1016 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1017 struct drbd_device *device = peer_device->device;
1018 disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1019 pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1022 rcu_read_unlock();
1023 if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1024 conn_khelper(connection, "unfence-peer");
1027 put_ldev(device);
1028 out:
1029 device->rs_total = 0;
1030 device->rs_failed = 0;
1031 device->rs_paused = 0;
1033 /* reset start sector, if we reached end of device */
1034 if (verify_done && device->ov_left == 0)
1035 device->ov_start_sector = 0;
1037 drbd_md_sync(device);
1039 if (khelper_cmd)
1040 drbd_khelper(device, khelper_cmd);
1042 return 1;
1045 /* helper */
1046 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1048 if (drbd_peer_req_has_active_page(peer_req)) {
1049 /* This might happen if sendpage() has not finished */
1050 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1051 atomic_add(i, &device->pp_in_use_by_net);
1052 atomic_sub(i, &device->pp_in_use);
1053 spin_lock_irq(&device->resource->req_lock);
1054 list_add_tail(&peer_req->w.list, &device->net_ee);
1055 spin_unlock_irq(&device->resource->req_lock);
1056 wake_up(&drbd_pp_wait);
1057 } else
1058 drbd_free_peer_req(device, peer_req);
1062 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1063 * @w: work object.
1064 * @cancel: The connection will be closed anyways
1066 int w_e_end_data_req(struct drbd_work *w, int cancel)
1068 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1069 struct drbd_peer_device *peer_device = peer_req->peer_device;
1070 struct drbd_device *device = peer_device->device;
1071 int err;
1073 if (unlikely(cancel)) {
1074 drbd_free_peer_req(device, peer_req);
1075 dec_unacked(device);
1076 return 0;
1079 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1080 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1081 } else {
1082 if (__ratelimit(&drbd_ratelimit_state))
1083 drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1084 (unsigned long long)peer_req->i.sector);
1086 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1089 dec_unacked(device);
1091 move_to_net_ee_or_free(device, peer_req);
1093 if (unlikely(err))
1094 drbd_err(device, "drbd_send_block() failed\n");
1095 return err;
1098 static bool all_zero(struct drbd_peer_request *peer_req)
1100 struct page *page = peer_req->pages;
1101 unsigned int len = peer_req->i.size;
1103 page_chain_for_each(page) {
1104 unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1105 unsigned int i, words = l / sizeof(long);
1106 unsigned long *d;
1108 d = kmap_atomic(page);
1109 for (i = 0; i < words; i++) {
1110 if (d[i]) {
1111 kunmap_atomic(d);
1112 return false;
1115 kunmap_atomic(d);
1116 len -= l;
1119 return true;
1123 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1124 * @w: work object.
1125 * @cancel: The connection will be closed anyways
1127 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1129 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1130 struct drbd_peer_device *peer_device = peer_req->peer_device;
1131 struct drbd_device *device = peer_device->device;
1132 int err;
1134 if (unlikely(cancel)) {
1135 drbd_free_peer_req(device, peer_req);
1136 dec_unacked(device);
1137 return 0;
1140 if (get_ldev_if_state(device, D_FAILED)) {
1141 drbd_rs_complete_io(device, peer_req->i.sector);
1142 put_ldev(device);
1145 if (device->state.conn == C_AHEAD) {
1146 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1147 } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1148 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1149 inc_rs_pending(device);
1150 if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1151 err = drbd_send_rs_deallocated(peer_device, peer_req);
1152 else
1153 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1154 } else {
1155 if (__ratelimit(&drbd_ratelimit_state))
1156 drbd_err(device, "Not sending RSDataReply, "
1157 "partner DISKLESS!\n");
1158 err = 0;
1160 } else {
1161 if (__ratelimit(&drbd_ratelimit_state))
1162 drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1163 (unsigned long long)peer_req->i.sector);
1165 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1167 /* update resync data with failure */
1168 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1171 dec_unacked(device);
1173 move_to_net_ee_or_free(device, peer_req);
1175 if (unlikely(err))
1176 drbd_err(device, "drbd_send_block() failed\n");
1177 return err;
1180 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1182 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1183 struct drbd_peer_device *peer_device = peer_req->peer_device;
1184 struct drbd_device *device = peer_device->device;
1185 struct digest_info *di;
1186 int digest_size;
1187 void *digest = NULL;
1188 int err, eq = 0;
1190 if (unlikely(cancel)) {
1191 drbd_free_peer_req(device, peer_req);
1192 dec_unacked(device);
1193 return 0;
1196 if (get_ldev(device)) {
1197 drbd_rs_complete_io(device, peer_req->i.sector);
1198 put_ldev(device);
1201 di = peer_req->digest;
1203 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1204 /* quick hack to try to avoid a race against reconfiguration.
1205 * a real fix would be much more involved,
1206 * introducing more locking mechanisms */
1207 if (peer_device->connection->csums_tfm) {
1208 digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
1209 D_ASSERT(device, digest_size == di->digest_size);
1210 digest = kmalloc(digest_size, GFP_NOIO);
1212 if (digest) {
1213 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1214 eq = !memcmp(digest, di->digest, digest_size);
1215 kfree(digest);
1218 if (eq) {
1219 drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1220 /* rs_same_csums unit is BM_BLOCK_SIZE */
1221 device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1222 err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1223 } else {
1224 inc_rs_pending(device);
1225 peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1226 peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1227 kfree(di);
1228 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1230 } else {
1231 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1232 if (__ratelimit(&drbd_ratelimit_state))
1233 drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1236 dec_unacked(device);
1237 move_to_net_ee_or_free(device, peer_req);
1239 if (unlikely(err))
1240 drbd_err(device, "drbd_send_block/ack() failed\n");
1241 return err;
1244 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1246 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1247 struct drbd_peer_device *peer_device = peer_req->peer_device;
1248 struct drbd_device *device = peer_device->device;
1249 sector_t sector = peer_req->i.sector;
1250 unsigned int size = peer_req->i.size;
1251 int digest_size;
1252 void *digest;
1253 int err = 0;
1255 if (unlikely(cancel))
1256 goto out;
1258 digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1259 digest = kmalloc(digest_size, GFP_NOIO);
1260 if (!digest) {
1261 err = 1; /* terminate the connection in case the allocation failed */
1262 goto out;
1265 if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1266 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1267 else
1268 memset(digest, 0, digest_size);
1270 /* Free e and pages before send.
1271 * In case we block on congestion, we could otherwise run into
1272 * some distributed deadlock, if the other side blocks on
1273 * congestion as well, because our receiver blocks in
1274 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1275 drbd_free_peer_req(device, peer_req);
1276 peer_req = NULL;
1277 inc_rs_pending(device);
1278 err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1279 if (err)
1280 dec_rs_pending(device);
1281 kfree(digest);
1283 out:
1284 if (peer_req)
1285 drbd_free_peer_req(device, peer_req);
1286 dec_unacked(device);
1287 return err;
1290 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1292 if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1293 device->ov_last_oos_size += size>>9;
1294 } else {
1295 device->ov_last_oos_start = sector;
1296 device->ov_last_oos_size = size>>9;
1298 drbd_set_out_of_sync(device, sector, size);
1301 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1303 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1304 struct drbd_peer_device *peer_device = peer_req->peer_device;
1305 struct drbd_device *device = peer_device->device;
1306 struct digest_info *di;
1307 void *digest;
1308 sector_t sector = peer_req->i.sector;
1309 unsigned int size = peer_req->i.size;
1310 int digest_size;
1311 int err, eq = 0;
1312 bool stop_sector_reached = false;
1314 if (unlikely(cancel)) {
1315 drbd_free_peer_req(device, peer_req);
1316 dec_unacked(device);
1317 return 0;
1320 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1321 * the resync lru has been cleaned up already */
1322 if (get_ldev(device)) {
1323 drbd_rs_complete_io(device, peer_req->i.sector);
1324 put_ldev(device);
1327 di = peer_req->digest;
1329 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1330 digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1331 digest = kmalloc(digest_size, GFP_NOIO);
1332 if (digest) {
1333 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1335 D_ASSERT(device, digest_size == di->digest_size);
1336 eq = !memcmp(digest, di->digest, digest_size);
1337 kfree(digest);
1341 /* Free peer_req and pages before send.
1342 * In case we block on congestion, we could otherwise run into
1343 * some distributed deadlock, if the other side blocks on
1344 * congestion as well, because our receiver blocks in
1345 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1346 drbd_free_peer_req(device, peer_req);
1347 if (!eq)
1348 drbd_ov_out_of_sync_found(device, sector, size);
1349 else
1350 ov_out_of_sync_print(device);
1352 err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1353 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1355 dec_unacked(device);
1357 --device->ov_left;
1359 /* let's advance progress step marks only for every other megabyte */
1360 if ((device->ov_left & 0x200) == 0x200)
1361 drbd_advance_rs_marks(device, device->ov_left);
1363 stop_sector_reached = verify_can_do_stop_sector(device) &&
1364 (sector + (size>>9)) >= device->ov_stop_sector;
1366 if (device->ov_left == 0 || stop_sector_reached) {
1367 ov_out_of_sync_print(device);
1368 drbd_resync_finished(device);
1371 return err;
1374 /* FIXME
1375 * We need to track the number of pending barrier acks,
1376 * and to be able to wait for them.
1377 * See also comment in drbd_adm_attach before drbd_suspend_io.
1379 static int drbd_send_barrier(struct drbd_connection *connection)
1381 struct p_barrier *p;
1382 struct drbd_socket *sock;
1384 sock = &connection->data;
1385 p = conn_prepare_command(connection, sock);
1386 if (!p)
1387 return -EIO;
1388 p->barrier = connection->send.current_epoch_nr;
1389 p->pad = 0;
1390 connection->send.current_epoch_writes = 0;
1391 connection->send.last_sent_barrier_jif = jiffies;
1393 return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1396 static int pd_send_unplug_remote(struct drbd_peer_device *pd)
1398 struct drbd_socket *sock = &pd->connection->data;
1399 if (!drbd_prepare_command(pd, sock))
1400 return -EIO;
1401 return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1404 int w_send_write_hint(struct drbd_work *w, int cancel)
1406 struct drbd_device *device =
1407 container_of(w, struct drbd_device, unplug_work);
1409 if (cancel)
1410 return 0;
1411 return pd_send_unplug_remote(first_peer_device(device));
1414 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1416 if (!connection->send.seen_any_write_yet) {
1417 connection->send.seen_any_write_yet = true;
1418 connection->send.current_epoch_nr = epoch;
1419 connection->send.current_epoch_writes = 0;
1420 connection->send.last_sent_barrier_jif = jiffies;
1424 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1426 /* re-init if first write on this connection */
1427 if (!connection->send.seen_any_write_yet)
1428 return;
1429 if (connection->send.current_epoch_nr != epoch) {
1430 if (connection->send.current_epoch_writes)
1431 drbd_send_barrier(connection);
1432 connection->send.current_epoch_nr = epoch;
1436 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1438 struct drbd_request *req = container_of(w, struct drbd_request, w);
1439 struct drbd_device *device = req->device;
1440 struct drbd_peer_device *const peer_device = first_peer_device(device);
1441 struct drbd_connection *const connection = peer_device->connection;
1442 int err;
1444 if (unlikely(cancel)) {
1445 req_mod(req, SEND_CANCELED);
1446 return 0;
1448 req->pre_send_jif = jiffies;
1450 /* this time, no connection->send.current_epoch_writes++;
1451 * If it was sent, it was the closing barrier for the last
1452 * replicated epoch, before we went into AHEAD mode.
1453 * No more barriers will be sent, until we leave AHEAD mode again. */
1454 maybe_send_barrier(connection, req->epoch);
1456 err = drbd_send_out_of_sync(peer_device, req);
1457 req_mod(req, OOS_HANDED_TO_NETWORK);
1459 return err;
1463 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1464 * @w: work object.
1465 * @cancel: The connection will be closed anyways
1467 int w_send_dblock(struct drbd_work *w, int cancel)
1469 struct drbd_request *req = container_of(w, struct drbd_request, w);
1470 struct drbd_device *device = req->device;
1471 struct drbd_peer_device *const peer_device = first_peer_device(device);
1472 struct drbd_connection *connection = peer_device->connection;
1473 bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1474 int err;
1476 if (unlikely(cancel)) {
1477 req_mod(req, SEND_CANCELED);
1478 return 0;
1480 req->pre_send_jif = jiffies;
1482 re_init_if_first_write(connection, req->epoch);
1483 maybe_send_barrier(connection, req->epoch);
1484 connection->send.current_epoch_writes++;
1486 err = drbd_send_dblock(peer_device, req);
1487 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1489 if (do_send_unplug && !err)
1490 pd_send_unplug_remote(peer_device);
1492 return err;
1496 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1497 * @w: work object.
1498 * @cancel: The connection will be closed anyways
1500 int w_send_read_req(struct drbd_work *w, int cancel)
1502 struct drbd_request *req = container_of(w, struct drbd_request, w);
1503 struct drbd_device *device = req->device;
1504 struct drbd_peer_device *const peer_device = first_peer_device(device);
1505 struct drbd_connection *connection = peer_device->connection;
1506 bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1507 int err;
1509 if (unlikely(cancel)) {
1510 req_mod(req, SEND_CANCELED);
1511 return 0;
1513 req->pre_send_jif = jiffies;
1515 /* Even read requests may close a write epoch,
1516 * if there was any yet. */
1517 maybe_send_barrier(connection, req->epoch);
1519 err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1520 (unsigned long)req);
1522 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1524 if (do_send_unplug && !err)
1525 pd_send_unplug_remote(peer_device);
1527 return err;
1530 int w_restart_disk_io(struct drbd_work *w, int cancel)
1532 struct drbd_request *req = container_of(w, struct drbd_request, w);
1533 struct drbd_device *device = req->device;
1535 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1536 drbd_al_begin_io(device, &req->i);
1538 drbd_req_make_private_bio(req, req->master_bio);
1539 bio_set_dev(req->private_bio, device->ldev->backing_bdev);
1540 generic_make_request(req->private_bio);
1542 return 0;
1545 static int _drbd_may_sync_now(struct drbd_device *device)
1547 struct drbd_device *odev = device;
1548 int resync_after;
1550 while (1) {
1551 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1552 return 1;
1553 rcu_read_lock();
1554 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1555 rcu_read_unlock();
1556 if (resync_after == -1)
1557 return 1;
1558 odev = minor_to_device(resync_after);
1559 if (!odev)
1560 return 1;
1561 if ((odev->state.conn >= C_SYNC_SOURCE &&
1562 odev->state.conn <= C_PAUSED_SYNC_T) ||
1563 odev->state.aftr_isp || odev->state.peer_isp ||
1564 odev->state.user_isp)
1565 return 0;
1570 * drbd_pause_after() - Pause resync on all devices that may not resync now
1571 * @device: DRBD device.
1573 * Called from process context only (admin command and after_state_ch).
1575 static bool drbd_pause_after(struct drbd_device *device)
1577 bool changed = false;
1578 struct drbd_device *odev;
1579 int i;
1581 rcu_read_lock();
1582 idr_for_each_entry(&drbd_devices, odev, i) {
1583 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1584 continue;
1585 if (!_drbd_may_sync_now(odev) &&
1586 _drbd_set_state(_NS(odev, aftr_isp, 1),
1587 CS_HARD, NULL) != SS_NOTHING_TO_DO)
1588 changed = true;
1590 rcu_read_unlock();
1592 return changed;
1596 * drbd_resume_next() - Resume resync on all devices that may resync now
1597 * @device: DRBD device.
1599 * Called from process context only (admin command and worker).
1601 static bool drbd_resume_next(struct drbd_device *device)
1603 bool changed = false;
1604 struct drbd_device *odev;
1605 int i;
1607 rcu_read_lock();
1608 idr_for_each_entry(&drbd_devices, odev, i) {
1609 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1610 continue;
1611 if (odev->state.aftr_isp) {
1612 if (_drbd_may_sync_now(odev) &&
1613 _drbd_set_state(_NS(odev, aftr_isp, 0),
1614 CS_HARD, NULL) != SS_NOTHING_TO_DO)
1615 changed = true;
1618 rcu_read_unlock();
1619 return changed;
/* Run drbd_resume_next() with all resources locked. */
void resume_next_sg(struct drbd_device *device)
{
	lock_all_resources();
	drbd_resume_next(device);
	unlock_all_resources();
}
/* Run drbd_pause_after() with all resources locked. */
void suspend_other_sg(struct drbd_device *device)
{
	lock_all_resources();
	drbd_pause_after(device);
	unlock_all_resources();
}
1636 /* caller must lock_all_resources() */
1637 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1639 struct drbd_device *odev;
1640 int resync_after;
1642 if (o_minor == -1)
1643 return NO_ERROR;
1644 if (o_minor < -1 || o_minor > MINORMASK)
1645 return ERR_RESYNC_AFTER;
1647 /* check for loops */
1648 odev = minor_to_device(o_minor);
1649 while (1) {
1650 if (odev == device)
1651 return ERR_RESYNC_AFTER_CYCLE;
1653 /* You are free to depend on diskless, non-existing,
1654 * or not yet/no longer existing minors.
1655 * We only reject dependency loops.
1656 * We cannot follow the dependency chain beyond a detached or
1657 * missing minor.
1659 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1660 return NO_ERROR;
1662 rcu_read_lock();
1663 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1664 rcu_read_unlock();
1665 /* dependency chain ends here, no cycles. */
1666 if (resync_after == -1)
1667 return NO_ERROR;
1669 /* follow the dependency chain */
1670 odev = minor_to_device(resync_after);
1674 /* caller must lock_all_resources() */
1675 void drbd_resync_after_changed(struct drbd_device *device)
1677 int changed;
1679 do {
1680 changed = drbd_pause_after(device);
1681 changed |= drbd_resume_next(device);
1682 } while (changed);
1685 void drbd_rs_controller_reset(struct drbd_device *device)
1687 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1688 struct fifo_buffer *plan;
1690 atomic_set(&device->rs_sect_in, 0);
1691 atomic_set(&device->rs_sect_ev, 0);
1692 device->rs_in_flight = 0;
1693 device->rs_last_events = (int)part_stat_read_accum(&disk->part0, sectors);
1695 /* Updating the RCU protected object in place is necessary since
1696 this function gets called from atomic context.
1697 It is valid since all other updates also lead to an completely
1698 empty fifo */
1699 rcu_read_lock();
1700 plan = rcu_dereference(device->rs_plan_s);
1701 plan->total = 0;
1702 fifo_set(plan, 0);
1703 rcu_read_unlock();
1706 void start_resync_timer_fn(struct timer_list *t)
1708 struct drbd_device *device = from_timer(device, t, start_resync_timer);
1709 drbd_device_post_work(device, RS_START);
1712 static void do_start_resync(struct drbd_device *device)
1714 if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1715 drbd_warn(device, "postponing start_resync ...\n");
1716 device->start_resync_timer.expires = jiffies + HZ/10;
1717 add_timer(&device->start_resync_timer);
1718 return;
1721 drbd_start_resync(device, C_SYNC_SOURCE);
1722 clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1725 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1727 bool csums_after_crash_only;
1728 rcu_read_lock();
1729 csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1730 rcu_read_unlock();
1731 return connection->agreed_pro_version >= 89 && /* supported? */
1732 connection->csums_tfm && /* configured? */
1733 (csums_after_crash_only == false /* use for each resync? */
1734 || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1738 * drbd_start_resync() - Start the resync process
1739 * @device: DRBD device.
1740 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1742 * This function might bring you directly into one of the
1743 * C_PAUSED_SYNC_* states.
1745 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1747 struct drbd_peer_device *peer_device = first_peer_device(device);
1748 struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1749 union drbd_state ns;
1750 int r;
1752 if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1753 drbd_err(device, "Resync already running!\n");
1754 return;
1757 if (!connection) {
1758 drbd_err(device, "No connection to peer, aborting!\n");
1759 return;
1762 if (!test_bit(B_RS_H_DONE, &device->flags)) {
1763 if (side == C_SYNC_TARGET) {
1764 /* Since application IO was locked out during C_WF_BITMAP_T and
1765 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1766 we check that we might make the data inconsistent. */
1767 r = drbd_khelper(device, "before-resync-target");
1768 r = (r >> 8) & 0xff;
1769 if (r > 0) {
1770 drbd_info(device, "before-resync-target handler returned %d, "
1771 "dropping connection.\n", r);
1772 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1773 return;
1775 } else /* C_SYNC_SOURCE */ {
1776 r = drbd_khelper(device, "before-resync-source");
1777 r = (r >> 8) & 0xff;
1778 if (r > 0) {
1779 if (r == 3) {
1780 drbd_info(device, "before-resync-source handler returned %d, "
1781 "ignoring. Old userland tools?", r);
1782 } else {
1783 drbd_info(device, "before-resync-source handler returned %d, "
1784 "dropping connection.\n", r);
1785 conn_request_state(connection,
1786 NS(conn, C_DISCONNECTING), CS_HARD);
1787 return;
1793 if (current == connection->worker.task) {
1794 /* The worker should not sleep waiting for state_mutex,
1795 that can take long */
1796 if (!mutex_trylock(device->state_mutex)) {
1797 set_bit(B_RS_H_DONE, &device->flags);
1798 device->start_resync_timer.expires = jiffies + HZ/5;
1799 add_timer(&device->start_resync_timer);
1800 return;
1802 } else {
1803 mutex_lock(device->state_mutex);
1806 lock_all_resources();
1807 clear_bit(B_RS_H_DONE, &device->flags);
1808 /* Did some connection breakage or IO error race with us? */
1809 if (device->state.conn < C_CONNECTED
1810 || !get_ldev_if_state(device, D_NEGOTIATING)) {
1811 unlock_all_resources();
1812 goto out;
1815 ns = drbd_read_state(device);
1817 ns.aftr_isp = !_drbd_may_sync_now(device);
1819 ns.conn = side;
1821 if (side == C_SYNC_TARGET)
1822 ns.disk = D_INCONSISTENT;
1823 else /* side == C_SYNC_SOURCE */
1824 ns.pdsk = D_INCONSISTENT;
1826 r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1827 ns = drbd_read_state(device);
1829 if (ns.conn < C_CONNECTED)
1830 r = SS_UNKNOWN_ERROR;
1832 if (r == SS_SUCCESS) {
1833 unsigned long tw = drbd_bm_total_weight(device);
1834 unsigned long now = jiffies;
1835 int i;
1837 device->rs_failed = 0;
1838 device->rs_paused = 0;
1839 device->rs_same_csum = 0;
1840 device->rs_last_sect_ev = 0;
1841 device->rs_total = tw;
1842 device->rs_start = now;
1843 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1844 device->rs_mark_left[i] = tw;
1845 device->rs_mark_time[i] = now;
1847 drbd_pause_after(device);
1848 /* Forget potentially stale cached per resync extent bit-counts.
1849 * Open coded drbd_rs_cancel_all(device), we already have IRQs
1850 * disabled, and know the disk state is ok. */
1851 spin_lock(&device->al_lock);
1852 lc_reset(device->resync);
1853 device->resync_locked = 0;
1854 device->resync_wenr = LC_FREE;
1855 spin_unlock(&device->al_lock);
1857 unlock_all_resources();
1859 if (r == SS_SUCCESS) {
1860 wake_up(&device->al_wait); /* for lc_reset() above */
1861 /* reset rs_last_bcast when a resync or verify is started,
1862 * to deal with potential jiffies wrap. */
1863 device->rs_last_bcast = jiffies - HZ;
1865 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1866 drbd_conn_str(ns.conn),
1867 (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1868 (unsigned long) device->rs_total);
1869 if (side == C_SYNC_TARGET) {
1870 device->bm_resync_fo = 0;
1871 device->use_csums = use_checksum_based_resync(connection, device);
1872 } else {
1873 device->use_csums = false;
1876 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1877 * with w_send_oos, or the sync target will get confused as to
1878 * how much bits to resync. We cannot do that always, because for an
1879 * empty resync and protocol < 95, we need to do it here, as we call
1880 * drbd_resync_finished from here in that case.
1881 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1882 * and from after_state_ch otherwise. */
1883 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1884 drbd_gen_and_send_sync_uuid(peer_device);
1886 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1887 /* This still has a race (about when exactly the peers
1888 * detect connection loss) that can lead to a full sync
1889 * on next handshake. In 8.3.9 we fixed this with explicit
1890 * resync-finished notifications, but the fix
1891 * introduces a protocol change. Sleeping for some
1892 * time longer than the ping interval + timeout on the
1893 * SyncSource, to give the SyncTarget the chance to
1894 * detect connection loss, then waiting for a ping
1895 * response (implicit in drbd_resync_finished) reduces
1896 * the race considerably, but does not solve it. */
1897 if (side == C_SYNC_SOURCE) {
1898 struct net_conf *nc;
1899 int timeo;
1901 rcu_read_lock();
1902 nc = rcu_dereference(connection->net_conf);
1903 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1904 rcu_read_unlock();
1905 schedule_timeout_interruptible(timeo);
1907 drbd_resync_finished(device);
1910 drbd_rs_controller_reset(device);
1911 /* ns.conn may already be != device->state.conn,
1912 * we may have been paused in between, or become paused until
1913 * the timer triggers.
1914 * No matter, that is handled in resync_timer_fn() */
1915 if (ns.conn == C_SYNC_TARGET)
1916 mod_timer(&device->resync_timer, jiffies);
1918 drbd_md_sync(device);
1920 put_ldev(device);
1921 out:
1922 mutex_unlock(device->state_mutex);
1925 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1927 struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1928 device->rs_last_bcast = jiffies;
1930 if (!get_ldev(device))
1931 return;
1933 drbd_bm_write_lazy(device, 0);
1934 if (resync_done && is_sync_state(device->state.conn))
1935 drbd_resync_finished(device);
1937 drbd_bcast_event(device, &sib);
1938 /* update timestamp, in case it took a while to write out stuff */
1939 device->rs_last_bcast = jiffies;
1940 put_ldev(device);
1943 static void drbd_ldev_destroy(struct drbd_device *device)
1945 lc_destroy(device->resync);
1946 device->resync = NULL;
1947 lc_destroy(device->act_log);
1948 device->act_log = NULL;
1950 __acquire(local);
1951 drbd_backing_dev_free(device, device->ldev);
1952 device->ldev = NULL;
1953 __release(local);
1955 clear_bit(GOING_DISKLESS, &device->flags);
1956 wake_up(&device->misc_wait);
1959 static void go_diskless(struct drbd_device *device)
1961 D_ASSERT(device, device->state.disk == D_FAILED);
1962 /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1963 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1964 * the protected members anymore, though, so once put_ldev reaches zero
1965 * again, it will be safe to free them. */
1967 /* Try to write changed bitmap pages, read errors may have just
1968 * set some bits outside the area covered by the activity log.
1970 * If we have an IO error during the bitmap writeout,
1971 * we will want a full sync next time, just in case.
1972 * (Do we want a specific meta data flag for this?)
1974 * If that does not make it to stable storage either,
1975 * we cannot do anything about that anymore.
1977 * We still need to check if both bitmap and ldev are present, we may
1978 * end up here after a failed attach, before ldev was even assigned.
1980 if (device->bitmap && device->ldev) {
1981 /* An interrupted resync or similar is allowed to recounts bits
1982 * while we detach.
1983 * Any modifications would not be expected anymore, though.
1985 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1986 "detach", BM_LOCKED_TEST_ALLOWED)) {
1987 if (test_bit(WAS_READ_ERROR, &device->flags)) {
1988 drbd_md_set_flag(device, MDF_FULL_SYNC);
1989 drbd_md_sync(device);
1994 drbd_force_state(device, NS(disk, D_DISKLESS));
/* Device work for MD_SYNC: log a warning and sync the meta data. Returns 0. */
static int do_md_sync(struct drbd_device *device)
{
	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
	drbd_md_sync(device);

	return 0;
}
2004 /* only called from drbd_worker thread, no locking */
2005 void __update_timing_details(
2006 struct drbd_thread_timing_details *tdp,
2007 unsigned int *cb_nr,
2008 void *cb,
2009 const char *fn, const unsigned int line)
2011 unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
2012 struct drbd_thread_timing_details *td = tdp + i;
2014 td->start_jif = jiffies;
2015 td->cb_addr = cb;
2016 td->caller_fn = fn;
2017 td->line = line;
2018 td->cb_nr = *cb_nr;
2020 i = (i+1) % DRBD_THREAD_DETAILS_HIST;
2021 td = tdp + i;
2022 memset(td, 0, sizeof(*td));
2024 ++(*cb_nr);
2027 static void do_device_work(struct drbd_device *device, const unsigned long todo)
2029 if (test_bit(MD_SYNC, &todo))
2030 do_md_sync(device);
2031 if (test_bit(RS_DONE, &todo) ||
2032 test_bit(RS_PROGRESS, &todo))
2033 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
2034 if (test_bit(GO_DISKLESS, &todo))
2035 go_diskless(device);
2036 if (test_bit(DESTROY_DISK, &todo))
2037 drbd_ldev_destroy(device);
2038 if (test_bit(RS_START, &todo))
2039 do_start_resync(device);
/* All device->flags bits that stand for unqueued device work. */
#define DRBD_DEVICE_WORK_MASK		\
	((1UL << GO_DISKLESS) |		\
	 (1UL << DESTROY_DISK) |	\
	 (1UL << MD_SYNC) |		\
	 (1UL << RS_START) |		\
	 (1UL << RS_PROGRESS) |		\
	 (1UL << RS_DONE))
2051 static unsigned long get_work_bits(unsigned long *flags)
2053 unsigned long old, new;
2054 do {
2055 old = *flags;
2056 new = old & ~DRBD_DEVICE_WORK_MASK;
2057 } while (cmpxchg(flags, old, new) != old);
2058 return old & DRBD_DEVICE_WORK_MASK;
2061 static void do_unqueued_work(struct drbd_connection *connection)
2063 struct drbd_peer_device *peer_device;
2064 int vnr;
2066 rcu_read_lock();
2067 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2068 struct drbd_device *device = peer_device->device;
2069 unsigned long todo = get_work_bits(&device->flags);
2070 if (!todo)
2071 continue;
2073 kref_get(&device->kref);
2074 rcu_read_unlock();
2075 do_device_work(device, todo);
2076 kref_put(&device->kref, drbd_destroy_device);
2077 rcu_read_lock();
2079 rcu_read_unlock();
2082 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2084 spin_lock_irq(&queue->q_lock);
2085 list_splice_tail_init(&queue->q, work_list);
2086 spin_unlock_irq(&queue->q_lock);
2087 return !list_empty(work_list);
2090 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2092 DEFINE_WAIT(wait);
2093 struct net_conf *nc;
2094 int uncork, cork;
2096 dequeue_work_batch(&connection->sender_work, work_list);
2097 if (!list_empty(work_list))
2098 return;
2100 /* Still nothing to do?
2101 * Maybe we still need to close the current epoch,
2102 * even if no new requests are queued yet.
2104 * Also, poke TCP, just in case.
2105 * Then wait for new work (or signal). */
2106 rcu_read_lock();
2107 nc = rcu_dereference(connection->net_conf);
2108 uncork = nc ? nc->tcp_cork : 0;
2109 rcu_read_unlock();
2110 if (uncork) {
2111 mutex_lock(&connection->data.mutex);
2112 if (connection->data.socket)
2113 drbd_tcp_uncork(connection->data.socket);
2114 mutex_unlock(&connection->data.mutex);
2117 for (;;) {
2118 int send_barrier;
2119 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2120 spin_lock_irq(&connection->resource->req_lock);
2121 spin_lock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
2122 if (!list_empty(&connection->sender_work.q))
2123 list_splice_tail_init(&connection->sender_work.q, work_list);
2124 spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
2125 if (!list_empty(work_list) || signal_pending(current)) {
2126 spin_unlock_irq(&connection->resource->req_lock);
2127 break;
2130 /* We found nothing new to do, no to-be-communicated request,
2131 * no other work item. We may still need to close the last
2132 * epoch. Next incoming request epoch will be connection ->
2133 * current transfer log epoch number. If that is different
2134 * from the epoch of the last request we communicated, it is
2135 * safe to send the epoch separating barrier now.
2137 send_barrier =
2138 atomic_read(&connection->current_tle_nr) !=
2139 connection->send.current_epoch_nr;
2140 spin_unlock_irq(&connection->resource->req_lock);
2142 if (send_barrier)
2143 maybe_send_barrier(connection,
2144 connection->send.current_epoch_nr + 1);
2146 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2147 break;
2149 /* drbd_send() may have called flush_signals() */
2150 if (get_t_state(&connection->worker) != RUNNING)
2151 break;
2153 schedule();
2154 /* may be woken up for other things but new work, too,
2155 * e.g. if the current epoch got closed.
2156 * In which case we send the barrier above. */
2158 finish_wait(&connection->sender_work.q_wait, &wait);
2160 /* someone may have changed the config while we have been waiting above. */
2161 rcu_read_lock();
2162 nc = rcu_dereference(connection->net_conf);
2163 cork = nc ? nc->tcp_cork : 0;
2164 rcu_read_unlock();
2165 mutex_lock(&connection->data.mutex);
2166 if (connection->data.socket) {
2167 if (cork)
2168 drbd_tcp_cork(connection->data.socket);
2169 else if (!uncork)
2170 drbd_tcp_uncork(connection->data.socket);
2172 mutex_unlock(&connection->data.mutex);
2175 int drbd_worker(struct drbd_thread *thi)
2177 struct drbd_connection *connection = thi->connection;
2178 struct drbd_work *w = NULL;
2179 struct drbd_peer_device *peer_device;
2180 LIST_HEAD(work_list);
2181 int vnr;
2183 while (get_t_state(thi) == RUNNING) {
2184 drbd_thread_current_set_cpu(thi);
2186 if (list_empty(&work_list)) {
2187 update_worker_timing_details(connection, wait_for_work);
2188 wait_for_work(connection, &work_list);
2191 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2192 update_worker_timing_details(connection, do_unqueued_work);
2193 do_unqueued_work(connection);
2196 if (signal_pending(current)) {
2197 flush_signals(current);
2198 if (get_t_state(thi) == RUNNING) {
2199 drbd_warn(connection, "Worker got an unexpected signal\n");
2200 continue;
2202 break;
2205 if (get_t_state(thi) != RUNNING)
2206 break;
2208 if (!list_empty(&work_list)) {
2209 w = list_first_entry(&work_list, struct drbd_work, list);
2210 list_del_init(&w->list);
2211 update_worker_timing_details(connection, w->cb);
2212 if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2213 continue;
2214 if (connection->cstate >= C_WF_REPORT_PARAMS)
2215 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2219 do {
2220 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2221 update_worker_timing_details(connection, do_unqueued_work);
2222 do_unqueued_work(connection);
2224 if (!list_empty(&work_list)) {
2225 w = list_first_entry(&work_list, struct drbd_work, list);
2226 list_del_init(&w->list);
2227 update_worker_timing_details(connection, w->cb);
2228 w->cb(w, 1);
2229 } else
2230 dequeue_work_batch(&connection->sender_work, &work_list);
2231 } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2233 rcu_read_lock();
2234 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2235 struct drbd_device *device = peer_device->device;
2236 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2237 kref_get(&device->kref);
2238 rcu_read_unlock();
2239 drbd_device_cleanup(device);
2240 kref_put(&device->kref, drbd_destroy_device);
2241 rcu_read_lock();
2243 rcu_read_unlock();
2245 return 0;