s390/ptrace: get rid of long longs in psw_bits
[linux/fpc-iii.git] / drivers / block / drbd / drbd_worker.c
blob5578c1477ba6610f27f80e4cffaa874939a05364
1 /*
2 drbd_worker.c
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
38 #include "drbd_int.h"
39 #include "drbd_protocol.h"
40 #include "drbd_req.h"
/* Forward declarations: both request generators are defined later in this
 * file and are called from w_resync_timer(). */
static int make_ov_request(struct drbd_device *device, int cancel);
static int make_resync_request(struct drbd_device *device, int cancel);
45 /* endio handlers:
46 * drbd_md_endio (defined here)
47 * drbd_request_endio (defined here)
48 * drbd_peer_request_endio (defined here)
49 * drbd_bm_endio (defined in drbd_bitmap.c)
51 * For all these callbacks, note the following:
52 * The callbacks will be called in irq context by the IDE drivers,
53 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54 * Try to get the locking right :)
59 /* About the global_state_lock
60 Each state transition on a device holds a read lock. In case we have
61 to evaluate the resync after dependencies, we grab a write lock, because
62 we need stable states on all devices for that. */
63 rwlock_t global_state_lock;
65 /* used for synchronous meta data and bitmap IO
66 * submitted by drbd_md_sync_page_io()
68 void drbd_md_endio(struct bio *bio)
70 struct drbd_device *device;
72 device = bio->bi_private;
73 device->md_io.error = bio->bi_error;
75 /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
76 * to timeout on the lower level device, and eventually detach from it.
77 * If this io completion runs after that timeout expired, this
78 * drbd_md_put_buffer() may allow us to finally try and re-attach.
79 * During normal operation, this only puts that extra reference
80 * down to 1 again.
81 * Make sure we first drop the reference, and only then signal
82 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
83 * next drbd_md_sync_page_io(), that we trigger the
84 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
86 drbd_md_put_buffer(device);
87 device->md_io.done = 1;
88 wake_up(&device->misc_wait);
89 bio_put(bio);
90 if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
91 put_ldev(device);
94 /* reads on behalf of the partner,
95 * "submitted" by the receiver
97 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
99 unsigned long flags = 0;
100 struct drbd_peer_device *peer_device = peer_req->peer_device;
101 struct drbd_device *device = peer_device->device;
103 spin_lock_irqsave(&device->resource->req_lock, flags);
104 device->read_cnt += peer_req->i.size >> 9;
105 list_del(&peer_req->w.list);
106 if (list_empty(&device->read_ee))
107 wake_up(&device->ee_wait);
108 if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
109 __drbd_chk_io_error(device, DRBD_READ_ERROR);
110 spin_unlock_irqrestore(&device->resource->req_lock, flags);
112 drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
113 put_ldev(device);
116 /* writes on behalf of the partner, or resync writes,
117 * "submitted" by the receiver, final stage. */
118 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
120 unsigned long flags = 0;
121 struct drbd_peer_device *peer_device = peer_req->peer_device;
122 struct drbd_device *device = peer_device->device;
123 struct drbd_interval i;
124 int do_wake;
125 u64 block_id;
126 int do_al_complete_io;
128 /* after we moved peer_req to done_ee,
129 * we may no longer access it,
130 * it may be freed/reused already!
131 * (as soon as we release the req_lock) */
132 i = peer_req->i;
133 do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
134 block_id = peer_req->block_id;
135 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
137 spin_lock_irqsave(&device->resource->req_lock, flags);
138 device->writ_cnt += peer_req->i.size >> 9;
139 list_move_tail(&peer_req->w.list, &device->done_ee);
142 * Do not remove from the write_requests tree here: we did not send the
143 * Ack yet and did not wake possibly waiting conflicting requests.
144 * Removed from the tree from "drbd_process_done_ee" within the
145 * appropriate dw.cb (e_end_block/e_end_resync_block) or from
146 * _drbd_clear_done_ee.
149 do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
151 /* FIXME do we want to detach for failed REQ_DISCARD?
152 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
153 if (peer_req->flags & EE_WAS_ERROR)
154 __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
155 spin_unlock_irqrestore(&device->resource->req_lock, flags);
157 if (block_id == ID_SYNCER)
158 drbd_rs_complete_io(device, i.sector);
160 if (do_wake)
161 wake_up(&device->ee_wait);
163 if (do_al_complete_io)
164 drbd_al_complete_io(device, &i);
166 wake_asender(peer_device->connection);
167 put_ldev(device);
170 /* writes on behalf of the partner, or resync writes,
171 * "submitted" by the receiver.
173 void drbd_peer_request_endio(struct bio *bio)
175 struct drbd_peer_request *peer_req = bio->bi_private;
176 struct drbd_device *device = peer_req->peer_device->device;
177 int is_write = bio_data_dir(bio) == WRITE;
178 int is_discard = !!(bio->bi_rw & REQ_DISCARD);
180 if (bio->bi_error && __ratelimit(&drbd_ratelimit_state))
181 drbd_warn(device, "%s: error=%d s=%llus\n",
182 is_write ? (is_discard ? "discard" : "write")
183 : "read", bio->bi_error,
184 (unsigned long long)peer_req->i.sector);
186 if (bio->bi_error)
187 set_bit(__EE_WAS_ERROR, &peer_req->flags);
189 bio_put(bio); /* no need for the bio anymore */
190 if (atomic_dec_and_test(&peer_req->pending_bios)) {
191 if (is_write)
192 drbd_endio_write_sec_final(peer_req);
193 else
194 drbd_endio_read_sec_final(peer_req);
198 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
200 void drbd_request_endio(struct bio *bio)
202 unsigned long flags;
203 struct drbd_request *req = bio->bi_private;
204 struct drbd_device *device = req->device;
205 struct bio_and_error m;
206 enum drbd_req_event what;
208 /* If this request was aborted locally before,
209 * but now was completed "successfully",
210 * chances are that this caused arbitrary data corruption.
212 * "aborting" requests, or force-detaching the disk, is intended for
213 * completely blocked/hung local backing devices which do no longer
214 * complete requests at all, not even do error completions. In this
215 * situation, usually a hard-reset and failover is the only way out.
217 * By "aborting", basically faking a local error-completion,
218 * we allow for a more graceful swichover by cleanly migrating services.
219 * Still the affected node has to be rebooted "soon".
221 * By completing these requests, we allow the upper layers to re-use
222 * the associated data pages.
224 * If later the local backing device "recovers", and now DMAs some data
225 * from disk into the original request pages, in the best case it will
226 * just put random data into unused pages; but typically it will corrupt
227 * meanwhile completely unrelated data, causing all sorts of damage.
229 * Which means delayed successful completion,
230 * especially for READ requests,
231 * is a reason to panic().
233 * We assume that a delayed *error* completion is OK,
234 * though we still will complain noisily about it.
236 if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
237 if (__ratelimit(&drbd_ratelimit_state))
238 drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
240 if (!bio->bi_error)
241 panic("possible random memory corruption caused by delayed completion of aborted local request\n");
244 /* to avoid recursion in __req_mod */
245 if (unlikely(bio->bi_error)) {
246 if (bio->bi_rw & REQ_DISCARD)
247 what = (bio->bi_error == -EOPNOTSUPP)
248 ? DISCARD_COMPLETED_NOTSUPP
249 : DISCARD_COMPLETED_WITH_ERROR;
250 else
251 what = (bio_data_dir(bio) == WRITE)
252 ? WRITE_COMPLETED_WITH_ERROR
253 : (bio_rw(bio) == READ)
254 ? READ_COMPLETED_WITH_ERROR
255 : READ_AHEAD_COMPLETED_WITH_ERROR;
256 } else
257 what = COMPLETED_OK;
259 bio_put(req->private_bio);
260 req->private_bio = ERR_PTR(bio->bi_error);
262 /* not req_mod(), we need irqsave here! */
263 spin_lock_irqsave(&device->resource->req_lock, flags);
264 __req_mod(req, what, &m);
265 spin_unlock_irqrestore(&device->resource->req_lock, flags);
266 put_ldev(device);
268 if (m.bio)
269 complete_master_bio(device, &m);
272 void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest)
274 struct hash_desc desc;
275 struct scatterlist sg;
276 struct page *page = peer_req->pages;
277 struct page *tmp;
278 unsigned len;
280 desc.tfm = tfm;
281 desc.flags = 0;
283 sg_init_table(&sg, 1);
284 crypto_hash_init(&desc);
286 while ((tmp = page_chain_next(page))) {
287 /* all but the last page will be fully used */
288 sg_set_page(&sg, page, PAGE_SIZE, 0);
289 crypto_hash_update(&desc, &sg, sg.length);
290 page = tmp;
292 /* and now the last, possibly only partially used page */
293 len = peer_req->i.size & (PAGE_SIZE - 1);
294 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
295 crypto_hash_update(&desc, &sg, sg.length);
296 crypto_hash_final(&desc, digest);
299 void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest)
301 struct hash_desc desc;
302 struct scatterlist sg;
303 struct bio_vec bvec;
304 struct bvec_iter iter;
306 desc.tfm = tfm;
307 desc.flags = 0;
309 sg_init_table(&sg, 1);
310 crypto_hash_init(&desc);
312 bio_for_each_segment(bvec, bio, iter) {
313 sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
314 crypto_hash_update(&desc, &sg, sg.length);
316 crypto_hash_final(&desc, digest);
319 /* MAYBE merge common code with w_e_end_ov_req */
320 static int w_e_send_csum(struct drbd_work *w, int cancel)
322 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
323 struct drbd_peer_device *peer_device = peer_req->peer_device;
324 struct drbd_device *device = peer_device->device;
325 int digest_size;
326 void *digest;
327 int err = 0;
329 if (unlikely(cancel))
330 goto out;
332 if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
333 goto out;
335 digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
336 digest = kmalloc(digest_size, GFP_NOIO);
337 if (digest) {
338 sector_t sector = peer_req->i.sector;
339 unsigned int size = peer_req->i.size;
340 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
341 /* Free peer_req and pages before send.
342 * In case we block on congestion, we could otherwise run into
343 * some distributed deadlock, if the other side blocks on
344 * congestion as well, because our receiver blocks in
345 * drbd_alloc_pages due to pp_in_use > max_buffers. */
346 drbd_free_peer_req(device, peer_req);
347 peer_req = NULL;
348 inc_rs_pending(device);
349 err = drbd_send_drequest_csum(peer_device, sector, size,
350 digest, digest_size,
351 P_CSUM_RS_REQUEST);
352 kfree(digest);
353 } else {
354 drbd_err(device, "kmalloc() of digest failed.\n");
355 err = -ENOMEM;
358 out:
359 if (peer_req)
360 drbd_free_peer_req(device, peer_req);
362 if (unlikely(err))
363 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
364 return err;
367 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
369 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
371 struct drbd_device *device = peer_device->device;
372 struct drbd_peer_request *peer_req;
374 if (!get_ldev(device))
375 return -EIO;
377 /* GFP_TRY, because if there is no memory available right now, this may
378 * be rescheduled for later. It is "only" background resync, after all. */
379 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
380 size, true /* has real payload */, GFP_TRY);
381 if (!peer_req)
382 goto defer;
384 peer_req->w.cb = w_e_send_csum;
385 spin_lock_irq(&device->resource->req_lock);
386 list_add_tail(&peer_req->w.list, &device->read_ee);
387 spin_unlock_irq(&device->resource->req_lock);
389 atomic_add(size >> 9, &device->rs_sect_ev);
390 if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
391 return 0;
393 /* If it failed because of ENOMEM, retry should help. If it failed
394 * because bio_add_page failed (probably broken lower level driver),
395 * retry may or may not help.
396 * If it does not, you may need to force disconnect. */
397 spin_lock_irq(&device->resource->req_lock);
398 list_del(&peer_req->w.list);
399 spin_unlock_irq(&device->resource->req_lock);
401 drbd_free_peer_req(device, peer_req);
402 defer:
403 put_ldev(device);
404 return -EAGAIN;
407 int w_resync_timer(struct drbd_work *w, int cancel)
409 struct drbd_device *device =
410 container_of(w, struct drbd_device, resync_work);
412 switch (device->state.conn) {
413 case C_VERIFY_S:
414 make_ov_request(device, cancel);
415 break;
416 case C_SYNC_TARGET:
417 make_resync_request(device, cancel);
418 break;
421 return 0;
424 void resync_timer_fn(unsigned long data)
426 struct drbd_device *device = (struct drbd_device *) data;
428 drbd_queue_work_if_unqueued(
429 &first_peer_device(device)->connection->sender_work,
430 &device->resync_work);
433 static void fifo_set(struct fifo_buffer *fb, int value)
435 int i;
437 for (i = 0; i < fb->size; i++)
438 fb->values[i] = value;
441 static int fifo_push(struct fifo_buffer *fb, int value)
443 int ov;
445 ov = fb->values[fb->head_index];
446 fb->values[fb->head_index++] = value;
448 if (fb->head_index >= fb->size)
449 fb->head_index = 0;
451 return ov;
454 static void fifo_add_val(struct fifo_buffer *fb, int value)
456 int i;
458 for (i = 0; i < fb->size; i++)
459 fb->values[i] += value;
462 struct fifo_buffer *fifo_alloc(int fifo_size)
464 struct fifo_buffer *fb;
466 fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
467 if (!fb)
468 return NULL;
470 fb->head_index = 0;
471 fb->size = fifo_size;
472 fb->total = 0;
474 return fb;
477 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
479 struct disk_conf *dc;
480 unsigned int want; /* The number of sectors we want in-flight */
481 int req_sect; /* Number of sectors to request in this turn */
482 int correction; /* Number of sectors more we need in-flight */
483 int cps; /* correction per invocation of drbd_rs_controller() */
484 int steps; /* Number of time steps to plan ahead */
485 int curr_corr;
486 int max_sect;
487 struct fifo_buffer *plan;
489 dc = rcu_dereference(device->ldev->disk_conf);
490 plan = rcu_dereference(device->rs_plan_s);
492 steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
494 if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
495 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
496 } else { /* normal path */
497 want = dc->c_fill_target ? dc->c_fill_target :
498 sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
501 correction = want - device->rs_in_flight - plan->total;
503 /* Plan ahead */
504 cps = correction / steps;
505 fifo_add_val(plan, cps);
506 plan->total += cps * steps;
508 /* What we do in this step */
509 curr_corr = fifo_push(plan, 0);
510 plan->total -= curr_corr;
512 req_sect = sect_in + curr_corr;
513 if (req_sect < 0)
514 req_sect = 0;
516 max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
517 if (req_sect > max_sect)
518 req_sect = max_sect;
521 drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
522 sect_in, device->rs_in_flight, want, correction,
523 steps, cps, device->rs_planed, curr_corr, req_sect);
526 return req_sect;
529 static int drbd_rs_number_requests(struct drbd_device *device)
531 unsigned int sect_in; /* Number of sectors that came in since the last turn */
532 int number, mxb;
534 sect_in = atomic_xchg(&device->rs_sect_in, 0);
535 device->rs_in_flight -= sect_in;
537 rcu_read_lock();
538 mxb = drbd_get_max_buffers(device) / 2;
539 if (rcu_dereference(device->rs_plan_s)->size) {
540 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
541 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
542 } else {
543 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
544 number = SLEEP_TIME * device->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
546 rcu_read_unlock();
548 /* Don't have more than "max-buffers"/2 in-flight.
549 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
550 * potentially causing a distributed deadlock on congestion during
551 * online-verify or (checksum-based) resync, if max-buffers,
552 * socket buffer sizes and resync rate settings are mis-configured. */
554 /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
555 * mxb (as used here, and in drbd_alloc_pages on the peer) is
556 * "number of pages" (typically also 4k),
557 * but "rs_in_flight" is in "sectors" (512 Byte). */
558 if (mxb - device->rs_in_flight/8 < number)
559 number = mxb - device->rs_in_flight/8;
561 return number;
564 static int make_resync_request(struct drbd_device *const device, int cancel)
566 struct drbd_peer_device *const peer_device = first_peer_device(device);
567 struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
568 unsigned long bit;
569 sector_t sector;
570 const sector_t capacity = drbd_get_capacity(device->this_bdev);
571 int max_bio_size;
572 int number, rollback_i, size;
573 int align, requeue = 0;
574 int i = 0;
576 if (unlikely(cancel))
577 return 0;
579 if (device->rs_total == 0) {
580 /* empty resync? */
581 drbd_resync_finished(device);
582 return 0;
585 if (!get_ldev(device)) {
586 /* Since we only need to access device->rsync a
587 get_ldev_if_state(device,D_FAILED) would be sufficient, but
588 to continue resync with a broken disk makes no sense at
589 all */
590 drbd_err(device, "Disk broke down during resync!\n");
591 return 0;
594 max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
595 number = drbd_rs_number_requests(device);
596 if (number <= 0)
597 goto requeue;
599 for (i = 0; i < number; i++) {
600 /* Stop generating RS requests when half of the send buffer is filled,
601 * but notify TCP that we'd like to have more space. */
602 mutex_lock(&connection->data.mutex);
603 if (connection->data.socket) {
604 struct sock *sk = connection->data.socket->sk;
605 int queued = sk->sk_wmem_queued;
606 int sndbuf = sk->sk_sndbuf;
607 if (queued > sndbuf / 2) {
608 requeue = 1;
609 if (sk->sk_socket)
610 set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
612 } else
613 requeue = 1;
614 mutex_unlock(&connection->data.mutex);
615 if (requeue)
616 goto requeue;
618 next_sector:
619 size = BM_BLOCK_SIZE;
620 bit = drbd_bm_find_next(device, device->bm_resync_fo);
622 if (bit == DRBD_END_OF_BITMAP) {
623 device->bm_resync_fo = drbd_bm_bits(device);
624 put_ldev(device);
625 return 0;
628 sector = BM_BIT_TO_SECT(bit);
630 if (drbd_try_rs_begin_io(device, sector)) {
631 device->bm_resync_fo = bit;
632 goto requeue;
634 device->bm_resync_fo = bit + 1;
636 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
637 drbd_rs_complete_io(device, sector);
638 goto next_sector;
641 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
642 /* try to find some adjacent bits.
643 * we stop if we have already the maximum req size.
645 * Additionally always align bigger requests, in order to
646 * be prepared for all stripe sizes of software RAIDs.
648 align = 1;
649 rollback_i = i;
650 while (i < number) {
651 if (size + BM_BLOCK_SIZE > max_bio_size)
652 break;
654 /* Be always aligned */
655 if (sector & ((1<<(align+3))-1))
656 break;
658 /* do not cross extent boundaries */
659 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
660 break;
661 /* now, is it actually dirty, after all?
662 * caution, drbd_bm_test_bit is tri-state for some
663 * obscure reason; ( b == 0 ) would get the out-of-band
664 * only accidentally right because of the "oddly sized"
665 * adjustment below */
666 if (drbd_bm_test_bit(device, bit+1) != 1)
667 break;
668 bit++;
669 size += BM_BLOCK_SIZE;
670 if ((BM_BLOCK_SIZE << align) <= size)
671 align++;
672 i++;
674 /* if we merged some,
675 * reset the offset to start the next drbd_bm_find_next from */
676 if (size > BM_BLOCK_SIZE)
677 device->bm_resync_fo = bit + 1;
678 #endif
680 /* adjust very last sectors, in case we are oddly sized */
681 if (sector + (size>>9) > capacity)
682 size = (capacity-sector)<<9;
684 if (device->use_csums) {
685 switch (read_for_csum(peer_device, sector, size)) {
686 case -EIO: /* Disk failure */
687 put_ldev(device);
688 return -EIO;
689 case -EAGAIN: /* allocation failed, or ldev busy */
690 drbd_rs_complete_io(device, sector);
691 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
692 i = rollback_i;
693 goto requeue;
694 case 0:
695 /* everything ok */
696 break;
697 default:
698 BUG();
700 } else {
701 int err;
703 inc_rs_pending(device);
704 err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST,
705 sector, size, ID_SYNCER);
706 if (err) {
707 drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
708 dec_rs_pending(device);
709 put_ldev(device);
710 return err;
715 if (device->bm_resync_fo >= drbd_bm_bits(device)) {
716 /* last syncer _request_ was sent,
717 * but the P_RS_DATA_REPLY not yet received. sync will end (and
718 * next sync group will resume), as soon as we receive the last
719 * resync data block, and the last bit is cleared.
720 * until then resync "work" is "inactive" ...
722 put_ldev(device);
723 return 0;
726 requeue:
727 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
728 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
729 put_ldev(device);
730 return 0;
733 static int make_ov_request(struct drbd_device *device, int cancel)
735 int number, i, size;
736 sector_t sector;
737 const sector_t capacity = drbd_get_capacity(device->this_bdev);
738 bool stop_sector_reached = false;
740 if (unlikely(cancel))
741 return 1;
743 number = drbd_rs_number_requests(device);
745 sector = device->ov_position;
746 for (i = 0; i < number; i++) {
747 if (sector >= capacity)
748 return 1;
750 /* We check for "finished" only in the reply path:
751 * w_e_end_ov_reply().
752 * We need to send at least one request out. */
753 stop_sector_reached = i > 0
754 && verify_can_do_stop_sector(device)
755 && sector >= device->ov_stop_sector;
756 if (stop_sector_reached)
757 break;
759 size = BM_BLOCK_SIZE;
761 if (drbd_try_rs_begin_io(device, sector)) {
762 device->ov_position = sector;
763 goto requeue;
766 if (sector + (size>>9) > capacity)
767 size = (capacity-sector)<<9;
769 inc_rs_pending(device);
770 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
771 dec_rs_pending(device);
772 return 0;
774 sector += BM_SECT_PER_BIT;
776 device->ov_position = sector;
778 requeue:
779 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
780 if (i == 0 || !stop_sector_reached)
781 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
782 return 1;
785 int w_ov_finished(struct drbd_work *w, int cancel)
787 struct drbd_device_work *dw =
788 container_of(w, struct drbd_device_work, w);
789 struct drbd_device *device = dw->device;
790 kfree(dw);
791 ov_out_of_sync_print(device);
792 drbd_resync_finished(device);
794 return 0;
797 static int w_resync_finished(struct drbd_work *w, int cancel)
799 struct drbd_device_work *dw =
800 container_of(w, struct drbd_device_work, w);
801 struct drbd_device *device = dw->device;
802 kfree(dw);
804 drbd_resync_finished(device);
806 return 0;
809 static void ping_peer(struct drbd_device *device)
811 struct drbd_connection *connection = first_peer_device(device)->connection;
813 clear_bit(GOT_PING_ACK, &connection->flags);
814 request_ping(connection);
815 wait_event(connection->ping_wait,
816 test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
819 int drbd_resync_finished(struct drbd_device *device)
821 unsigned long db, dt, dbdt;
822 unsigned long n_oos;
823 union drbd_state os, ns;
824 struct drbd_device_work *dw;
825 char *khelper_cmd = NULL;
826 int verify_done = 0;
828 /* Remove all elements from the resync LRU. Since future actions
829 * might set bits in the (main) bitmap, then the entries in the
830 * resync LRU would be wrong. */
831 if (drbd_rs_del_all(device)) {
832 /* In case this is not possible now, most probably because
833 * there are P_RS_DATA_REPLY Packets lingering on the worker's
834 * queue (or even the read operations for those packets
835 * is not finished by now). Retry in 100ms. */
837 schedule_timeout_interruptible(HZ / 10);
838 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
839 if (dw) {
840 dw->w.cb = w_resync_finished;
841 dw->device = device;
842 drbd_queue_work(&first_peer_device(device)->connection->sender_work,
843 &dw->w);
844 return 1;
846 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
849 dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
850 if (dt <= 0)
851 dt = 1;
853 db = device->rs_total;
854 /* adjust for verify start and stop sectors, respective reached position */
855 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
856 db -= device->ov_left;
858 dbdt = Bit2KB(db/dt);
859 device->rs_paused /= HZ;
861 if (!get_ldev(device))
862 goto out;
864 ping_peer(device);
866 spin_lock_irq(&device->resource->req_lock);
867 os = drbd_read_state(device);
869 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
871 /* This protects us against multiple calls (that can happen in the presence
872 of application IO), and against connectivity loss just before we arrive here. */
873 if (os.conn <= C_CONNECTED)
874 goto out_unlock;
876 ns = os;
877 ns.conn = C_CONNECTED;
879 drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
880 verify_done ? "Online verify" : "Resync",
881 dt + device->rs_paused, device->rs_paused, dbdt);
883 n_oos = drbd_bm_total_weight(device);
885 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
886 if (n_oos) {
887 drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
888 n_oos, Bit2KB(1));
889 khelper_cmd = "out-of-sync";
891 } else {
892 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
894 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
895 khelper_cmd = "after-resync-target";
897 if (device->use_csums && device->rs_total) {
898 const unsigned long s = device->rs_same_csum;
899 const unsigned long t = device->rs_total;
900 const int ratio =
901 (t == 0) ? 0 :
902 (t < 100000) ? ((s*100)/t) : (s/(t/100));
903 drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
904 "transferred %luK total %luK\n",
905 ratio,
906 Bit2KB(device->rs_same_csum),
907 Bit2KB(device->rs_total - device->rs_same_csum),
908 Bit2KB(device->rs_total));
912 if (device->rs_failed) {
913 drbd_info(device, " %lu failed blocks\n", device->rs_failed);
915 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
916 ns.disk = D_INCONSISTENT;
917 ns.pdsk = D_UP_TO_DATE;
918 } else {
919 ns.disk = D_UP_TO_DATE;
920 ns.pdsk = D_INCONSISTENT;
922 } else {
923 ns.disk = D_UP_TO_DATE;
924 ns.pdsk = D_UP_TO_DATE;
926 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
927 if (device->p_uuid) {
928 int i;
929 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
930 _drbd_uuid_set(device, i, device->p_uuid[i]);
931 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
932 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
933 } else {
934 drbd_err(device, "device->p_uuid is NULL! BUG\n");
938 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
939 /* for verify runs, we don't update uuids here,
940 * so there would be nothing to report. */
941 drbd_uuid_set_bm(device, 0UL);
942 drbd_print_uuids(device, "updated UUIDs");
943 if (device->p_uuid) {
944 /* Now the two UUID sets are equal, update what we
945 * know of the peer. */
946 int i;
947 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
948 device->p_uuid[i] = device->ldev->md.uuid[i];
953 _drbd_set_state(device, ns, CS_VERBOSE, NULL);
954 out_unlock:
955 spin_unlock_irq(&device->resource->req_lock);
956 put_ldev(device);
957 out:
958 device->rs_total = 0;
959 device->rs_failed = 0;
960 device->rs_paused = 0;
962 /* reset start sector, if we reached end of device */
963 if (verify_done && device->ov_left == 0)
964 device->ov_start_sector = 0;
966 drbd_md_sync(device);
968 if (khelper_cmd)
969 drbd_khelper(device, khelper_cmd);
971 return 1;
974 /* helper */
975 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
977 if (drbd_peer_req_has_active_page(peer_req)) {
978 /* This might happen if sendpage() has not finished */
979 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
980 atomic_add(i, &device->pp_in_use_by_net);
981 atomic_sub(i, &device->pp_in_use);
982 spin_lock_irq(&device->resource->req_lock);
983 list_add_tail(&peer_req->w.list, &device->net_ee);
984 spin_unlock_irq(&device->resource->req_lock);
985 wake_up(&drbd_pp_wait);
986 } else
987 drbd_free_peer_req(device, peer_req);
991 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
992 * @device: DRBD device.
993 * @w: work object.
994 * @cancel: The connection will be closed anyways
996 int w_e_end_data_req(struct drbd_work *w, int cancel)
998 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
999 struct drbd_peer_device *peer_device = peer_req->peer_device;
1000 struct drbd_device *device = peer_device->device;
1001 int err;
1003 if (unlikely(cancel)) {
1004 drbd_free_peer_req(device, peer_req);
1005 dec_unacked(device);
1006 return 0;
1009 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1010 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1011 } else {
1012 if (__ratelimit(&drbd_ratelimit_state))
1013 drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1014 (unsigned long long)peer_req->i.sector);
1016 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1019 dec_unacked(device);
1021 move_to_net_ee_or_free(device, peer_req);
1023 if (unlikely(err))
1024 drbd_err(device, "drbd_send_block() failed\n");
1025 return err;
1029 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1030 * @w: work object.
1031 * @cancel: The connection will be closed anyways
1033 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1035 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1036 struct drbd_peer_device *peer_device = peer_req->peer_device;
1037 struct drbd_device *device = peer_device->device;
1038 int err;
1040 if (unlikely(cancel)) {
1041 drbd_free_peer_req(device, peer_req);
1042 dec_unacked(device);
1043 return 0;
1046 if (get_ldev_if_state(device, D_FAILED)) {
1047 drbd_rs_complete_io(device, peer_req->i.sector);
1048 put_ldev(device);
1051 if (device->state.conn == C_AHEAD) {
1052 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1053 } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1054 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1055 inc_rs_pending(device);
1056 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1057 } else {
1058 if (__ratelimit(&drbd_ratelimit_state))
1059 drbd_err(device, "Not sending RSDataReply, "
1060 "partner DISKLESS!\n");
1061 err = 0;
1063 } else {
1064 if (__ratelimit(&drbd_ratelimit_state))
1065 drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1066 (unsigned long long)peer_req->i.sector);
1068 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1070 /* update resync data with failure */
1071 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1074 dec_unacked(device);
1076 move_to_net_ee_or_free(device, peer_req);
1078 if (unlikely(err))
1079 drbd_err(device, "drbd_send_block() failed\n");
1080 return err;
1083 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1085 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1086 struct drbd_peer_device *peer_device = peer_req->peer_device;
1087 struct drbd_device *device = peer_device->device;
1088 struct digest_info *di;
1089 int digest_size;
1090 void *digest = NULL;
1091 int err, eq = 0;
1093 if (unlikely(cancel)) {
1094 drbd_free_peer_req(device, peer_req);
1095 dec_unacked(device);
1096 return 0;
1099 if (get_ldev(device)) {
1100 drbd_rs_complete_io(device, peer_req->i.sector);
1101 put_ldev(device);
1104 di = peer_req->digest;
1106 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1107 /* quick hack to try to avoid a race against reconfiguration.
1108 * a real fix would be much more involved,
1109 * introducing more locking mechanisms */
1110 if (peer_device->connection->csums_tfm) {
1111 digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
1112 D_ASSERT(device, digest_size == di->digest_size);
1113 digest = kmalloc(digest_size, GFP_NOIO);
1115 if (digest) {
1116 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1117 eq = !memcmp(digest, di->digest, digest_size);
1118 kfree(digest);
1121 if (eq) {
1122 drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1123 /* rs_same_csums unit is BM_BLOCK_SIZE */
1124 device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1125 err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1126 } else {
1127 inc_rs_pending(device);
1128 peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1129 peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1130 kfree(di);
1131 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1133 } else {
1134 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1135 if (__ratelimit(&drbd_ratelimit_state))
1136 drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1139 dec_unacked(device);
1140 move_to_net_ee_or_free(device, peer_req);
1142 if (unlikely(err))
1143 drbd_err(device, "drbd_send_block/ack() failed\n");
1144 return err;
1147 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1149 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1150 struct drbd_peer_device *peer_device = peer_req->peer_device;
1151 struct drbd_device *device = peer_device->device;
1152 sector_t sector = peer_req->i.sector;
1153 unsigned int size = peer_req->i.size;
1154 int digest_size;
1155 void *digest;
1156 int err = 0;
1158 if (unlikely(cancel))
1159 goto out;
1161 digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1162 digest = kmalloc(digest_size, GFP_NOIO);
1163 if (!digest) {
1164 err = 1; /* terminate the connection in case the allocation failed */
1165 goto out;
1168 if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1169 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1170 else
1171 memset(digest, 0, digest_size);
1173 /* Free e and pages before send.
1174 * In case we block on congestion, we could otherwise run into
1175 * some distributed deadlock, if the other side blocks on
1176 * congestion as well, because our receiver blocks in
1177 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1178 drbd_free_peer_req(device, peer_req);
1179 peer_req = NULL;
1180 inc_rs_pending(device);
1181 err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1182 if (err)
1183 dec_rs_pending(device);
1184 kfree(digest);
1186 out:
1187 if (peer_req)
1188 drbd_free_peer_req(device, peer_req);
1189 dec_unacked(device);
1190 return err;
1193 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1195 if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1196 device->ov_last_oos_size += size>>9;
1197 } else {
1198 device->ov_last_oos_start = sector;
1199 device->ov_last_oos_size = size>>9;
1201 drbd_set_out_of_sync(device, sector, size);
1204 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1206 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1207 struct drbd_peer_device *peer_device = peer_req->peer_device;
1208 struct drbd_device *device = peer_device->device;
1209 struct digest_info *di;
1210 void *digest;
1211 sector_t sector = peer_req->i.sector;
1212 unsigned int size = peer_req->i.size;
1213 int digest_size;
1214 int err, eq = 0;
1215 bool stop_sector_reached = false;
1217 if (unlikely(cancel)) {
1218 drbd_free_peer_req(device, peer_req);
1219 dec_unacked(device);
1220 return 0;
1223 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1224 * the resync lru has been cleaned up already */
1225 if (get_ldev(device)) {
1226 drbd_rs_complete_io(device, peer_req->i.sector);
1227 put_ldev(device);
1230 di = peer_req->digest;
1232 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1233 digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1234 digest = kmalloc(digest_size, GFP_NOIO);
1235 if (digest) {
1236 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1238 D_ASSERT(device, digest_size == di->digest_size);
1239 eq = !memcmp(digest, di->digest, digest_size);
1240 kfree(digest);
1244 /* Free peer_req and pages before send.
1245 * In case we block on congestion, we could otherwise run into
1246 * some distributed deadlock, if the other side blocks on
1247 * congestion as well, because our receiver blocks in
1248 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1249 drbd_free_peer_req(device, peer_req);
1250 if (!eq)
1251 drbd_ov_out_of_sync_found(device, sector, size);
1252 else
1253 ov_out_of_sync_print(device);
1255 err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1256 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1258 dec_unacked(device);
1260 --device->ov_left;
1262 /* let's advance progress step marks only for every other megabyte */
1263 if ((device->ov_left & 0x200) == 0x200)
1264 drbd_advance_rs_marks(device, device->ov_left);
1266 stop_sector_reached = verify_can_do_stop_sector(device) &&
1267 (sector + (size>>9)) >= device->ov_stop_sector;
1269 if (device->ov_left == 0 || stop_sector_reached) {
1270 ov_out_of_sync_print(device);
1271 drbd_resync_finished(device);
1274 return err;
1277 /* FIXME
1278 * We need to track the number of pending barrier acks,
1279 * and to be able to wait for them.
1280 * See also comment in drbd_adm_attach before drbd_suspend_io.
1282 static int drbd_send_barrier(struct drbd_connection *connection)
1284 struct p_barrier *p;
1285 struct drbd_socket *sock;
1287 sock = &connection->data;
1288 p = conn_prepare_command(connection, sock);
1289 if (!p)
1290 return -EIO;
1291 p->barrier = connection->send.current_epoch_nr;
1292 p->pad = 0;
1293 connection->send.current_epoch_writes = 0;
1295 return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1298 int w_send_write_hint(struct drbd_work *w, int cancel)
1300 struct drbd_device *device =
1301 container_of(w, struct drbd_device, unplug_work);
1302 struct drbd_socket *sock;
1304 if (cancel)
1305 return 0;
1306 sock = &first_peer_device(device)->connection->data;
1307 if (!drbd_prepare_command(first_peer_device(device), sock))
1308 return -EIO;
1309 return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1312 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1314 if (!connection->send.seen_any_write_yet) {
1315 connection->send.seen_any_write_yet = true;
1316 connection->send.current_epoch_nr = epoch;
1317 connection->send.current_epoch_writes = 0;
1321 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1323 /* re-init if first write on this connection */
1324 if (!connection->send.seen_any_write_yet)
1325 return;
1326 if (connection->send.current_epoch_nr != epoch) {
1327 if (connection->send.current_epoch_writes)
1328 drbd_send_barrier(connection);
1329 connection->send.current_epoch_nr = epoch;
1333 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1335 struct drbd_request *req = container_of(w, struct drbd_request, w);
1336 struct drbd_device *device = req->device;
1337 struct drbd_peer_device *const peer_device = first_peer_device(device);
1338 struct drbd_connection *const connection = peer_device->connection;
1339 int err;
1341 if (unlikely(cancel)) {
1342 req_mod(req, SEND_CANCELED);
1343 return 0;
1345 req->pre_send_jif = jiffies;
1347 /* this time, no connection->send.current_epoch_writes++;
1348 * If it was sent, it was the closing barrier for the last
1349 * replicated epoch, before we went into AHEAD mode.
1350 * No more barriers will be sent, until we leave AHEAD mode again. */
1351 maybe_send_barrier(connection, req->epoch);
1353 err = drbd_send_out_of_sync(peer_device, req);
1354 req_mod(req, OOS_HANDED_TO_NETWORK);
1356 return err;
1360 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1361 * @w: work object.
1362 * @cancel: The connection will be closed anyways
1364 int w_send_dblock(struct drbd_work *w, int cancel)
1366 struct drbd_request *req = container_of(w, struct drbd_request, w);
1367 struct drbd_device *device = req->device;
1368 struct drbd_peer_device *const peer_device = first_peer_device(device);
1369 struct drbd_connection *connection = peer_device->connection;
1370 int err;
1372 if (unlikely(cancel)) {
1373 req_mod(req, SEND_CANCELED);
1374 return 0;
1376 req->pre_send_jif = jiffies;
1378 re_init_if_first_write(connection, req->epoch);
1379 maybe_send_barrier(connection, req->epoch);
1380 connection->send.current_epoch_writes++;
1382 err = drbd_send_dblock(peer_device, req);
1383 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1385 return err;
1389 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1390 * @w: work object.
1391 * @cancel: The connection will be closed anyways
1393 int w_send_read_req(struct drbd_work *w, int cancel)
1395 struct drbd_request *req = container_of(w, struct drbd_request, w);
1396 struct drbd_device *device = req->device;
1397 struct drbd_peer_device *const peer_device = first_peer_device(device);
1398 struct drbd_connection *connection = peer_device->connection;
1399 int err;
1401 if (unlikely(cancel)) {
1402 req_mod(req, SEND_CANCELED);
1403 return 0;
1405 req->pre_send_jif = jiffies;
1407 /* Even read requests may close a write epoch,
1408 * if there was any yet. */
1409 maybe_send_barrier(connection, req->epoch);
1411 err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1412 (unsigned long)req);
1414 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1416 return err;
1419 int w_restart_disk_io(struct drbd_work *w, int cancel)
1421 struct drbd_request *req = container_of(w, struct drbd_request, w);
1422 struct drbd_device *device = req->device;
1424 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1425 drbd_al_begin_io(device, &req->i);
1427 drbd_req_make_private_bio(req, req->master_bio);
1428 req->private_bio->bi_bdev = device->ldev->backing_bdev;
1429 generic_make_request(req->private_bio);
1431 return 0;
1434 static int _drbd_may_sync_now(struct drbd_device *device)
1436 struct drbd_device *odev = device;
1437 int resync_after;
1439 while (1) {
1440 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1441 return 1;
1442 rcu_read_lock();
1443 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1444 rcu_read_unlock();
1445 if (resync_after == -1)
1446 return 1;
1447 odev = minor_to_device(resync_after);
1448 if (!odev)
1449 return 1;
1450 if ((odev->state.conn >= C_SYNC_SOURCE &&
1451 odev->state.conn <= C_PAUSED_SYNC_T) ||
1452 odev->state.aftr_isp || odev->state.peer_isp ||
1453 odev->state.user_isp)
1454 return 0;
1459 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1460 * @device: DRBD device.
1462 * Called from process context only (admin command and after_state_ch).
1464 static int _drbd_pause_after(struct drbd_device *device)
1466 struct drbd_device *odev;
1467 int i, rv = 0;
1469 rcu_read_lock();
1470 idr_for_each_entry(&drbd_devices, odev, i) {
1471 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1472 continue;
1473 if (!_drbd_may_sync_now(odev))
1474 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1475 != SS_NOTHING_TO_DO);
1477 rcu_read_unlock();
1479 return rv;
1483 * _drbd_resume_next() - Resume resync on all devices that may resync now
1484 * @device: DRBD device.
1486 * Called from process context only (admin command and worker).
1488 static int _drbd_resume_next(struct drbd_device *device)
1490 struct drbd_device *odev;
1491 int i, rv = 0;
1493 rcu_read_lock();
1494 idr_for_each_entry(&drbd_devices, odev, i) {
1495 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1496 continue;
1497 if (odev->state.aftr_isp) {
1498 if (_drbd_may_sync_now(odev))
1499 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1500 CS_HARD, NULL)
1501 != SS_NOTHING_TO_DO) ;
1504 rcu_read_unlock();
1505 return rv;
1508 void resume_next_sg(struct drbd_device *device)
1510 write_lock_irq(&global_state_lock);
1511 _drbd_resume_next(device);
1512 write_unlock_irq(&global_state_lock);
1515 void suspend_other_sg(struct drbd_device *device)
1517 write_lock_irq(&global_state_lock);
1518 _drbd_pause_after(device);
1519 write_unlock_irq(&global_state_lock);
1522 /* caller must hold global_state_lock */
1523 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1525 struct drbd_device *odev;
1526 int resync_after;
1528 if (o_minor == -1)
1529 return NO_ERROR;
1530 if (o_minor < -1 || o_minor > MINORMASK)
1531 return ERR_RESYNC_AFTER;
1533 /* check for loops */
1534 odev = minor_to_device(o_minor);
1535 while (1) {
1536 if (odev == device)
1537 return ERR_RESYNC_AFTER_CYCLE;
1539 /* You are free to depend on diskless, non-existing,
1540 * or not yet/no longer existing minors.
1541 * We only reject dependency loops.
1542 * We cannot follow the dependency chain beyond a detached or
1543 * missing minor.
1545 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1546 return NO_ERROR;
1548 rcu_read_lock();
1549 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1550 rcu_read_unlock();
1551 /* dependency chain ends here, no cycles. */
1552 if (resync_after == -1)
1553 return NO_ERROR;
1555 /* follow the dependency chain */
1556 odev = minor_to_device(resync_after);
/*
 * drbd_resync_after_changed() - re-evaluate pause/resume after a dependency change
 * @device:	DRBD device, handed through to the helpers.
 *
 * Repeats pausing and resuming until a full pass reports no change.
 *
 * caller must hold global_state_lock
 */
void drbd_resync_after_changed(struct drbd_device *device)
{
	int changed;

	for (;;) {
		changed = _drbd_pause_after(device);
		changed |= _drbd_resume_next(device);
		if (!changed)
			break;
	}
}
1571 void drbd_rs_controller_reset(struct drbd_device *device)
1573 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1574 struct fifo_buffer *plan;
1576 atomic_set(&device->rs_sect_in, 0);
1577 atomic_set(&device->rs_sect_ev, 0);
1578 device->rs_in_flight = 0;
1579 device->rs_last_events =
1580 (int)part_stat_read(&disk->part0, sectors[0]) +
1581 (int)part_stat_read(&disk->part0, sectors[1]);
1583 /* Updating the RCU protected object in place is necessary since
1584 this function gets called from atomic context.
1585 It is valid since all other updates also lead to an completely
1586 empty fifo */
1587 rcu_read_lock();
1588 plan = rcu_dereference(device->rs_plan_s);
1589 plan->total = 0;
1590 fifo_set(plan, 0);
1591 rcu_read_unlock();
1594 void start_resync_timer_fn(unsigned long data)
1596 struct drbd_device *device = (struct drbd_device *) data;
1597 drbd_device_post_work(device, RS_START);
1600 static void do_start_resync(struct drbd_device *device)
1602 if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1603 drbd_warn(device, "postponing start_resync ...\n");
1604 device->start_resync_timer.expires = jiffies + HZ/10;
1605 add_timer(&device->start_resync_timer);
1606 return;
1609 drbd_start_resync(device, C_SYNC_SOURCE);
1610 clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1613 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1615 bool csums_after_crash_only;
1616 rcu_read_lock();
1617 csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1618 rcu_read_unlock();
1619 return connection->agreed_pro_version >= 89 && /* supported? */
1620 connection->csums_tfm && /* configured? */
1621 (csums_after_crash_only == 0 /* use for each resync? */
1622 || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1626 * drbd_start_resync() - Start the resync process
1627 * @device: DRBD device.
1628 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1630 * This function might bring you directly into one of the
1631 * C_PAUSED_SYNC_* states.
1633 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1635 struct drbd_peer_device *peer_device = first_peer_device(device);
1636 struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1637 union drbd_state ns;
1638 int r;
1640 if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1641 drbd_err(device, "Resync already running!\n");
1642 return;
1645 if (!test_bit(B_RS_H_DONE, &device->flags)) {
1646 if (side == C_SYNC_TARGET) {
1647 /* Since application IO was locked out during C_WF_BITMAP_T and
1648 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1649 we check that we might make the data inconsistent. */
1650 r = drbd_khelper(device, "before-resync-target");
1651 r = (r >> 8) & 0xff;
1652 if (r > 0) {
1653 drbd_info(device, "before-resync-target handler returned %d, "
1654 "dropping connection.\n", r);
1655 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1656 return;
1658 } else /* C_SYNC_SOURCE */ {
1659 r = drbd_khelper(device, "before-resync-source");
1660 r = (r >> 8) & 0xff;
1661 if (r > 0) {
1662 if (r == 3) {
1663 drbd_info(device, "before-resync-source handler returned %d, "
1664 "ignoring. Old userland tools?", r);
1665 } else {
1666 drbd_info(device, "before-resync-source handler returned %d, "
1667 "dropping connection.\n", r);
1668 conn_request_state(connection,
1669 NS(conn, C_DISCONNECTING), CS_HARD);
1670 return;
1676 if (current == connection->worker.task) {
1677 /* The worker should not sleep waiting for state_mutex,
1678 that can take long */
1679 if (!mutex_trylock(device->state_mutex)) {
1680 set_bit(B_RS_H_DONE, &device->flags);
1681 device->start_resync_timer.expires = jiffies + HZ/5;
1682 add_timer(&device->start_resync_timer);
1683 return;
1685 } else {
1686 mutex_lock(device->state_mutex);
1688 clear_bit(B_RS_H_DONE, &device->flags);
1690 /* req_lock: serialize with drbd_send_and_submit() and others
1691 * global_state_lock: for stable sync-after dependencies */
1692 spin_lock_irq(&device->resource->req_lock);
1693 write_lock(&global_state_lock);
1694 /* Did some connection breakage or IO error race with us? */
1695 if (device->state.conn < C_CONNECTED
1696 || !get_ldev_if_state(device, D_NEGOTIATING)) {
1697 write_unlock(&global_state_lock);
1698 spin_unlock_irq(&device->resource->req_lock);
1699 mutex_unlock(device->state_mutex);
1700 return;
1703 ns = drbd_read_state(device);
1705 ns.aftr_isp = !_drbd_may_sync_now(device);
1707 ns.conn = side;
1709 if (side == C_SYNC_TARGET)
1710 ns.disk = D_INCONSISTENT;
1711 else /* side == C_SYNC_SOURCE */
1712 ns.pdsk = D_INCONSISTENT;
1714 r = __drbd_set_state(device, ns, CS_VERBOSE, NULL);
1715 ns = drbd_read_state(device);
1717 if (ns.conn < C_CONNECTED)
1718 r = SS_UNKNOWN_ERROR;
1720 if (r == SS_SUCCESS) {
1721 unsigned long tw = drbd_bm_total_weight(device);
1722 unsigned long now = jiffies;
1723 int i;
1725 device->rs_failed = 0;
1726 device->rs_paused = 0;
1727 device->rs_same_csum = 0;
1728 device->rs_last_sect_ev = 0;
1729 device->rs_total = tw;
1730 device->rs_start = now;
1731 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1732 device->rs_mark_left[i] = tw;
1733 device->rs_mark_time[i] = now;
1735 _drbd_pause_after(device);
1736 /* Forget potentially stale cached per resync extent bit-counts.
1737 * Open coded drbd_rs_cancel_all(device), we already have IRQs
1738 * disabled, and know the disk state is ok. */
1739 spin_lock(&device->al_lock);
1740 lc_reset(device->resync);
1741 device->resync_locked = 0;
1742 device->resync_wenr = LC_FREE;
1743 spin_unlock(&device->al_lock);
1745 write_unlock(&global_state_lock);
1746 spin_unlock_irq(&device->resource->req_lock);
1748 if (r == SS_SUCCESS) {
1749 wake_up(&device->al_wait); /* for lc_reset() above */
1750 /* reset rs_last_bcast when a resync or verify is started,
1751 * to deal with potential jiffies wrap. */
1752 device->rs_last_bcast = jiffies - HZ;
1754 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1755 drbd_conn_str(ns.conn),
1756 (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1757 (unsigned long) device->rs_total);
1758 if (side == C_SYNC_TARGET) {
1759 device->bm_resync_fo = 0;
1760 device->use_csums = use_checksum_based_resync(connection, device);
1761 } else {
1762 device->use_csums = 0;
1765 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1766 * with w_send_oos, or the sync target will get confused as to
1767 * how much bits to resync. We cannot do that always, because for an
1768 * empty resync and protocol < 95, we need to do it here, as we call
1769 * drbd_resync_finished from here in that case.
1770 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1771 * and from after_state_ch otherwise. */
1772 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1773 drbd_gen_and_send_sync_uuid(peer_device);
1775 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1776 /* This still has a race (about when exactly the peers
1777 * detect connection loss) that can lead to a full sync
1778 * on next handshake. In 8.3.9 we fixed this with explicit
1779 * resync-finished notifications, but the fix
1780 * introduces a protocol change. Sleeping for some
1781 * time longer than the ping interval + timeout on the
1782 * SyncSource, to give the SyncTarget the chance to
1783 * detect connection loss, then waiting for a ping
1784 * response (implicit in drbd_resync_finished) reduces
1785 * the race considerably, but does not solve it. */
1786 if (side == C_SYNC_SOURCE) {
1787 struct net_conf *nc;
1788 int timeo;
1790 rcu_read_lock();
1791 nc = rcu_dereference(connection->net_conf);
1792 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1793 rcu_read_unlock();
1794 schedule_timeout_interruptible(timeo);
1796 drbd_resync_finished(device);
1799 drbd_rs_controller_reset(device);
1800 /* ns.conn may already be != device->state.conn,
1801 * we may have been paused in between, or become paused until
1802 * the timer triggers.
1803 * No matter, that is handled in resync_timer_fn() */
1804 if (ns.conn == C_SYNC_TARGET)
1805 mod_timer(&device->resync_timer, jiffies);
1807 drbd_md_sync(device);
1809 put_ldev(device);
1810 mutex_unlock(device->state_mutex);
1813 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1815 struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1816 device->rs_last_bcast = jiffies;
1818 if (!get_ldev(device))
1819 return;
1821 drbd_bm_write_lazy(device, 0);
1822 if (resync_done && is_sync_state(device->state.conn))
1823 drbd_resync_finished(device);
1825 drbd_bcast_event(device, &sib);
1826 /* update timestamp, in case it took a while to write out stuff */
1827 device->rs_last_bcast = jiffies;
1828 put_ldev(device);
1831 static void drbd_ldev_destroy(struct drbd_device *device)
1833 lc_destroy(device->resync);
1834 device->resync = NULL;
1835 lc_destroy(device->act_log);
1836 device->act_log = NULL;
1838 __acquire(local);
1839 drbd_free_ldev(device->ldev);
1840 device->ldev = NULL;
1841 __release(local);
1843 clear_bit(GOING_DISKLESS, &device->flags);
1844 wake_up(&device->misc_wait);
1847 static void go_diskless(struct drbd_device *device)
1849 D_ASSERT(device, device->state.disk == D_FAILED);
1850 /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1851 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1852 * the protected members anymore, though, so once put_ldev reaches zero
1853 * again, it will be safe to free them. */
1855 /* Try to write changed bitmap pages, read errors may have just
1856 * set some bits outside the area covered by the activity log.
1858 * If we have an IO error during the bitmap writeout,
1859 * we will want a full sync next time, just in case.
1860 * (Do we want a specific meta data flag for this?)
1862 * If that does not make it to stable storage either,
1863 * we cannot do anything about that anymore.
1865 * We still need to check if both bitmap and ldev are present, we may
1866 * end up here after a failed attach, before ldev was even assigned.
1868 if (device->bitmap && device->ldev) {
1869 /* An interrupted resync or similar is allowed to recounts bits
1870 * while we detach.
1871 * Any modifications would not be expected anymore, though.
1873 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1874 "detach", BM_LOCKED_TEST_ALLOWED)) {
1875 if (test_bit(WAS_READ_ERROR, &device->flags)) {
1876 drbd_md_set_flag(device, MDF_FULL_SYNC);
1877 drbd_md_sync(device);
1882 drbd_force_state(device, NS(disk, D_DISKLESS));
/*
 * do_md_sync() - device work: log a warning and call drbd_md_sync()
 * @device:	DRBD device.
 *
 * Always returns 0.
 */
static int do_md_sync(struct drbd_device *device)
{
	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");

	drbd_md_sync(device);

	return 0;
}
1892 /* only called from drbd_worker thread, no locking */
1893 void __update_timing_details(
1894 struct drbd_thread_timing_details *tdp,
1895 unsigned int *cb_nr,
1896 void *cb,
1897 const char *fn, const unsigned int line)
1899 unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1900 struct drbd_thread_timing_details *td = tdp + i;
1902 td->start_jif = jiffies;
1903 td->cb_addr = cb;
1904 td->caller_fn = fn;
1905 td->line = line;
1906 td->cb_nr = *cb_nr;
1908 i = (i+1) % DRBD_THREAD_DETAILS_HIST;
1909 td = tdp + i;
1910 memset(td, 0, sizeof(*td));
1912 ++(*cb_nr);
1915 static void do_device_work(struct drbd_device *device, const unsigned long todo)
1917 if (test_bit(MD_SYNC, &todo))
1918 do_md_sync(device);
1919 if (test_bit(RS_DONE, &todo) ||
1920 test_bit(RS_PROGRESS, &todo))
1921 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
1922 if (test_bit(GO_DISKLESS, &todo))
1923 go_diskless(device);
1924 if (test_bit(DESTROY_DISK, &todo))
1925 drbd_ldev_destroy(device);
1926 if (test_bit(RS_START, &todo))
1927 do_start_resync(device);
/* All device flag bits that get_work_bits() collects for do_device_work(). */
#define DRBD_DEVICE_WORK_MASK	\
	((1UL << MD_SYNC)	\
	|(1UL << RS_DONE)	\
	|(1UL << RS_PROGRESS)	\
	|(1UL << GO_DISKLESS)	\
	|(1UL << DESTROY_DISK)	\
	|(1UL << RS_START)	\
	)
1939 static unsigned long get_work_bits(unsigned long *flags)
1941 unsigned long old, new;
1942 do {
1943 old = *flags;
1944 new = old & ~DRBD_DEVICE_WORK_MASK;
1945 } while (cmpxchg(flags, old, new) != old);
1946 return old & DRBD_DEVICE_WORK_MASK;
1949 static void do_unqueued_work(struct drbd_connection *connection)
1951 struct drbd_peer_device *peer_device;
1952 int vnr;
1954 rcu_read_lock();
1955 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1956 struct drbd_device *device = peer_device->device;
1957 unsigned long todo = get_work_bits(&device->flags);
1958 if (!todo)
1959 continue;
1961 kref_get(&device->kref);
1962 rcu_read_unlock();
1963 do_device_work(device, todo);
1964 kref_put(&device->kref, drbd_destroy_device);
1965 rcu_read_lock();
1967 rcu_read_unlock();
1970 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1972 spin_lock_irq(&queue->q_lock);
1973 list_splice_tail_init(&queue->q, work_list);
1974 spin_unlock_irq(&queue->q_lock);
1975 return !list_empty(work_list);
1978 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
1980 DEFINE_WAIT(wait);
1981 struct net_conf *nc;
1982 int uncork, cork;
1984 dequeue_work_batch(&connection->sender_work, work_list);
1985 if (!list_empty(work_list))
1986 return;
1988 /* Still nothing to do?
1989 * Maybe we still need to close the current epoch,
1990 * even if no new requests are queued yet.
1992 * Also, poke TCP, just in case.
1993 * Then wait for new work (or signal). */
1994 rcu_read_lock();
1995 nc = rcu_dereference(connection->net_conf);
1996 uncork = nc ? nc->tcp_cork : 0;
1997 rcu_read_unlock();
1998 if (uncork) {
1999 mutex_lock(&connection->data.mutex);
2000 if (connection->data.socket)
2001 drbd_tcp_uncork(connection->data.socket);
2002 mutex_unlock(&connection->data.mutex);
2005 for (;;) {
2006 int send_barrier;
2007 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2008 spin_lock_irq(&connection->resource->req_lock);
2009 spin_lock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
2010 if (!list_empty(&connection->sender_work.q))
2011 list_splice_tail_init(&connection->sender_work.q, work_list);
2012 spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
2013 if (!list_empty(work_list) || signal_pending(current)) {
2014 spin_unlock_irq(&connection->resource->req_lock);
2015 break;
2018 /* We found nothing new to do, no to-be-communicated request,
2019 * no other work item. We may still need to close the last
2020 * epoch. Next incoming request epoch will be connection ->
2021 * current transfer log epoch number. If that is different
2022 * from the epoch of the last request we communicated, it is
2023 * safe to send the epoch separating barrier now.
2025 send_barrier =
2026 atomic_read(&connection->current_tle_nr) !=
2027 connection->send.current_epoch_nr;
2028 spin_unlock_irq(&connection->resource->req_lock);
2030 if (send_barrier)
2031 maybe_send_barrier(connection,
2032 connection->send.current_epoch_nr + 1);
2034 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2035 break;
2037 /* drbd_send() may have called flush_signals() */
2038 if (get_t_state(&connection->worker) != RUNNING)
2039 break;
2041 schedule();
2042 /* may be woken up for other things but new work, too,
2043 * e.g. if the current epoch got closed.
2044 * In which case we send the barrier above. */
2046 finish_wait(&connection->sender_work.q_wait, &wait);
2048 /* someone may have changed the config while we have been waiting above. */
2049 rcu_read_lock();
2050 nc = rcu_dereference(connection->net_conf);
2051 cork = nc ? nc->tcp_cork : 0;
2052 rcu_read_unlock();
2053 mutex_lock(&connection->data.mutex);
2054 if (connection->data.socket) {
2055 if (cork)
2056 drbd_tcp_cork(connection->data.socket);
2057 else if (!uncork)
2058 drbd_tcp_uncork(connection->data.socket);
2060 mutex_unlock(&connection->data.mutex);
2063 int drbd_worker(struct drbd_thread *thi)
2065 struct drbd_connection *connection = thi->connection;
2066 struct drbd_work *w = NULL;
2067 struct drbd_peer_device *peer_device;
2068 LIST_HEAD(work_list);
2069 int vnr;
2071 while (get_t_state(thi) == RUNNING) {
2072 drbd_thread_current_set_cpu(thi);
2074 if (list_empty(&work_list)) {
2075 update_worker_timing_details(connection, wait_for_work);
2076 wait_for_work(connection, &work_list);
2079 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2080 update_worker_timing_details(connection, do_unqueued_work);
2081 do_unqueued_work(connection);
2084 if (signal_pending(current)) {
2085 flush_signals(current);
2086 if (get_t_state(thi) == RUNNING) {
2087 drbd_warn(connection, "Worker got an unexpected signal\n");
2088 continue;
2090 break;
2093 if (get_t_state(thi) != RUNNING)
2094 break;
2096 if (!list_empty(&work_list)) {
2097 w = list_first_entry(&work_list, struct drbd_work, list);
2098 list_del_init(&w->list);
2099 update_worker_timing_details(connection, w->cb);
2100 if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2101 continue;
2102 if (connection->cstate >= C_WF_REPORT_PARAMS)
2103 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2107 do {
2108 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2109 update_worker_timing_details(connection, do_unqueued_work);
2110 do_unqueued_work(connection);
2112 if (!list_empty(&work_list)) {
2113 w = list_first_entry(&work_list, struct drbd_work, list);
2114 list_del_init(&w->list);
2115 update_worker_timing_details(connection, w->cb);
2116 w->cb(w, 1);
2117 } else
2118 dequeue_work_batch(&connection->sender_work, &work_list);
2119 } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2121 rcu_read_lock();
2122 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2123 struct drbd_device *device = peer_device->device;
2124 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2125 kref_get(&device->kref);
2126 rcu_read_unlock();
2127 drbd_device_cleanup(device);
2128 kref_put(&device->kref, drbd_destroy_device);
2129 rcu_read_lock();
2131 rcu_read_unlock();
2133 return 0;