On Tue, Nov 06, 2007 at 02:33:53AM -0800, akpm@linux-foundation.org wrote:
[mmotm.git] / drivers / block / drbd / drbd_worker.c
blobed8796f1112d933004a6feea740765d3251cce65
1 /*
2 drbd_worker.c
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 #include <linux/module.h>
27 #include <linux/version.h>
28 #include <linux/drbd.h>
29 #include <linux/sched.h>
30 #include <linux/smp_lock.h>
31 #include <linux/wait.h>
32 #include <linux/mm.h>
33 #include <linux/memcontrol.h>
34 #include <linux/mm_inline.h>
35 #include <linux/slab.h>
36 #include <linux/random.h>
37 #include <linux/mm.h>
38 #include <linux/string.h>
39 #include <linux/scatterlist.h>
41 #include "drbd_int.h"
42 #include "drbd_req.h"
44 #define SLEEP_TIME (HZ/10)
46 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
/* Defined here:
 *   drbd_md_io_complete
 *   drbd_endio_write_sec
 *   drbd_endio_read_sec
 *   drbd_endio_pri
 *
 * More endio handlers:
 *   atodb_endio in drbd_actlog.c
 *   drbd_bm_async_io_complete in drbd_bitmap.c
 *
 * For all these callbacks, note the following:
 * The callbacks will be called in irq context by the IDE drivers,
 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
 * Try to get the locking right :)
 */
/* About the global_state_lock:
 * Each state transition on a device holds a read lock. In case we have
 * to evaluate the sync-after dependencies, we grab a write lock, because
 * we need stable states on all devices for that. */
