Linux 2.6.34-rc3
[pohmelfs.git] / drivers / block / drbd / drbd_worker.c
blobb453c2bca3be5d3740c81be7e41d2ff953bb937d
1 /*
2 drbd_worker.c
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/smp_lock.h>
30 #include <linux/wait.h>
31 #include <linux/mm.h>
32 #include <linux/memcontrol.h>
33 #include <linux/mm_inline.h>
34 #include <linux/slab.h>
35 #include <linux/random.h>
36 #include <linux/string.h>
37 #include <linux/scatterlist.h>
39 #include "drbd_int.h"
40 #include "drbd_req.h"
/* Delay, in jiffies, used when re-arming the resync timer: a tenth of a second. */
#define SLEEP_TIME (HZ / 10)

/* Forward declaration: resync_timer_fn() installs this callback before its definition. */
static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
/* defined here:
 *   drbd_md_io_complete
 *   drbd_endio_write_sec
 *   drbd_endio_read_sec
 *   drbd_endio_pri
 *
 * more endio handlers:
 *   atodb_endio in drbd_actlog.c
 *   drbd_bm_async_io_complete in drbd_bitmap.c
 *
 * For all these callbacks, note the following:
 * The callbacks will be called in irq context by the IDE drivers,
 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
 * Try to get the locking right :)
 */