72 rwlock_t global_state_lock;
74 /* used for synchronous meta data and bitmap IO
75 * submitted by drbd_md_sync_page_io()
77 void drbd_md_io_complete(struct bio *bio, int error)
79 struct drbd_md_io *md_io;
81 md_io = (struct drbd_md_io *)bio->bi_private;
82 md_io->error = error;
84 complete(&md_io->event);
87 /* reads on behalf of the partner,
88 * "submitted" by the receiver
90 void drbd_endio_read_sec(struct bio *bio, int error) __releases(local)
92 unsigned long flags = 0;
93 struct drbd_epoch_entry *e = NULL;
94 struct drbd_conf *mdev;
95 int uptodate = bio_flagged(bio, BIO_UPTODATE);
97 e = bio->bi_private;
98 mdev = e->mdev;
100 if (error)
101 dev_warn(DEV, "read: error=%d s=%llus\n", error,
102 (unsigned long long)e->sector);
103 if (!error && !uptodate) {
104 dev_warn(DEV, "read: setting error to -EIO s=%llus\n",
105 (unsigned long long)e->sector);
106 /* strange behavior of some lower level drivers...
107 * fail the request by clearing the uptodate flag,
108 * but do not return any error?! */
109 error = -EIO;
112 D_ASSERT(e->block_id != ID_VACANT);
114 spin_lock_irqsave(&mdev->req_lock, flags);
115 mdev->read_cnt += e->size >> 9;
116 list_del(&e->w.list);
117 if (list_empty(&mdev->read_ee))
118 wake_up(&mdev->ee_wait);
119 spin_unlock_irqrestore(&mdev->req_lock, flags);
121 drbd_chk_io_error(mdev, error, FALSE);
122 drbd_queue_work(&mdev->data.work, &e->w);
123 put_ldev(mdev);
126 /* writes on behalf of the partner, or resync writes,
127 * "submitted" by the receiver.
129 void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
131 unsigned long flags = 0;
132 struct drbd_epoch_entry *e = NULL;
133 struct drbd_conf *mdev;
134 sector_t e_sector;
135 int do_wake;
136 int is_syncer_req;
137 int do_al_complete_io;
138 int uptodate = bio_flagged(bio, BIO_UPTODATE);
139 int is_barrier = bio_rw_flagged(bio, BIO_RW_BARRIER);
141 e = bio->bi_private;
142 mdev = e->mdev;
144 if (error)
145 dev_warn(DEV, "write: error=%d s=%llus\n", error,
146 (unsigned long long)e->sector);
147 if (!error && !uptodate) {
148 dev_warn(DEV, "write: setting error to -EIO s=%llus\n",
149 (unsigned long long)e->sector);
150 /* strange behavior of some lower level drivers...
151 * fail the request by clearing the uptodate flag,
152 * but do not return any error?! */
153 error = -EIO;
156 /* error == -ENOTSUPP would be a better test,
157 * alas it is not reliable */
158 if (error && is_barrier && e->flags & EE_IS_BARRIER) {
159 drbd_bump_write_ordering(mdev, WO_bdev_flush);
160 spin_lock_irqsave(&mdev->req_lock, flags);
161 list_del(&e->w.list);
162 e->w.cb = w_e_reissue;
163 /* put_ldev actually happens below, once we come here again. */
164 __release(local);
165 spin_unlock_irqrestore(&mdev->req_lock, flags);
166 drbd_queue_work(&mdev->data.work, &e->w);
167 return;
170 D_ASSERT(e->block_id != ID_VACANT);
172 spin_lock_irqsave(&mdev->req_lock, flags);
173 mdev->writ_cnt += e->size >> 9;
174 is_syncer_req = is_syncer_block_id(e->block_id);
176 /* after we moved e to done_ee,
177 * we may no longer access it,
178 * it may be freed/reused already!
179 * (as soon as we release the req_lock) */
180 e_sector = e->sector;
181 do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
183 list_del(&e->w.list); /* has been on active_ee or sync_ee */
184 list_add_tail(&e->w.list, &mdev->done_ee);
186 /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
187 * neither did we wake possibly waiting conflicting requests.
188 * done from "drbd_process_done_ee" within the appropriate w.cb
189 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
191 do_wake = is_syncer_req
192 ? list_empty(&mdev->sync_ee)
193 : list_empty(&mdev->active_ee);
195 if (error)
196 __drbd_chk_io_error(mdev, FALSE);
197 spin_unlock_irqrestore(&mdev->req_lock, flags);
199 if (is_syncer_req)
200 drbd_rs_complete_io(mdev, e_sector);
202 if (do_wake)
203 wake_up(&mdev->ee_wait);
205 if (do_al_complete_io)
206 drbd_al_complete_io(mdev, e_sector);
208 wake_asender(mdev);
209 put_ldev(mdev);
213 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
215 void drbd_endio_pri(struct bio *bio, int error)
217 unsigned long flags;
218 struct drbd_request *req = bio->bi_private;
219 struct drbd_conf *mdev = req->mdev;
220 struct bio_and_error m;
221 enum drbd_req_event what;
222 int uptodate = bio_flagged(bio, BIO_UPTODATE);
224 if (error)
225 dev_warn(DEV, "p %s: error=%d\n",
226 bio_data_dir(bio) == WRITE ? "write" : "read", error);
227 if (!error && !uptodate) {
228 dev_warn(DEV, "p %s: setting error to -EIO\n",
229 bio_data_dir(bio) == WRITE ? "write" : "read");
230 /* strange behavior of some lower level drivers...
231 * fail the request by clearing the uptodate flag,
232 * but do not return any error?! */
233 error = -EIO;
236 /* to avoid recursion in __req_mod */
237 if (unlikely(error)) {
238 what = (bio_data_dir(bio) == WRITE)
239 ? write_completed_with_error
240 : (bio_rw(bio) == READA)
241 ? read_completed_with_error
242 : read_ahead_completed_with_error;
243 } else
244 what = completed_ok;
246 bio_put(req->private_bio);
247 req->private_bio = ERR_PTR(error);
249 spin_lock_irqsave(&mdev->req_lock, flags);
250 __req_mod(req, what, &m);
251 spin_unlock_irqrestore(&mdev->req_lock, flags);
253 if (m.bio)
254 complete_master_bio(mdev, &m);
257 int w_io_error(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
259 struct drbd_request *req = container_of(w, struct drbd_request, w);
261 /* NOTE: mdev->ldev can be NULL by the time we get here! */
262 /* D_ASSERT(mdev->ldev->dc.on_io_error != EP_PASS_ON); */
264 /* the only way this callback is scheduled is from _req_may_be_done,
265 * when it is done and had a local write error, see comments there */
266 drbd_req_free(req);
268 return TRUE;
271 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
273 struct drbd_request *req = container_of(w, struct drbd_request, w);
275 /* We should not detach for read io-error,
276 * but try to WRITE the P_DATA_REPLY to the failed location,
277 * to give the disk the chance to relocate that block */
279 spin_lock_irq(&mdev->req_lock);
280 if (cancel ||
281 mdev->state.conn < C_CONNECTED ||
282 mdev->state.pdsk <= D_INCONSISTENT) {
283 _req_mod(req, send_canceled);
284 spin_unlock_irq(&mdev->req_lock);
285 dev_alert(DEV, "WE ARE LOST. Local IO failure, no peer.\n");
286 return 1;
288 spin_unlock_irq(&mdev->req_lock);
290 return w_send_read_req(mdev, w, 0);
293 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
295 ERR_IF(cancel) return 1;
296 dev_err(DEV, "resync inactive, but callback triggered??\n");
297 return 1; /* Simply ignore this! */
300 void drbd_csum(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
302 struct hash_desc desc;
303 struct scatterlist sg;
304 struct bio_vec *bvec;
305 int i;
307 desc.tfm = tfm;
308 desc.flags = 0;
310 sg_init_table(&sg, 1);
311 crypto_hash_init(&desc);
313 __bio_for_each_segment(bvec, bio, i, 0) {
314 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
315 crypto_hash_update(&desc, &sg, sg.length);
317 crypto_hash_final(&desc, digest);
320 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
322 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
323 int digest_size;
324 void *digest;
325 int ok;
327 D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
329 if (unlikely(cancel)) {
330 drbd_free_ee(mdev, e);
331 return 1;
334 if (likely(drbd_bio_uptodate(e->private_bio))) {
335 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
336 digest = kmalloc(digest_size, GFP_NOIO);
337 if (digest) {
338 drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
340 inc_rs_pending(mdev);
341 ok = drbd_send_drequest_csum(mdev,
342 e->sector,
343 e->size,
344 digest,
345 digest_size,
346 P_CSUM_RS_REQUEST);
347 kfree(digest);
348 } else {
349 dev_err(DEV, "kmalloc() of digest failed.\n");
350 ok = 0;
352 } else
353 ok = 1;
355 drbd_free_ee(mdev, e);
357 if (unlikely(!ok))
358 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
359 return ok;
362 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
364 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
366 struct drbd_epoch_entry *e;
368 if (!get_ldev(mdev))
369 return 0;
371 /* GFP_TRY, because if there is no memory available right now, this may
372 * be rescheduled for later. It is "only" background resync, after all. */
373 e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
374 if (!e) {
375 put_ldev(mdev);
376 return 2;
379 spin_lock_irq(&mdev->req_lock);
380 list_add(&e->w.list, &mdev->read_ee);
381 spin_unlock_irq(&mdev->req_lock);
383 e->private_bio->bi_end_io = drbd_endio_read_sec;
384 e->private_bio->bi_rw = READ;
385 e->w.cb = w_e_send_csum;
387 mdev->read_cnt += size >> 9;
388 drbd_generic_make_request(mdev, DRBD_FAULT_RS_RD, e->private_bio);
390 return 1;
393 void resync_timer_fn(unsigned long data)
395 unsigned long flags;
396 struct drbd_conf *mdev = (struct drbd_conf *) data;
397 int queue;
399 spin_lock_irqsave(&mdev->req_lock, flags);
401 if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
402 queue = 1;
403 if (mdev->state.conn == C_VERIFY_S)
404 mdev->resync_work.cb = w_make_ov_request;
405 else
406 mdev->resync_work.cb = w_make_resync_request;
407 } else {
408 queue = 0;
409 mdev->resync_work.cb = w_resync_inactive;
412 spin_unlock_irqrestore(&mdev->req_lock, flags);
414 /* harmless race: list_empty outside data.work.q_lock */
415 if (list_empty(&mdev->resync_work.list) && queue)
416 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
419 int w_make_resync_request(struct drbd_conf *mdev,
420 struct drbd_work *w, int cancel)
422 unsigned long bit;
423 sector_t sector;
424 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
425 int max_segment_size = queue_max_segment_size(mdev->rq_queue);
426 int number, i, size, pe, mx;
427 int align, queued, sndbuf;
429 if (unlikely(cancel))
430 return 1;
432 if (unlikely(mdev->state.conn < C_CONNECTED)) {
433 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
434 return 0;
437 if (mdev->state.conn != C_SYNC_TARGET)
438 dev_err(DEV, "%s in w_make_resync_request\n",
439 drbd_conn_str(mdev->state.conn));
441 if (!get_ldev(mdev)) {
442 /* Since we only need to access mdev->rsync a
443 get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
444 to continue resync with a broken disk makes no sense at
445 all */
446 dev_err(DEV, "Disk broke down during resync!\n");
447 mdev->resync_work.cb = w_resync_inactive;
448 return 1;
451 number = SLEEP_TIME * mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
452 pe = atomic_read(&mdev->rs_pending_cnt);
454 mutex_lock(&mdev->data.mutex);
455 if (mdev->data.socket)
456 mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
457 else
458 mx = 1;
459 mutex_unlock(&mdev->data.mutex);
461 /* For resync rates >160MB/sec, allow more pending RS requests */
462 if (number > mx)
463 mx = number;
465 /* Limit the number of pending RS requests to no more than the peer's receive buffer */
466 if ((pe + number) > mx) {
467 number = mx - pe;
470 for (i = 0; i < number; i++) {
471 /* Stop generating RS requests, when half of the send buffer is filled */
472 mutex_lock(&mdev->data.mutex);
473 if (mdev->data.socket) {
474 queued = mdev->data.socket->sk->sk_wmem_queued;
475 sndbuf = mdev->data.socket->sk->sk_sndbuf;
476 } else {
477 queued = 1;
478 sndbuf = 0;
480 mutex_unlock(&mdev->data.mutex);
481 if (queued > sndbuf / 2)
482 goto requeue;
484 next_sector:
485 size = BM_BLOCK_SIZE;
486 bit = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
488 if (bit == -1UL) {
489 mdev->bm_resync_fo = drbd_bm_bits(mdev);
490 mdev->resync_work.cb = w_resync_inactive;
491 put_ldev(mdev);
492 return 1;
495 sector = BM_BIT_TO_SECT(bit);
497 if (drbd_try_rs_begin_io(mdev, sector)) {
498 mdev->bm_resync_fo = bit;
499 goto requeue;
501 mdev->bm_resync_fo = bit + 1;
503 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
504 drbd_rs_complete_io(mdev, sector);
505 goto next_sector;
508 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
509 /* try to find some adjacent bits.
510 * we stop if we have already the maximum req size.
512 * Additionally always align bigger requests, in order to
513 * be prepared for all stripe sizes of software RAIDs.
515 * we _do_ care about the agreed-upon q->max_segment_size
516 * here, as splitting up the requests on the other side is more
517 * difficult. the consequence is, that on lvm and md and other
518 * "indirect" devices, this is dead code, since
519 * q->max_segment_size will be PAGE_SIZE.
521 align = 1;
522 for (;;) {
523 if (size + BM_BLOCK_SIZE > max_segment_size)
524 break;
526 /* Be always aligned */
527 if (sector & ((1<<(align+3))-1))
528 break;
530 /* do not cross extent boundaries */
531 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
532 break;
533 /* now, is it actually dirty, after all?
534 * caution, drbd_bm_test_bit is tri-state for some
535 * obscure reason; ( b == 0 ) would get the out-of-band
536 * only accidentally right because of the "oddly sized"
537 * adjustment below */
538 if (drbd_bm_test_bit(mdev, bit+1) != 1)
539 break;
540 bit++;
541 size += BM_BLOCK_SIZE;
542 if ((BM_BLOCK_SIZE << align) <= size)
543 align++;
544 i++;
546 /* if we merged some,
547 * reset the offset to start the next drbd_bm_find_next from */
548 if (size > BM_BLOCK_SIZE)
549 mdev->bm_resync_fo = bit + 1;
550 #endif
552 /* adjust very last sectors, in case we are oddly sized */
553 if (sector + (size>>9) > capacity)
554 size = (capacity-sector)<<9;
555 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
556 switch (read_for_csum(mdev, sector, size)) {
557 case 0: /* Disk failure*/
558 put_ldev(mdev);
559 return 0;
560 case 2: /* Allocation failed */
561 drbd_rs_complete_io(mdev, sector);
562 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
563 goto requeue;
564 /* case 1: everything ok */
566 } else {
567 inc_rs_pending(mdev);
568 if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
569 sector, size, ID_SYNCER)) {
570 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
571 dec_rs_pending(mdev);
572 put_ldev(mdev);
573 return 0;
578 if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
579 /* last syncer _request_ was sent,
580 * but the P_RS_DATA_REPLY not yet received. sync will end (and
581 * next sync group will resume), as soon as we receive the last
582 * resync data block, and the last bit is cleared.
583 * until then resync "work" is "inactive" ...
585 mdev->resync_work.cb = w_resync_inactive;
586 put_ldev(mdev);
587 return 1;
590 requeue:
591 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
592 put_ldev(mdev);
593 return 1;
596 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
598 int number, i, size;
599 sector_t sector;
600 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
602 if (unlikely(cancel))
603 return 1;
605 if (unlikely(mdev->state.conn < C_CONNECTED)) {
606 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
607 return 0;
610 number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
611 if (atomic_read(&mdev->rs_pending_cnt) > number)
612 goto requeue;
614 number -= atomic_read(&mdev->rs_pending_cnt);
616 sector = mdev->ov_position;
617 for (i = 0; i < number; i++) {
618 if (sector >= capacity) {
619 mdev->resync_work.cb = w_resync_inactive;
620 return 1;
623 size = BM_BLOCK_SIZE;
625 if (drbd_try_rs_begin_io(mdev, sector)) {
626 mdev->ov_position = sector;
627 goto requeue;
630 if (sector + (size>>9) > capacity)
631 size = (capacity-sector)<<9;
633 inc_rs_pending(mdev);
634 if (!drbd_send_ov_request(mdev, sector, size)) {
635 dec_rs_pending(mdev);
636 return 0;
638 sector += BM_SECT_PER_BIT;
640 mdev->ov_position = sector;
642 requeue:
643 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
644 return 1;
/**
 * w_ov_finished() - Worker callback run when online verify has finished
 * @mdev:   DRBD device.
 * @w:      work object; freed here with kfree(), so it must have been
 *          allocated by the code that queued it.
 * @cancel: unused here.
 *
 * Prints the pending out-of-sync range, then finishes the resync.
 * Always returns 1.
 */
int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	kfree(w);

	ov_oos_print(mdev);
	drbd_resync_finished(mdev);

	return 1;
}
/**
 * w_resync_finished() - Worker callback that retries drbd_resync_finished()
 * @mdev:   DRBD device.
 * @w:      work object; it is the one kmalloc()ed by drbd_resync_finished()
 *          and is freed here.
 * @cancel: unused here.
 *
 * Always returns 1.
 */
static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	kfree(w);
	drbd_resync_finished(mdev);

	return 1;
}
666 int drbd_resync_finished(struct drbd_conf *mdev)
668 unsigned long db, dt, dbdt;
669 unsigned long n_oos;
670 union drbd_state os, ns;
671 struct drbd_work *w;
672 char *khelper_cmd = NULL;
674 /* Remove all elements from the resync LRU. Since future actions
675 * might set bits in the (main) bitmap, then the entries in the
676 * resync LRU would be wrong. */
677 if (drbd_rs_del_all(mdev)) {
678 /* In case this is not possible now, most probably because
679 * there are P_RS_DATA_REPLY Packets lingering on the worker's
680 * queue (or even the read operations for those packets
681 * is not finished by now). Retry in 100ms. */
683 drbd_kick_lo(mdev);
684 __set_current_state(TASK_INTERRUPTIBLE);
685 schedule_timeout(HZ / 10);
686 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
687 if (w) {
688 w->cb = w_resync_finished;
689 drbd_queue_work(&mdev->data.work, w);
690 return 1;
692 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
695 dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
696 if (dt <= 0)
697 dt = 1;
698 db = mdev->rs_total;
699 dbdt = Bit2KB(db/dt);
700 mdev->rs_paused /= HZ;
702 if (!get_ldev(mdev))
703 goto out;
705 spin_lock_irq(&mdev->req_lock);
706 os = mdev->state;
708 /* This protects us against multiple calls (that can happen in the presence
709 of application IO), and against connectivity loss just before we arrive here. */
710 if (os.conn <= C_CONNECTED)
711 goto out_unlock;
713 ns = os;
714 ns.conn = C_CONNECTED;
716 dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
717 (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
718 "Online verify " : "Resync",
719 dt + mdev->rs_paused, mdev->rs_paused, dbdt);
721 n_oos = drbd_bm_total_weight(mdev);
723 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
724 if (n_oos) {
725 dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
726 n_oos, Bit2KB(1));
727 khelper_cmd = "out-of-sync";
729 } else {
730 D_ASSERT((n_oos - mdev->rs_failed) == 0);
732 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
733 khelper_cmd = "after-resync-target";
735 if (mdev->csums_tfm && mdev->rs_total) {
736 const unsigned long s = mdev->rs_same_csum;
737 const unsigned long t = mdev->rs_total;
738 const int ratio =
739 (t == 0) ? 0 :
740 (t < 100000) ? ((s*100)/t) : (s/(t/100));
741 dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
742 "transferred %luK total %luK\n",
743 ratio,
744 Bit2KB(mdev->rs_same_csum),
745 Bit2KB(mdev->rs_total - mdev->rs_same_csum),
746 Bit2KB(mdev->rs_total));
750 if (mdev->rs_failed) {
751 dev_info(DEV, " %lu failed blocks\n", mdev->rs_failed);
753 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
754 ns.disk = D_INCONSISTENT;
755 ns.pdsk = D_UP_TO_DATE;
756 } else {
757 ns.disk = D_UP_TO_DATE;
758 ns.pdsk = D_INCONSISTENT;
760 } else {
761 ns.disk = D_UP_TO_DATE;
762 ns.pdsk = D_UP_TO_DATE;
764 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
765 if (mdev->p_uuid) {
766 int i;
767 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
768 _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
769 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
770 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
771 } else {
772 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
776 drbd_uuid_set_bm(mdev, 0UL);
778 if (mdev->p_uuid) {
779 /* Now the two UUID sets are equal, update what we
780 * know of the peer. */
781 int i;
782 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
783 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
787 _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
788 out_unlock:
789 spin_unlock_irq(&mdev->req_lock);
790 put_ldev(mdev);
791 out:
792 mdev->rs_total = 0;
793 mdev->rs_failed = 0;
794 mdev->rs_paused = 0;
795 mdev->ov_start_sector = 0;
797 if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
798 dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
799 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
802 if (khelper_cmd)
803 drbd_khelper(mdev, khelper_cmd);
805 return 1;
808 /* helper */
809 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
811 if (drbd_bio_has_active_page(e->private_bio)) {
812 /* This might happen if sendpage() has not finished */
813 spin_lock_irq(&mdev->req_lock);
814 list_add_tail(&e->w.list, &mdev->net_ee);
815 spin_unlock_irq(&mdev->req_lock);
816 } else
817 drbd_free_ee(mdev, e);
821 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
822 * @mdev: DRBD device.
823 * @w: work object.
824 * @cancel: The connection will be closed anyways
826 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
828 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
829 int ok;
831 if (unlikely(cancel)) {
832 drbd_free_ee(mdev, e);
833 dec_unacked(mdev);
834 return 1;
837 if (likely(drbd_bio_uptodate(e->private_bio))) {
838 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
839 } else {
840 if (__ratelimit(&drbd_ratelimit_state))
841 dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
842 (unsigned long long)e->sector);
844 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
847 dec_unacked(mdev);
849 move_to_net_ee_or_free(mdev, e);
851 if (unlikely(!ok))
852 dev_err(DEV, "drbd_send_block() failed\n");
853 return ok;
857 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
858 * @mdev: DRBD device.
859 * @w: work object.
860 * @cancel: The connection will be closed anyways
862 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
864 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
865 int ok;
867 if (unlikely(cancel)) {
868 drbd_free_ee(mdev, e);
869 dec_unacked(mdev);
870 return 1;
873 if (get_ldev_if_state(mdev, D_FAILED)) {
874 drbd_rs_complete_io(mdev, e->sector);
875 put_ldev(mdev);
878 if (likely(drbd_bio_uptodate(e->private_bio))) {
879 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
880 inc_rs_pending(mdev);
881 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
882 } else {
883 if (__ratelimit(&drbd_ratelimit_state))
884 dev_err(DEV, "Not sending RSDataReply, "
885 "partner DISKLESS!\n");
886 ok = 1;
888 } else {
889 if (__ratelimit(&drbd_ratelimit_state))
890 dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
891 (unsigned long long)e->sector);
893 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
895 /* update resync data with failure */
896 drbd_rs_failed_io(mdev, e->sector, e->size);
899 dec_unacked(mdev);
901 move_to_net_ee_or_free(mdev, e);
903 if (unlikely(!ok))
904 dev_err(DEV, "drbd_send_block() failed\n");
905 return ok;
908 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
910 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
911 struct digest_info *di;
912 int digest_size;
913 void *digest = NULL;
914 int ok, eq = 0;
916 if (unlikely(cancel)) {
917 drbd_free_ee(mdev, e);
918 dec_unacked(mdev);
919 return 1;
922 drbd_rs_complete_io(mdev, e->sector);
924 di = (struct digest_info *)(unsigned long)e->block_id;
926 if (likely(drbd_bio_uptodate(e->private_bio))) {
927 /* quick hack to try to avoid a race against reconfiguration.
928 * a real fix would be much more involved,
929 * introducing more locking mechanisms */
930 if (mdev->csums_tfm) {
931 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
932 D_ASSERT(digest_size == di->digest_size);
933 digest = kmalloc(digest_size, GFP_NOIO);
935 if (digest) {
936 drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
937 eq = !memcmp(digest, di->digest, digest_size);
938 kfree(digest);
941 if (eq) {
942 drbd_set_in_sync(mdev, e->sector, e->size);
943 mdev->rs_same_csum++;
944 ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
945 } else {
946 inc_rs_pending(mdev);
947 e->block_id = ID_SYNCER;
948 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
950 } else {
951 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
952 if (__ratelimit(&drbd_ratelimit_state))
953 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
956 dec_unacked(mdev);
958 kfree(di);
960 move_to_net_ee_or_free(mdev, e);
962 if (unlikely(!ok))
963 dev_err(DEV, "drbd_send_block/ack() failed\n");
964 return ok;
967 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
969 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
970 int digest_size;
971 void *digest;
972 int ok = 1;
974 if (unlikely(cancel))
975 goto out;
977 if (unlikely(!drbd_bio_uptodate(e->private_bio)))
978 goto out;
980 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
981 /* FIXME if this allocation fails, online verify will not terminate! */
982 digest = kmalloc(digest_size, GFP_NOIO);
983 if (digest) {
984 drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
985 inc_rs_pending(mdev);
986 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
987 digest, digest_size, P_OV_REPLY);
988 if (!ok)
989 dec_rs_pending(mdev);
990 kfree(digest);
993 out:
994 drbd_free_ee(mdev, e);
996 dec_unacked(mdev);
998 return ok;
1001 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1003 if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1004 mdev->ov_last_oos_size += size>>9;
1005 } else {
1006 mdev->ov_last_oos_start = sector;
1007 mdev->ov_last_oos_size = size>>9;
1009 drbd_set_out_of_sync(mdev, sector, size);
1010 set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1013 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1015 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1016 struct digest_info *di;
1017 int digest_size;
1018 void *digest;
1019 int ok, eq = 0;
1021 if (unlikely(cancel)) {
1022 drbd_free_ee(mdev, e);
1023 dec_unacked(mdev);
1024 return 1;
1027 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1028 * the resync lru has been cleaned up already */
1029 drbd_rs_complete_io(mdev, e->sector);
1031 di = (struct digest_info *)(unsigned long)e->block_id;
1033 if (likely(drbd_bio_uptodate(e->private_bio))) {
1034 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1035 digest = kmalloc(digest_size, GFP_NOIO);
1036 if (digest) {
1037 drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
1039 D_ASSERT(digest_size == di->digest_size);
1040 eq = !memcmp(digest, di->digest, digest_size);
1041 kfree(digest);
1043 } else {
1044 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1045 if (__ratelimit(&drbd_ratelimit_state))
1046 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1049 dec_unacked(mdev);
1051 kfree(di);
1053 if (!eq)
1054 drbd_ov_oos_found(mdev, e->sector, e->size);
1055 else
1056 ov_oos_print(mdev);
1058 ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1059 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1061 drbd_free_ee(mdev, e);
1063 if (--mdev->ov_left == 0) {
1064 ov_oos_print(mdev);
1065 drbd_resync_finished(mdev);
1068 return ok;
1071 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1073 struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1074 complete(&b->done);
1075 return 1;
1078 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1080 struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1081 struct p_barrier *p = &mdev->data.sbuf.barrier;
1082 int ok = 1;
1084 /* really avoid racing with tl_clear. w.cb may have been referenced
1085 * just before it was reassigned and re-queued, so double check that.
1086 * actually, this race was harmless, since we only try to send the
1087 * barrier packet here, and otherwise do nothing with the object.
1088 * but compare with the head of w_clear_epoch */
1089 spin_lock_irq(&mdev->req_lock);
1090 if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1091 cancel = 1;
1092 spin_unlock_irq(&mdev->req_lock);
1093 if (cancel)
1094 return 1;
1096 if (!drbd_get_data_sock(mdev))
1097 return 0;
1098 p->barrier = b->br_number;
1099 /* inc_ap_pending was done where this was queued.
1100 * dec_ap_pending will be done in got_BarrierAck
1101 * or (on connection loss) in w_clear_epoch. */
1102 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1103 (struct p_header *)p, sizeof(*p), 0);
1104 drbd_put_data_sock(mdev);
1106 return ok;
1109 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1111 if (cancel)
1112 return 1;
1113 return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1117 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1118 * @mdev: DRBD device.
1119 * @w: work object.
1120 * @cancel: The connection will be closed anyways
1122 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1124 struct drbd_request *req = container_of(w, struct drbd_request, w);
1125 int ok;
1127 if (unlikely(cancel)) {
1128 req_mod(req, send_canceled);
1129 return 1;
1132 ok = drbd_send_dblock(mdev, req);
1133 req_mod(req, ok ? handed_over_to_network : send_failed);
1135 return ok;
1139 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1140 * @mdev: DRBD device.
1141 * @w: work object.
1142 * @cancel: The connection will be closed anyways
1144 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1146 struct drbd_request *req = container_of(w, struct drbd_request, w);
1147 int ok;
1149 if (unlikely(cancel)) {
1150 req_mod(req, send_canceled);
1151 return 1;
1154 ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1155 (unsigned long)req);
1157 if (!ok) {
1158 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1159 * so this is probably redundant */
1160 if (mdev->state.conn >= C_CONNECTED)
1161 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1163 req_mod(req, ok ? handed_over_to_network : send_failed);
1165 return ok;
1168 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1170 struct drbd_conf *odev = mdev;
1172 while (1) {
1173 if (odev->sync_conf.after == -1)
1174 return 1;
1175 odev = minor_to_mdev(odev->sync_conf.after);
1176 ERR_IF(!odev) return 1;
1177 if ((odev->state.conn >= C_SYNC_SOURCE &&
1178 odev->state.conn <= C_PAUSED_SYNC_T) ||
1179 odev->state.aftr_isp || odev->state.peer_isp ||
1180 odev->state.user_isp)
1181 return 0;
1186 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1187 * @mdev: DRBD device.
1189 * Called from process context only (admin command and after_state_ch).
1191 static int _drbd_pause_after(struct drbd_conf *mdev)
1193 struct drbd_conf *odev;
1194 int i, rv = 0;
1196 for (i = 0; i < minor_count; i++) {
1197 odev = minor_to_mdev(i);
1198 if (!odev)
1199 continue;
1200 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1201 continue;
1202 if (!_drbd_may_sync_now(odev))
1203 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1204 != SS_NOTHING_TO_DO);
1207 return rv;
1211 * _drbd_resume_next() - Resume resync on all devices that may resync now
1212 * @mdev: DRBD device.
1214 * Called from process context only (admin command and worker).
1216 static int _drbd_resume_next(struct drbd_conf *mdev)
1218 struct drbd_conf *odev;
1219 int i, rv = 0;
1221 for (i = 0; i < minor_count; i++) {
1222 odev = minor_to_mdev(i);
1223 if (!odev)
1224 continue;
1225 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1226 continue;
1227 if (odev->state.aftr_isp) {
1228 if (_drbd_may_sync_now(odev))
1229 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1230 CS_HARD, NULL)
1231 != SS_NOTHING_TO_DO) ;
1234 return rv;
1237 void resume_next_sg(struct drbd_conf *mdev)
1239 write_lock_irq(&global_state_lock);
1240 _drbd_resume_next(mdev);
1241 write_unlock_irq(&global_state_lock);
1244 void suspend_other_sg(struct drbd_conf *mdev)
1246 write_lock_irq(&global_state_lock);
1247 _drbd_pause_after(mdev);
1248 write_unlock_irq(&global_state_lock);
1251 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1253 struct drbd_conf *odev;
1255 if (o_minor == -1)
1256 return NO_ERROR;
1257 if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1258 return ERR_SYNC_AFTER;
1260 /* check for loops */
1261 odev = minor_to_mdev(o_minor);
1262 while (1) {
1263 if (odev == mdev)
1264 return ERR_SYNC_AFTER_CYCLE;
1266 /* dependency chain ends here, no cycles. */
1267 if (odev->sync_conf.after == -1)
1268 return NO_ERROR;
1270 /* follow the dependency chain */
1271 odev = minor_to_mdev(odev->sync_conf.after);
1275 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1277 int changes;
1278 int retcode;
1280 write_lock_irq(&global_state_lock);
1281 retcode = sync_after_error(mdev, na);
1282 if (retcode == NO_ERROR) {
1283 mdev->sync_conf.after = na;
1284 do {
1285 changes = _drbd_pause_after(mdev);
1286 changes |= _drbd_resume_next(mdev);
1287 } while (changes);
1289 write_unlock_irq(&global_state_lock);
1290 return retcode;
1294 * drbd_start_resync() - Start the resync process
1295 * @mdev: DRBD device.
1296 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1298 * This function might bring you directly into one of the
1299 * C_PAUSED_SYNC_* states.
1301 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1303 union drbd_state ns;
1304 int r;
1306 if (mdev->state.conn >= C_SYNC_SOURCE) {
1307 dev_err(DEV, "Resync already running!\n");
1308 return;
1311 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1312 drbd_rs_cancel_all(mdev);
1314 if (side == C_SYNC_TARGET) {
1315 /* Since application IO was locked out during C_WF_BITMAP_T and
1316 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1317 we check that we might make the data inconsistent. */
1318 r = drbd_khelper(mdev, "before-resync-target");
1319 r = (r >> 8) & 0xff;
1320 if (r > 0) {
1321 dev_info(DEV, "before-resync-target handler returned %d, "
1322 "dropping connection.\n", r);
1323 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1324 return;
1328 drbd_state_lock(mdev);
1330 if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1331 drbd_state_unlock(mdev);
1332 return;
1335 if (side == C_SYNC_TARGET) {
1336 mdev->bm_resync_fo = 0;
1337 } else /* side == C_SYNC_SOURCE */ {
1338 u64 uuid;
1340 get_random_bytes(&uuid, sizeof(u64));
1341 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1342 drbd_send_sync_uuid(mdev, uuid);
1344 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1347 write_lock_irq(&global_state_lock);
1348 ns = mdev->state;
1350 ns.aftr_isp = !_drbd_may_sync_now(mdev);
1352 ns.conn = side;
1354 if (side == C_SYNC_TARGET)
1355 ns.disk = D_INCONSISTENT;
1356 else /* side == C_SYNC_SOURCE */
1357 ns.pdsk = D_INCONSISTENT;
1359 r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1360 ns = mdev->state;
1362 if (ns.conn < C_CONNECTED)
1363 r = SS_UNKNOWN_ERROR;
1365 if (r == SS_SUCCESS) {
1366 mdev->rs_total =
1367 mdev->rs_mark_left = drbd_bm_total_weight(mdev);
1368 mdev->rs_failed = 0;
1369 mdev->rs_paused = 0;
1370 mdev->rs_start =
1371 mdev->rs_mark_time = jiffies;
1372 mdev->rs_same_csum = 0;
1373 _drbd_pause_after(mdev);
1375 write_unlock_irq(&global_state_lock);
1376 drbd_state_unlock(mdev);
1377 put_ldev(mdev);
1379 if (r == SS_SUCCESS) {
1380 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1381 drbd_conn_str(ns.conn),
1382 (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1383 (unsigned long) mdev->rs_total);
1385 if (mdev->rs_total == 0) {
1386 /* Peer still reachable? Beware of failing before-resync-target handlers! */
1387 request_ping(mdev);
1388 __set_current_state(TASK_INTERRUPTIBLE);
1389 schedule_timeout(mdev->net_conf->ping_timeo*HZ/9); /* 9 instead 10 */
1390 drbd_resync_finished(mdev);
1391 return;
1394 /* ns.conn may already be != mdev->state.conn,
1395 * we may have been paused in between, or become paused until
1396 * the timer triggers.
1397 * No matter, that is handled in resync_timer_fn() */
1398 if (ns.conn == C_SYNC_TARGET)
1399 mod_timer(&mdev->resync_timer, jiffies);
1401 drbd_md_sync(mdev);
1405 int drbd_worker(struct drbd_thread *thi)
1407 struct drbd_conf *mdev = thi->mdev;
1408 struct drbd_work *w = NULL;
1409 LIST_HEAD(work_list);
1410 int intr = 0, i;
1412 sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1414 while (get_t_state(thi) == Running) {
1415 drbd_thread_current_set_cpu(mdev);
1417 if (down_trylock(&mdev->data.work.s)) {
1418 mutex_lock(&mdev->data.mutex);
1419 if (mdev->data.socket && !mdev->net_conf->no_cork)
1420 drbd_tcp_uncork(mdev->data.socket);
1421 mutex_unlock(&mdev->data.mutex);
1423 intr = down_interruptible(&mdev->data.work.s);
1425 mutex_lock(&mdev->data.mutex);
1426 if (mdev->data.socket && !mdev->net_conf->no_cork)
1427 drbd_tcp_cork(mdev->data.socket);
1428 mutex_unlock(&mdev->data.mutex);
1431 if (intr) {
1432 D_ASSERT(intr == -EINTR);
1433 flush_signals(current);
1434 ERR_IF (get_t_state(thi) == Running)
1435 continue;
1436 break;
1439 if (get_t_state(thi) != Running)
1440 break;
1441 /* With this break, we have done a down() but not consumed
1442 the entry from the list. The cleanup code takes care of
1443 this... */
1445 w = NULL;
1446 spin_lock_irq(&mdev->data.work.q_lock);
1447 ERR_IF(list_empty(&mdev->data.work.q)) {
1448 /* something terribly wrong in our logic.
1449 * we were able to down() the semaphore,
1450 * but the list is empty... doh.
1452 * what is the best thing to do now?
1453 * try again from scratch, restarting the receiver,
1454 * asender, whatnot? could break even more ugly,
1455 * e.g. when we are primary, but no good local data.
1457 * I'll try to get away just starting over this loop.
1459 spin_unlock_irq(&mdev->data.work.q_lock);
1460 continue;
1462 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1463 list_del_init(&w->list);
1464 spin_unlock_irq(&mdev->data.work.q_lock);
1466 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1467 /* dev_warn(DEV, "worker: a callback failed! \n"); */
1468 if (mdev->state.conn >= C_CONNECTED)
1469 drbd_force_state(mdev,
1470 NS(conn, C_NETWORK_FAILURE));
1473 D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1474 D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1476 spin_lock_irq(&mdev->data.work.q_lock);
1477 i = 0;
1478 while (!list_empty(&mdev->data.work.q)) {
1479 list_splice_init(&mdev->data.work.q, &work_list);
1480 spin_unlock_irq(&mdev->data.work.q_lock);
1482 while (!list_empty(&work_list)) {
1483 w = list_entry(work_list.next, struct drbd_work, list);
1484 list_del_init(&w->list);
1485 w->cb(mdev, w, 1);
1486 i++; /* dead debugging code */
1489 spin_lock_irq(&mdev->data.work.q_lock);
1491 sema_init(&mdev->data.work.s, 0);
1492 /* DANGEROUS race: if someone did queue his work within the spinlock,
1493 * but up() ed outside the spinlock, we could get an up() on the
1494 * semaphore without corresponding list entry.
1495 * So don't do that.
1497 spin_unlock_irq(&mdev->data.work.q_lock);
1499 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1500 /* _drbd_set_state only uses stop_nowait.
1501 * wait here for the Exiting receiver. */
1502 drbd_thread_stop(&mdev->receiver);
1503 drbd_mdev_cleanup(mdev);
1505 dev_info(DEV, "worker terminated\n");
1507 clear_bit(DEVICE_DYING, &mdev->flags);
1508 clear_bit(CONFIG_PENDING, &mdev->flags);
1509 wake_up(&mdev->state_wait);
1511 return 0;