66 /* About the global_state_lock
67 Each state transition on an device holds a read lock. In case we have
68 to evaluate the sync after dependencies, we grab a write lock, because
69 we need stable states on all devices for that. */
70 rwlock_t global_state_lock;
72 /* used for synchronous meta data and bitmap IO
73 * submitted by drbd_md_sync_page_io()
75 void drbd_md_io_complete(struct bio *bio, int error)
77 struct drbd_md_io *md_io;
79 md_io = (struct drbd_md_io *)bio->bi_private;
80 md_io->error = error;
82 complete(&md_io->event);
85 /* reads on behalf of the partner,
86 * "submitted" by the receiver
88 void drbd_endio_read_sec(struct bio *bio, int error) __releases(local)
90 unsigned long flags = 0;
91 struct drbd_epoch_entry *e = NULL;
92 struct drbd_conf *mdev;
93 int uptodate = bio_flagged(bio, BIO_UPTODATE);
95 e = bio->bi_private;
96 mdev = e->mdev;
98 if (error)
99 dev_warn(DEV, "read: error=%d s=%llus\n", error,
100 (unsigned long long)e->sector);
101 if (!error && !uptodate) {
102 dev_warn(DEV, "read: setting error to -EIO s=%llus\n",
103 (unsigned long long)e->sector);
104 /* strange behavior of some lower level drivers...
105 * fail the request by clearing the uptodate flag,
106 * but do not return any error?! */
107 error = -EIO;
110 D_ASSERT(e->block_id != ID_VACANT);
112 spin_lock_irqsave(&mdev->req_lock, flags);
113 mdev->read_cnt += e->size >> 9;
114 list_del(&e->w.list);
115 if (list_empty(&mdev->read_ee))
116 wake_up(&mdev->ee_wait);
117 spin_unlock_irqrestore(&mdev->req_lock, flags);
119 drbd_chk_io_error(mdev, error, FALSE);
120 drbd_queue_work(&mdev->data.work, &e->w);
121 put_ldev(mdev);
124 /* writes on behalf of the partner, or resync writes,
125 * "submitted" by the receiver.
127 void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
129 unsigned long flags = 0;
130 struct drbd_epoch_entry *e = NULL;
131 struct drbd_conf *mdev;
132 sector_t e_sector;
133 int do_wake;
134 int is_syncer_req;
135 int do_al_complete_io;
136 int uptodate = bio_flagged(bio, BIO_UPTODATE);
137 int is_barrier = bio_rw_flagged(bio, BIO_RW_BARRIER);
139 e = bio->bi_private;
140 mdev = e->mdev;
142 if (error)
143 dev_warn(DEV, "write: error=%d s=%llus\n", error,
144 (unsigned long long)e->sector);
145 if (!error && !uptodate) {
146 dev_warn(DEV, "write: setting error to -EIO s=%llus\n",
147 (unsigned long long)e->sector);
148 /* strange behavior of some lower level drivers...
149 * fail the request by clearing the uptodate flag,
150 * but do not return any error?! */
151 error = -EIO;
154 /* error == -ENOTSUPP would be a better test,
155 * alas it is not reliable */
156 if (error && is_barrier && e->flags & EE_IS_BARRIER) {
157 drbd_bump_write_ordering(mdev, WO_bdev_flush);
158 spin_lock_irqsave(&mdev->req_lock, flags);
159 list_del(&e->w.list);
160 e->w.cb = w_e_reissue;
161 /* put_ldev actually happens below, once we come here again. */
162 __release(local);
163 spin_unlock_irqrestore(&mdev->req_lock, flags);
164 drbd_queue_work(&mdev->data.work, &e->w);
165 return;
168 D_ASSERT(e->block_id != ID_VACANT);
170 spin_lock_irqsave(&mdev->req_lock, flags);
171 mdev->writ_cnt += e->size >> 9;
172 is_syncer_req = is_syncer_block_id(e->block_id);
174 /* after we moved e to done_ee,
175 * we may no longer access it,
176 * it may be freed/reused already!
177 * (as soon as we release the req_lock) */
178 e_sector = e->sector;
179 do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
181 list_del(&e->w.list); /* has been on active_ee or sync_ee */
182 list_add_tail(&e->w.list, &mdev->done_ee);
184 /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
185 * neither did we wake possibly waiting conflicting requests.
186 * done from "drbd_process_done_ee" within the appropriate w.cb
187 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
189 do_wake = is_syncer_req
190 ? list_empty(&mdev->sync_ee)
191 : list_empty(&mdev->active_ee);
193 if (error)
194 __drbd_chk_io_error(mdev, FALSE);
195 spin_unlock_irqrestore(&mdev->req_lock, flags);
197 if (is_syncer_req)
198 drbd_rs_complete_io(mdev, e_sector);
200 if (do_wake)
201 wake_up(&mdev->ee_wait);
203 if (do_al_complete_io)
204 drbd_al_complete_io(mdev, e_sector);
206 wake_asender(mdev);
207 put_ldev(mdev);
211 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
213 void drbd_endio_pri(struct bio *bio, int error)
215 unsigned long flags;
216 struct drbd_request *req = bio->bi_private;
217 struct drbd_conf *mdev = req->mdev;
218 struct bio_and_error m;
219 enum drbd_req_event what;
220 int uptodate = bio_flagged(bio, BIO_UPTODATE);
222 if (error)
223 dev_warn(DEV, "p %s: error=%d\n",
224 bio_data_dir(bio) == WRITE ? "write" : "read", error);
225 if (!error && !uptodate) {
226 dev_warn(DEV, "p %s: setting error to -EIO\n",
227 bio_data_dir(bio) == WRITE ? "write" : "read");
228 /* strange behavior of some lower level drivers...
229 * fail the request by clearing the uptodate flag,
230 * but do not return any error?! */
231 error = -EIO;
234 /* to avoid recursion in __req_mod */
235 if (unlikely(error)) {
236 what = (bio_data_dir(bio) == WRITE)
237 ? write_completed_with_error
238 : (bio_rw(bio) == READA)
239 ? read_completed_with_error
240 : read_ahead_completed_with_error;
241 } else
242 what = completed_ok;
244 bio_put(req->private_bio);
245 req->private_bio = ERR_PTR(error);
247 spin_lock_irqsave(&mdev->req_lock, flags);
248 __req_mod(req, what, &m);
249 spin_unlock_irqrestore(&mdev->req_lock, flags);
251 if (m.bio)
252 complete_master_bio(mdev, &m);
255 int w_io_error(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
257 struct drbd_request *req = container_of(w, struct drbd_request, w);
259 /* NOTE: mdev->ldev can be NULL by the time we get here! */
260 /* D_ASSERT(mdev->ldev->dc.on_io_error != EP_PASS_ON); */
262 /* the only way this callback is scheduled is from _req_may_be_done,
263 * when it is done and had a local write error, see comments there */
264 drbd_req_free(req);
266 return TRUE;
269 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
271 struct drbd_request *req = container_of(w, struct drbd_request, w);
273 /* We should not detach for read io-error,
274 * but try to WRITE the P_DATA_REPLY to the failed location,
275 * to give the disk the chance to relocate that block */
277 spin_lock_irq(&mdev->req_lock);
278 if (cancel ||
279 mdev->state.conn < C_CONNECTED ||
280 mdev->state.pdsk <= D_INCONSISTENT) {
281 _req_mod(req, send_canceled);
282 spin_unlock_irq(&mdev->req_lock);
283 dev_alert(DEV, "WE ARE LOST. Local IO failure, no peer.\n");
284 return 1;
286 spin_unlock_irq(&mdev->req_lock);
288 return w_send_read_req(mdev, w, 0);
291 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
293 ERR_IF(cancel) return 1;
294 dev_err(DEV, "resync inactive, but callback triggered??\n");
295 return 1; /* Simply ignore this! */
298 void drbd_csum(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
300 struct hash_desc desc;
301 struct scatterlist sg;
302 struct bio_vec *bvec;
303 int i;
305 desc.tfm = tfm;
306 desc.flags = 0;
308 sg_init_table(&sg, 1);
309 crypto_hash_init(&desc);
311 __bio_for_each_segment(bvec, bio, i, 0) {
312 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
313 crypto_hash_update(&desc, &sg, sg.length);
315 crypto_hash_final(&desc, digest);
318 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
320 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
321 int digest_size;
322 void *digest;
323 int ok;
325 D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
327 if (unlikely(cancel)) {
328 drbd_free_ee(mdev, e);
329 return 1;
332 if (likely(drbd_bio_uptodate(e->private_bio))) {
333 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
334 digest = kmalloc(digest_size, GFP_NOIO);
335 if (digest) {
336 drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
338 inc_rs_pending(mdev);
339 ok = drbd_send_drequest_csum(mdev,
340 e->sector,
341 e->size,
342 digest,
343 digest_size,
344 P_CSUM_RS_REQUEST);
345 kfree(digest);
346 } else {
347 dev_err(DEV, "kmalloc() of digest failed.\n");
348 ok = 0;
350 } else
351 ok = 1;
353 drbd_free_ee(mdev, e);
355 if (unlikely(!ok))
356 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
357 return ok;
360 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
362 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
364 struct drbd_epoch_entry *e;
366 if (!get_ldev(mdev))
367 return 0;
369 /* GFP_TRY, because if there is no memory available right now, this may
370 * be rescheduled for later. It is "only" background resync, after all. */
371 e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
372 if (!e) {
373 put_ldev(mdev);
374 return 2;
377 spin_lock_irq(&mdev->req_lock);
378 list_add(&e->w.list, &mdev->read_ee);
379 spin_unlock_irq(&mdev->req_lock);
381 e->private_bio->bi_end_io = drbd_endio_read_sec;
382 e->private_bio->bi_rw = READ;
383 e->w.cb = w_e_send_csum;
385 mdev->read_cnt += size >> 9;
386 drbd_generic_make_request(mdev, DRBD_FAULT_RS_RD, e->private_bio);
388 return 1;
391 void resync_timer_fn(unsigned long data)
393 unsigned long flags;
394 struct drbd_conf *mdev = (struct drbd_conf *) data;
395 int queue;
397 spin_lock_irqsave(&mdev->req_lock, flags);
399 if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
400 queue = 1;
401 if (mdev->state.conn == C_VERIFY_S)
402 mdev->resync_work.cb = w_make_ov_request;
403 else
404 mdev->resync_work.cb = w_make_resync_request;
405 } else {
406 queue = 0;
407 mdev->resync_work.cb = w_resync_inactive;
410 spin_unlock_irqrestore(&mdev->req_lock, flags);
412 /* harmless race: list_empty outside data.work.q_lock */
413 if (list_empty(&mdev->resync_work.list) && queue)
414 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
417 int w_make_resync_request(struct drbd_conf *mdev,
418 struct drbd_work *w, int cancel)
420 unsigned long bit;
421 sector_t sector;
422 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
423 int max_segment_size = queue_max_segment_size(mdev->rq_queue);
424 int number, i, size, pe, mx;
425 int align, queued, sndbuf;
427 if (unlikely(cancel))
428 return 1;
430 if (unlikely(mdev->state.conn < C_CONNECTED)) {
431 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
432 return 0;
435 if (mdev->state.conn != C_SYNC_TARGET)
436 dev_err(DEV, "%s in w_make_resync_request\n",
437 drbd_conn_str(mdev->state.conn));
439 if (!get_ldev(mdev)) {
440 /* Since we only need to access mdev->rsync a
441 get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
442 to continue resync with a broken disk makes no sense at
443 all */
444 dev_err(DEV, "Disk broke down during resync!\n");
445 mdev->resync_work.cb = w_resync_inactive;
446 return 1;
449 number = SLEEP_TIME * mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
450 pe = atomic_read(&mdev->rs_pending_cnt);
452 mutex_lock(&mdev->data.mutex);
453 if (mdev->data.socket)
454 mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
455 else
456 mx = 1;
457 mutex_unlock(&mdev->data.mutex);
459 /* For resync rates >160MB/sec, allow more pending RS requests */
460 if (number > mx)
461 mx = number;
463 /* Limit the number of pending RS requests to no more than the peer's receive buffer */
464 if ((pe + number) > mx) {
465 number = mx - pe;
468 for (i = 0; i < number; i++) {
469 /* Stop generating RS requests, when half of the send buffer is filled */
470 mutex_lock(&mdev->data.mutex);
471 if (mdev->data.socket) {
472 queued = mdev->data.socket->sk->sk_wmem_queued;
473 sndbuf = mdev->data.socket->sk->sk_sndbuf;
474 } else {
475 queued = 1;
476 sndbuf = 0;
478 mutex_unlock(&mdev->data.mutex);
479 if (queued > sndbuf / 2)
480 goto requeue;
482 next_sector:
483 size = BM_BLOCK_SIZE;
484 bit = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
486 if (bit == -1UL) {
487 mdev->bm_resync_fo = drbd_bm_bits(mdev);
488 mdev->resync_work.cb = w_resync_inactive;
489 put_ldev(mdev);
490 return 1;
493 sector = BM_BIT_TO_SECT(bit);
495 if (drbd_try_rs_begin_io(mdev, sector)) {
496 mdev->bm_resync_fo = bit;
497 goto requeue;
499 mdev->bm_resync_fo = bit + 1;
501 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
502 drbd_rs_complete_io(mdev, sector);
503 goto next_sector;
506 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
507 /* try to find some adjacent bits.
508 * we stop if we have already the maximum req size.
510 * Additionally always align bigger requests, in order to
511 * be prepared for all stripe sizes of software RAIDs.
513 * we _do_ care about the agreed-upon q->max_segment_size
514 * here, as splitting up the requests on the other side is more
515 * difficult. the consequence is, that on lvm and md and other
516 * "indirect" devices, this is dead code, since
517 * q->max_segment_size will be PAGE_SIZE.
519 align = 1;
520 for (;;) {
521 if (size + BM_BLOCK_SIZE > max_segment_size)
522 break;
524 /* Be always aligned */
525 if (sector & ((1<<(align+3))-1))
526 break;
528 /* do not cross extent boundaries */
529 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
530 break;
531 /* now, is it actually dirty, after all?
532 * caution, drbd_bm_test_bit is tri-state for some
533 * obscure reason; ( b == 0 ) would get the out-of-band
534 * only accidentally right because of the "oddly sized"
535 * adjustment below */
536 if (drbd_bm_test_bit(mdev, bit+1) != 1)
537 break;
538 bit++;
539 size += BM_BLOCK_SIZE;
540 if ((BM_BLOCK_SIZE << align) <= size)
541 align++;
542 i++;
544 /* if we merged some,
545 * reset the offset to start the next drbd_bm_find_next from */
546 if (size > BM_BLOCK_SIZE)
547 mdev->bm_resync_fo = bit + 1;
548 #endif
550 /* adjust very last sectors, in case we are oddly sized */
551 if (sector + (size>>9) > capacity)
552 size = (capacity-sector)<<9;
553 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
554 switch (read_for_csum(mdev, sector, size)) {
555 case 0: /* Disk failure*/
556 put_ldev(mdev);
557 return 0;
558 case 2: /* Allocation failed */
559 drbd_rs_complete_io(mdev, sector);
560 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
561 goto requeue;
562 /* case 1: everything ok */
564 } else {
565 inc_rs_pending(mdev);
566 if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
567 sector, size, ID_SYNCER)) {
568 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
569 dec_rs_pending(mdev);
570 put_ldev(mdev);
571 return 0;
576 if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
577 /* last syncer _request_ was sent,
578 * but the P_RS_DATA_REPLY not yet received. sync will end (and
579 * next sync group will resume), as soon as we receive the last
580 * resync data block, and the last bit is cleared.
581 * until then resync "work" is "inactive" ...
583 mdev->resync_work.cb = w_resync_inactive;
584 put_ldev(mdev);
585 return 1;
588 requeue:
589 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
590 put_ldev(mdev);
591 return 1;
594 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
596 int number, i, size;
597 sector_t sector;
598 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
600 if (unlikely(cancel))
601 return 1;
603 if (unlikely(mdev->state.conn < C_CONNECTED)) {
604 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
605 return 0;
608 number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
609 if (atomic_read(&mdev->rs_pending_cnt) > number)
610 goto requeue;
612 number -= atomic_read(&mdev->rs_pending_cnt);
614 sector = mdev->ov_position;
615 for (i = 0; i < number; i++) {
616 if (sector >= capacity) {
617 mdev->resync_work.cb = w_resync_inactive;
618 return 1;
621 size = BM_BLOCK_SIZE;
623 if (drbd_try_rs_begin_io(mdev, sector)) {
624 mdev->ov_position = sector;
625 goto requeue;
628 if (sector + (size>>9) > capacity)
629 size = (capacity-sector)<<9;
631 inc_rs_pending(mdev);
632 if (!drbd_send_ov_request(mdev, sector, size)) {
633 dec_rs_pending(mdev);
634 return 0;
636 sector += BM_SECT_PER_BIT;
638 mdev->ov_position = sector;
640 requeue:
641 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
642 return 1;
/**
 * w_ov_finished() - worker callback run when online verify is done
 * @mdev:   DRBD device
 * @w:      work object; freed here with kfree()
 * @cancel: not referenced here
 *
 * Frees @w, calls ov_oos_print() and then finishes through
 * drbd_resync_finished().  Always returns 1.
 */
int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	/* @w is not needed any more; release it before doing the real work */
	kfree(w);

	ov_oos_print(mdev);
	drbd_resync_finished(mdev);

	return 1;
}
/**
 * w_resync_finished() - worker callback that retries drbd_resync_finished()
 * @mdev:   DRBD device
 * @w:      work object; freed here with kfree()
 * @cancel: not referenced here
 *
 * Queued by drbd_resync_finished() with a kmalloc()ed work object when the
 * resync LRU could not be emptied yet.  Always returns 1.
 */
static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	/* the work object was allocated just for this one invocation */
	kfree(w);

	drbd_resync_finished(mdev);

	return 1;
}
664 int drbd_resync_finished(struct drbd_conf *mdev)
666 unsigned long db, dt, dbdt;
667 unsigned long n_oos;
668 union drbd_state os, ns;
669 struct drbd_work *w;
670 char *khelper_cmd = NULL;
672 /* Remove all elements from the resync LRU. Since future actions
673 * might set bits in the (main) bitmap, then the entries in the
674 * resync LRU would be wrong. */
675 if (drbd_rs_del_all(mdev)) {
676 /* In case this is not possible now, most probably because
677 * there are P_RS_DATA_REPLY Packets lingering on the worker's
678 * queue (or even the read operations for those packets
679 * is not finished by now). Retry in 100ms. */
681 drbd_kick_lo(mdev);
682 __set_current_state(TASK_INTERRUPTIBLE);
683 schedule_timeout(HZ / 10);
684 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
685 if (w) {
686 w->cb = w_resync_finished;
687 drbd_queue_work(&mdev->data.work, w);
688 return 1;
690 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
693 dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
694 if (dt <= 0)
695 dt = 1;
696 db = mdev->rs_total;
697 dbdt = Bit2KB(db/dt);
698 mdev->rs_paused /= HZ;
700 if (!get_ldev(mdev))
701 goto out;
703 spin_lock_irq(&mdev->req_lock);
704 os = mdev->state;
706 /* This protects us against multiple calls (that can happen in the presence
707 of application IO), and against connectivity loss just before we arrive here. */
708 if (os.conn <= C_CONNECTED)
709 goto out_unlock;
711 ns = os;
712 ns.conn = C_CONNECTED;
714 dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
715 (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
716 "Online verify " : "Resync",
717 dt + mdev->rs_paused, mdev->rs_paused, dbdt);
719 n_oos = drbd_bm_total_weight(mdev);
721 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
722 if (n_oos) {
723 dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
724 n_oos, Bit2KB(1));
725 khelper_cmd = "out-of-sync";
727 } else {
728 D_ASSERT((n_oos - mdev->rs_failed) == 0);
730 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
731 khelper_cmd = "after-resync-target";
733 if (mdev->csums_tfm && mdev->rs_total) {
734 const unsigned long s = mdev->rs_same_csum;
735 const unsigned long t = mdev->rs_total;
736 const int ratio =
737 (t == 0) ? 0 :
738 (t < 100000) ? ((s*100)/t) : (s/(t/100));
739 dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
740 "transferred %luK total %luK\n",
741 ratio,
742 Bit2KB(mdev->rs_same_csum),
743 Bit2KB(mdev->rs_total - mdev->rs_same_csum),
744 Bit2KB(mdev->rs_total));
748 if (mdev->rs_failed) {
749 dev_info(DEV, " %lu failed blocks\n", mdev->rs_failed);
751 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
752 ns.disk = D_INCONSISTENT;
753 ns.pdsk = D_UP_TO_DATE;
754 } else {
755 ns.disk = D_UP_TO_DATE;
756 ns.pdsk = D_INCONSISTENT;
758 } else {
759 ns.disk = D_UP_TO_DATE;
760 ns.pdsk = D_UP_TO_DATE;
762 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
763 if (mdev->p_uuid) {
764 int i;
765 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
766 _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
767 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
768 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
769 } else {
770 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
774 drbd_uuid_set_bm(mdev, 0UL);
776 if (mdev->p_uuid) {
777 /* Now the two UUID sets are equal, update what we
778 * know of the peer. */
779 int i;
780 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
781 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
785 _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
786 out_unlock:
787 spin_unlock_irq(&mdev->req_lock);
788 put_ldev(mdev);
789 out:
790 mdev->rs_total = 0;
791 mdev->rs_failed = 0;
792 mdev->rs_paused = 0;
793 mdev->ov_start_sector = 0;
795 if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
796 dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
797 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
800 if (khelper_cmd)
801 drbd_khelper(mdev, khelper_cmd);
803 return 1;
806 /* helper */
807 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
809 if (drbd_bio_has_active_page(e->private_bio)) {
810 /* This might happen if sendpage() has not finished */
811 spin_lock_irq(&mdev->req_lock);
812 list_add_tail(&e->w.list, &mdev->net_ee);
813 spin_unlock_irq(&mdev->req_lock);
814 } else
815 drbd_free_ee(mdev, e);
819 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
820 * @mdev: DRBD device.
821 * @w: work object.
822 * @cancel: The connection will be closed anyways
824 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
826 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
827 int ok;
829 if (unlikely(cancel)) {
830 drbd_free_ee(mdev, e);
831 dec_unacked(mdev);
832 return 1;
835 if (likely(drbd_bio_uptodate(e->private_bio))) {
836 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
837 } else {
838 if (__ratelimit(&drbd_ratelimit_state))
839 dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
840 (unsigned long long)e->sector);
842 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
845 dec_unacked(mdev);
847 move_to_net_ee_or_free(mdev, e);
849 if (unlikely(!ok))
850 dev_err(DEV, "drbd_send_block() failed\n");
851 return ok;
855 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
856 * @mdev: DRBD device.
857 * @w: work object.
858 * @cancel: The connection will be closed anyways
860 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
862 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
863 int ok;
865 if (unlikely(cancel)) {
866 drbd_free_ee(mdev, e);
867 dec_unacked(mdev);
868 return 1;
871 if (get_ldev_if_state(mdev, D_FAILED)) {
872 drbd_rs_complete_io(mdev, e->sector);
873 put_ldev(mdev);
876 if (likely(drbd_bio_uptodate(e->private_bio))) {
877 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
878 inc_rs_pending(mdev);
879 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
880 } else {
881 if (__ratelimit(&drbd_ratelimit_state))
882 dev_err(DEV, "Not sending RSDataReply, "
883 "partner DISKLESS!\n");
884 ok = 1;
886 } else {
887 if (__ratelimit(&drbd_ratelimit_state))
888 dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
889 (unsigned long long)e->sector);
891 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
893 /* update resync data with failure */
894 drbd_rs_failed_io(mdev, e->sector, e->size);
897 dec_unacked(mdev);
899 move_to_net_ee_or_free(mdev, e);
901 if (unlikely(!ok))
902 dev_err(DEV, "drbd_send_block() failed\n");
903 return ok;
906 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
908 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
909 struct digest_info *di;
910 int digest_size;
911 void *digest = NULL;
912 int ok, eq = 0;
914 if (unlikely(cancel)) {
915 drbd_free_ee(mdev, e);
916 dec_unacked(mdev);
917 return 1;
920 drbd_rs_complete_io(mdev, e->sector);
922 di = (struct digest_info *)(unsigned long)e->block_id;
924 if (likely(drbd_bio_uptodate(e->private_bio))) {
925 /* quick hack to try to avoid a race against reconfiguration.
926 * a real fix would be much more involved,
927 * introducing more locking mechanisms */
928 if (mdev->csums_tfm) {
929 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
930 D_ASSERT(digest_size == di->digest_size);
931 digest = kmalloc(digest_size, GFP_NOIO);
933 if (digest) {
934 drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
935 eq = !memcmp(digest, di->digest, digest_size);
936 kfree(digest);
939 if (eq) {
940 drbd_set_in_sync(mdev, e->sector, e->size);
941 mdev->rs_same_csum++;
942 ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
943 } else {
944 inc_rs_pending(mdev);
945 e->block_id = ID_SYNCER;
946 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
948 } else {
949 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
950 if (__ratelimit(&drbd_ratelimit_state))
951 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
954 dec_unacked(mdev);
956 kfree(di);
958 move_to_net_ee_or_free(mdev, e);
960 if (unlikely(!ok))
961 dev_err(DEV, "drbd_send_block/ack() failed\n");
962 return ok;
965 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
967 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
968 int digest_size;
969 void *digest;
970 int ok = 1;
972 if (unlikely(cancel))
973 goto out;
975 if (unlikely(!drbd_bio_uptodate(e->private_bio)))
976 goto out;
978 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
979 /* FIXME if this allocation fails, online verify will not terminate! */
980 digest = kmalloc(digest_size, GFP_NOIO);
981 if (digest) {
982 drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
983 inc_rs_pending(mdev);
984 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
985 digest, digest_size, P_OV_REPLY);
986 if (!ok)
987 dec_rs_pending(mdev);
988 kfree(digest);
991 out:
992 drbd_free_ee(mdev, e);
994 dec_unacked(mdev);
996 return ok;
999 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1001 if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1002 mdev->ov_last_oos_size += size>>9;
1003 } else {
1004 mdev->ov_last_oos_start = sector;
1005 mdev->ov_last_oos_size = size>>9;
1007 drbd_set_out_of_sync(mdev, sector, size);
1008 set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1011 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1013 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1014 struct digest_info *di;
1015 int digest_size;
1016 void *digest;
1017 int ok, eq = 0;
1019 if (unlikely(cancel)) {
1020 drbd_free_ee(mdev, e);
1021 dec_unacked(mdev);
1022 return 1;
1025 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1026 * the resync lru has been cleaned up already */
1027 drbd_rs_complete_io(mdev, e->sector);
1029 di = (struct digest_info *)(unsigned long)e->block_id;
1031 if (likely(drbd_bio_uptodate(e->private_bio))) {
1032 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1033 digest = kmalloc(digest_size, GFP_NOIO);
1034 if (digest) {
1035 drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
1037 D_ASSERT(digest_size == di->digest_size);
1038 eq = !memcmp(digest, di->digest, digest_size);
1039 kfree(digest);
1041 } else {
1042 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1043 if (__ratelimit(&drbd_ratelimit_state))
1044 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1047 dec_unacked(mdev);
1049 kfree(di);
1051 if (!eq)
1052 drbd_ov_oos_found(mdev, e->sector, e->size);
1053 else
1054 ov_oos_print(mdev);
1056 ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1057 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1059 drbd_free_ee(mdev, e);
1061 if (--mdev->ov_left == 0) {
1062 ov_oos_print(mdev);
1063 drbd_resync_finished(mdev);
1066 return ok;
1069 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1071 struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1072 complete(&b->done);
1073 return 1;
1076 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1078 struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1079 struct p_barrier *p = &mdev->data.sbuf.barrier;
1080 int ok = 1;
1082 /* really avoid racing with tl_clear. w.cb may have been referenced
1083 * just before it was reassigned and re-queued, so double check that.
1084 * actually, this race was harmless, since we only try to send the
1085 * barrier packet here, and otherwise do nothing with the object.
1086 * but compare with the head of w_clear_epoch */
1087 spin_lock_irq(&mdev->req_lock);
1088 if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1089 cancel = 1;
1090 spin_unlock_irq(&mdev->req_lock);
1091 if (cancel)
1092 return 1;
1094 if (!drbd_get_data_sock(mdev))
1095 return 0;
1096 p->barrier = b->br_number;
1097 /* inc_ap_pending was done where this was queued.
1098 * dec_ap_pending will be done in got_BarrierAck
1099 * or (on connection loss) in w_clear_epoch. */
1100 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1101 (struct p_header *)p, sizeof(*p), 0);
1102 drbd_put_data_sock(mdev);
1104 return ok;
1107 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1109 if (cancel)
1110 return 1;
1111 return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1115 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1116 * @mdev: DRBD device.
1117 * @w: work object.
1118 * @cancel: The connection will be closed anyways
1120 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1122 struct drbd_request *req = container_of(w, struct drbd_request, w);
1123 int ok;
1125 if (unlikely(cancel)) {
1126 req_mod(req, send_canceled);
1127 return 1;
1130 ok = drbd_send_dblock(mdev, req);
1131 req_mod(req, ok ? handed_over_to_network : send_failed);
1133 return ok;
1137 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1138 * @mdev: DRBD device.
1139 * @w: work object.
1140 * @cancel: The connection will be closed anyways
1142 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1144 struct drbd_request *req = container_of(w, struct drbd_request, w);
1145 int ok;
1147 if (unlikely(cancel)) {
1148 req_mod(req, send_canceled);
1149 return 1;
1152 ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1153 (unsigned long)req);
1155 if (!ok) {
1156 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1157 * so this is probably redundant */
1158 if (mdev->state.conn >= C_CONNECTED)
1159 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1161 req_mod(req, ok ? handed_over_to_network : send_failed);
1163 return ok;
1166 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1168 struct drbd_conf *odev = mdev;
1170 while (1) {
1171 if (odev->sync_conf.after == -1)
1172 return 1;
1173 odev = minor_to_mdev(odev->sync_conf.after);
1174 ERR_IF(!odev) return 1;
1175 if ((odev->state.conn >= C_SYNC_SOURCE &&
1176 odev->state.conn <= C_PAUSED_SYNC_T) ||
1177 odev->state.aftr_isp || odev->state.peer_isp ||
1178 odev->state.user_isp)
1179 return 0;
1184 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1185 * @mdev: DRBD device.
1187 * Called from process context only (admin command and after_state_ch).
1189 static int _drbd_pause_after(struct drbd_conf *mdev)
1191 struct drbd_conf *odev;
1192 int i, rv = 0;
1194 for (i = 0; i < minor_count; i++) {
1195 odev = minor_to_mdev(i);
1196 if (!odev)
1197 continue;
1198 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1199 continue;
1200 if (!_drbd_may_sync_now(odev))
1201 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1202 != SS_NOTHING_TO_DO);
1205 return rv;
1209 * _drbd_resume_next() - Resume resync on all devices that may resync now
1210 * @mdev: DRBD device.
1212 * Called from process context only (admin command and worker).
1214 static int _drbd_resume_next(struct drbd_conf *mdev)
1216 struct drbd_conf *odev;
1217 int i, rv = 0;
1219 for (i = 0; i < minor_count; i++) {
1220 odev = minor_to_mdev(i);
1221 if (!odev)
1222 continue;
1223 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1224 continue;
1225 if (odev->state.aftr_isp) {
1226 if (_drbd_may_sync_now(odev))
1227 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1228 CS_HARD, NULL)
1229 != SS_NOTHING_TO_DO) ;
1232 return rv;
1235 void resume_next_sg(struct drbd_conf *mdev)
1237 write_lock_irq(&global_state_lock);
1238 _drbd_resume_next(mdev);
1239 write_unlock_irq(&global_state_lock);
1242 void suspend_other_sg(struct drbd_conf *mdev)
1244 write_lock_irq(&global_state_lock);
1245 _drbd_pause_after(mdev);
1246 write_unlock_irq(&global_state_lock);
1249 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1251 struct drbd_conf *odev;
1253 if (o_minor == -1)
1254 return NO_ERROR;
1255 if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1256 return ERR_SYNC_AFTER;
1258 /* check for loops */
1259 odev = minor_to_mdev(o_minor);
1260 while (1) {
1261 if (odev == mdev)
1262 return ERR_SYNC_AFTER_CYCLE;
1264 /* dependency chain ends here, no cycles. */
1265 if (odev->sync_conf.after == -1)
1266 return NO_ERROR;
1268 /* follow the dependency chain */
1269 odev = minor_to_mdev(odev->sync_conf.after);
1273 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1275 int changes;
1276 int retcode;
1278 write_lock_irq(&global_state_lock);
1279 retcode = sync_after_error(mdev, na);
1280 if (retcode == NO_ERROR) {
1281 mdev->sync_conf.after = na;
1282 do {
1283 changes = _drbd_pause_after(mdev);
1284 changes |= _drbd_resume_next(mdev);
1285 } while (changes);
1287 write_unlock_irq(&global_state_lock);
1288 return retcode;
1292 * drbd_start_resync() - Start the resync process
1293 * @mdev: DRBD device.
1294 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1296 * This function might bring you directly into one of the
1297 * C_PAUSED_SYNC_* states.
1299 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1301 union drbd_state ns;
1302 int r;
1304 if (mdev->state.conn >= C_SYNC_SOURCE) {
1305 dev_err(DEV, "Resync already running!\n");
1306 return;
1309 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1310 drbd_rs_cancel_all(mdev);
1312 if (side == C_SYNC_TARGET) {
1313 /* Since application IO was locked out during C_WF_BITMAP_T and
1314 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1315 we check that we might make the data inconsistent. */
1316 r = drbd_khelper(mdev, "before-resync-target");
1317 r = (r >> 8) & 0xff;
1318 if (r > 0) {
1319 dev_info(DEV, "before-resync-target handler returned %d, "
1320 "dropping connection.\n", r);
1321 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1322 return;
1326 drbd_state_lock(mdev);
1328 if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1329 drbd_state_unlock(mdev);
1330 return;
1333 if (side == C_SYNC_TARGET) {
1334 mdev->bm_resync_fo = 0;
1335 } else /* side == C_SYNC_SOURCE */ {
1336 u64 uuid;
1338 get_random_bytes(&uuid, sizeof(u64));
1339 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1340 drbd_send_sync_uuid(mdev, uuid);
1342 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1345 write_lock_irq(&global_state_lock);
1346 ns = mdev->state;
1348 ns.aftr_isp = !_drbd_may_sync_now(mdev);
1350 ns.conn = side;
1352 if (side == C_SYNC_TARGET)
1353 ns.disk = D_INCONSISTENT;
1354 else /* side == C_SYNC_SOURCE */
1355 ns.pdsk = D_INCONSISTENT;
1357 r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1358 ns = mdev->state;
1360 if (ns.conn < C_CONNECTED)
1361 r = SS_UNKNOWN_ERROR;
1363 if (r == SS_SUCCESS) {
1364 mdev->rs_total =
1365 mdev->rs_mark_left = drbd_bm_total_weight(mdev);
1366 mdev->rs_failed = 0;
1367 mdev->rs_paused = 0;
1368 mdev->rs_start =
1369 mdev->rs_mark_time = jiffies;
1370 mdev->rs_same_csum = 0;
1371 _drbd_pause_after(mdev);
1373 write_unlock_irq(&global_state_lock);
1374 drbd_state_unlock(mdev);
1375 put_ldev(mdev);
1377 if (r == SS_SUCCESS) {
1378 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1379 drbd_conn_str(ns.conn),
1380 (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1381 (unsigned long) mdev->rs_total);
1383 if (mdev->rs_total == 0) {
1384 /* Peer still reachable? Beware of failing before-resync-target handlers! */
1385 request_ping(mdev);
1386 __set_current_state(TASK_INTERRUPTIBLE);
1387 schedule_timeout(mdev->net_conf->ping_timeo*HZ/9); /* 9 instead 10 */
1388 drbd_resync_finished(mdev);
1389 return;
1392 /* ns.conn may already be != mdev->state.conn,
1393 * we may have been paused in between, or become paused until
1394 * the timer triggers.
1395 * No matter, that is handled in resync_timer_fn() */
1396 if (ns.conn == C_SYNC_TARGET)
1397 mod_timer(&mdev->resync_timer, jiffies);
1399 drbd_md_sync(mdev);
1403 int drbd_worker(struct drbd_thread *thi)
1405 struct drbd_conf *mdev = thi->mdev;
1406 struct drbd_work *w = NULL;
1407 LIST_HEAD(work_list);
1408 int intr = 0, i;
1410 sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1412 while (get_t_state(thi) == Running) {
1413 drbd_thread_current_set_cpu(mdev);
1415 if (down_trylock(&mdev->data.work.s)) {
1416 mutex_lock(&mdev->data.mutex);
1417 if (mdev->data.socket && !mdev->net_conf->no_cork)
1418 drbd_tcp_uncork(mdev->data.socket);
1419 mutex_unlock(&mdev->data.mutex);
1421 intr = down_interruptible(&mdev->data.work.s);
1423 mutex_lock(&mdev->data.mutex);
1424 if (mdev->data.socket && !mdev->net_conf->no_cork)
1425 drbd_tcp_cork(mdev->data.socket);
1426 mutex_unlock(&mdev->data.mutex);
1429 if (intr) {
1430 D_ASSERT(intr == -EINTR);
1431 flush_signals(current);
1432 ERR_IF (get_t_state(thi) == Running)
1433 continue;
1434 break;
1437 if (get_t_state(thi) != Running)
1438 break;
1439 /* With this break, we have done a down() but not consumed
1440 the entry from the list. The cleanup code takes care of
1441 this... */
1443 w = NULL;
1444 spin_lock_irq(&mdev->data.work.q_lock);
1445 ERR_IF(list_empty(&mdev->data.work.q)) {
1446 /* something terribly wrong in our logic.
1447 * we were able to down() the semaphore,
1448 * but the list is empty... doh.
1450 * what is the best thing to do now?
1451 * try again from scratch, restarting the receiver,
1452 * asender, whatnot? could break even more ugly,
1453 * e.g. when we are primary, but no good local data.
1455 * I'll try to get away just starting over this loop.
1457 spin_unlock_irq(&mdev->data.work.q_lock);
1458 continue;
1460 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1461 list_del_init(&w->list);
1462 spin_unlock_irq(&mdev->data.work.q_lock);
1464 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1465 /* dev_warn(DEV, "worker: a callback failed! \n"); */
1466 if (mdev->state.conn >= C_CONNECTED)
1467 drbd_force_state(mdev,
1468 NS(conn, C_NETWORK_FAILURE));
1471 D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1472 D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1474 spin_lock_irq(&mdev->data.work.q_lock);
1475 i = 0;
1476 while (!list_empty(&mdev->data.work.q)) {
1477 list_splice_init(&mdev->data.work.q, &work_list);
1478 spin_unlock_irq(&mdev->data.work.q_lock);
1480 while (!list_empty(&work_list)) {
1481 w = list_entry(work_list.next, struct drbd_work, list);
1482 list_del_init(&w->list);
1483 w->cb(mdev, w, 1);
1484 i++; /* dead debugging code */
1487 spin_lock_irq(&mdev->data.work.q_lock);
1489 sema_init(&mdev->data.work.s, 0);
1490 /* DANGEROUS race: if someone did queue his work within the spinlock,
1491 * but up() ed outside the spinlock, we could get an up() on the
1492 * semaphore without corresponding list entry.
1493 * So don't do that.
1495 spin_unlock_irq(&mdev->data.work.q_lock);
1497 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1498 /* _drbd_set_state only uses stop_nowait.
1499 * wait here for the Exiting receiver. */
1500 drbd_thread_stop(&mdev->receiver);
1501 drbd_mdev_cleanup(mdev);
1503 dev_info(DEV, "worker terminated\n");
1505 clear_bit(DEVICE_DYING, &mdev->flags);
1506 clear_bit(CONFIG_PENDING, &mdev->flags);
1507 wake_up(&mdev->state_wait);
1509 return 0;