On Tue, Nov 06, 2007 at 02:33:53AM -0800, akpm@linux-foundation.org wrote:
fs/reiser4/flush_queue.c:
/* Copyright 2001, 2002, 2003 by Hans Reiser, licensing governed by
   reiser4/README */

#include "debug.h"
#include "super.h"
#include "txnmgr.h"
#include "jnode.h"
#include "znode.h"
#include "page_cache.h"
#include "wander.h"
#include "vfs_ops.h"
#include "writeout.h"
#include "flush.h"

#include <linux/bio.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/blkdev.h>
#include <linux/writeback.h>
/* A flush queue object is an accumulator for keeping jnodes prepared
   by the jnode_flush() function for writing to disk. Those "queued" jnodes are
   kept on the flush queue until memory pressure or atom commit asks
   flush queues to write some or all of their jnodes. */

/*
   LOCKING:

   fq->guard spin lock protects fq->atom pointer and nothing else. The
   fq->prepped list is protected by the atom spin lock. The fq->prepped list
   uses the following locking:

   There are two ways to protect the fq->prepped list for read-only list
   traversal:

   1. the atom is spin-locked.

   2. fq is IN_USE, atom->nr_running_queues increased.

   and one for list modification:

   1. the atom is spin-locked and one condition is true: fq is IN_USE or
      atom->nr_running_queues == 0.

   The deadlock-safe order for flush queues and atoms is: first lock the atom,
   then lock the flush queue, then lock the jnode.
*/
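/* A minimal illustration of that ordering, assuming a flush queue "fq"
   attached to "atom" and a jnode "node" captured by the same atom:

	spin_lock_atom(atom);		(1) the atom first
	spin_lock(&fq->guard);		(2) then the flush queue
	spin_lock_jnode(node);		(3) then the jnode
	...
	spin_unlock_jnode(node);
	spin_unlock(&fq->guard);
	spin_unlock_atom(atom);
*/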
#define fq_in_use(fq)		((fq)->state & FQ_IN_USE)
#define fq_ready(fq)		(!fq_in_use(fq))

#define mark_fq_in_use(fq)	do { (fq)->state |= FQ_IN_USE; } while (0)
#define mark_fq_ready(fq)	do { (fq)->state &= ~FQ_IN_USE; } while (0)
/* get lock on atom from locked flush queue object */
static txn_atom *atom_locked_by_fq_nolock(flush_queue_t *fq)
{
	/* This code is similar to jnode_get_atom(), look at it for the
	 * explanation. */
	txn_atom *atom;

	assert_spin_locked(&(fq->guard));

	while (1) {
		atom = fq->atom;
		if (atom == NULL)
			break;

		if (spin_trylock_atom(atom))
			break;

		atomic_inc(&atom->refcount);
		spin_unlock(&(fq->guard));
		spin_lock_atom(atom);
		spin_lock(&(fq->guard));

		if (fq->atom == atom) {
			atomic_dec(&atom->refcount);
			break;
		}

		spin_unlock(&(fq->guard));
		atom_dec_and_unlock(atom);
		spin_lock(&(fq->guard));
	}

	return atom;
}
txn_atom *atom_locked_by_fq(flush_queue_t *fq)
{
	txn_atom *atom;

	spin_lock(&(fq->guard));
	atom = atom_locked_by_fq_nolock(fq);
	spin_unlock(&(fq->guard));
	return atom;
}
static void init_fq(flush_queue_t *fq)
{
	memset(fq, 0, sizeof *fq);

	atomic_set(&fq->nr_submitted, 0);

	INIT_LIST_HEAD(ATOM_FQ_LIST(fq));

	init_waitqueue_head(&fq->wait);
	spin_lock_init(&fq->guard);
}
/* slab for flush queues */
static struct kmem_cache *fq_slab;

/**
 * reiser4_init_fqs - create flush queue cache
 *
 * Initializes slab cache of flush queues. It is part of reiser4 module
 * initialization.
 */
int reiser4_init_fqs(void)
{
	fq_slab = kmem_cache_create("fq",
				    sizeof(flush_queue_t),
				    0, SLAB_HWCACHE_ALIGN, NULL);
	if (fq_slab == NULL)
		return RETERR(-ENOMEM);
	return 0;
}
/**
 * reiser4_done_fqs - delete flush queue cache
 *
 * This is called on reiser4 module unloading or system shutdown.
 */
void reiser4_done_fqs(void)
{
	destroy_reiser4_cache(&fq_slab);
}
/* create new flush queue object */
static flush_queue_t *create_fq(gfp_t gfp)
{
	flush_queue_t *fq;

	fq = kmem_cache_alloc(fq_slab, gfp);
	if (fq)
		init_fq(fq);

	return fq;
}
/* adjust atom's and flush queue's counters of queued nodes */
static void count_enqueued_node(flush_queue_t *fq)
{
	ON_DEBUG(fq->atom->num_queued++);
}

static void count_dequeued_node(flush_queue_t *fq)
{
	assert("zam-993", fq->atom->num_queued > 0);
	ON_DEBUG(fq->atom->num_queued--);
}
/* attach flush queue object to the atom */
static void attach_fq(txn_atom *atom, flush_queue_t *fq)
{
	assert_spin_locked(&(atom->alock));
	list_add(&fq->alink, &atom->flush_queues);
	fq->atom = atom;
	ON_DEBUG(atom->nr_flush_queues++);
}

static void detach_fq(flush_queue_t *fq)
{
	assert_spin_locked(&(fq->atom->alock));

	spin_lock(&(fq->guard));
	list_del_init(&fq->alink);
	assert("vs-1456", fq->atom->nr_flush_queues > 0);
	ON_DEBUG(fq->atom->nr_flush_queues--);
	fq->atom = NULL;
	spin_unlock(&(fq->guard));
}
/* destroy flush queue object */
static void done_fq(flush_queue_t *fq)
{
	assert("zam-763", list_empty_careful(ATOM_FQ_LIST(fq)));
	assert("zam-766", atomic_read(&fq->nr_submitted) == 0);

	kmem_cache_free(fq_slab, fq);
}
/* mark a jnode as queued and account it in the atom's counter of queued
   nodes */
static void mark_jnode_queued(flush_queue_t *fq, jnode * node)
{
	JF_SET(node, JNODE_FLUSH_QUEUED);
	count_enqueued_node(fq);
}
/* Putting jnode into the flush queue. Both atom and jnode should be
   spin-locked. */
void queue_jnode(flush_queue_t *fq, jnode * node)
{
	assert_spin_locked(&(node->guard));
	assert("zam-713", node->atom != NULL);
	assert_spin_locked(&(node->atom->alock));
	assert("zam-716", fq->atom != NULL);
	assert("zam-717", fq->atom == node->atom);
	assert("zam-907", fq_in_use(fq));

	assert("zam-714", JF_ISSET(node, JNODE_DIRTY));
	assert("zam-826", JF_ISSET(node, JNODE_RELOC));
	assert("vs-1481", !JF_ISSET(node, JNODE_FLUSH_QUEUED));
	assert("vs-1481", NODE_LIST(node) != FQ_LIST);

	mark_jnode_queued(fq, node);
	list_move_tail(&node->capture_link, ATOM_FQ_LIST(fq));

	ON_DEBUG(count_jnode(node->atom, node, NODE_LIST(node),
			     FQ_LIST, 1));
}
/* repeatable process for waiting io completion on a flush queue object */
static int wait_io(flush_queue_t *fq, int *nr_io_errors)
{
	assert("zam-738", fq->atom != NULL);
	assert_spin_locked(&(fq->atom->alock));
	assert("zam-736", fq_in_use(fq));
	assert("zam-911", list_empty_careful(ATOM_FQ_LIST(fq)));

	if (atomic_read(&fq->nr_submitted) != 0) {
		struct super_block *super;

		spin_unlock_atom(fq->atom);

		assert("nikita-3013", reiser4_schedulable());

		super = reiser4_get_current_sb();

		/* FIXME: this is instead of blk_run_queues() */
		blk_run_address_space(reiser4_get_super_fake(super)->i_mapping);

		if (!(super->s_flags & MS_RDONLY))
			wait_event(fq->wait,
				   atomic_read(&fq->nr_submitted) == 0);

		/* Ask the caller to re-acquire the locks and call this
		   function again. Note: this technique is commonly used in
		   the txnmgr code. */
		return -E_REPEAT;
	}

	*nr_io_errors += atomic_read(&fq->nr_errors);
	return 0;
}
/* wait on I/O completion, re-submit dirty nodes to write */
static int finish_fq(flush_queue_t *fq, int *nr_io_errors)
{
	int ret;
	txn_atom *atom = fq->atom;

	assert("zam-801", atom != NULL);
	assert_spin_locked(&(atom->alock));
	assert("zam-762", fq_in_use(fq));

	ret = wait_io(fq, nr_io_errors);
	if (ret)
		return ret;

	detach_fq(fq);
	done_fq(fq);

	reiser4_atom_send_event(atom);

	return 0;
}
/* wait for all i/o for the given atom to be completed; actually do one
   iteration of that and return -E_REPEAT if more iterations are needed */
static int finish_all_fq(txn_atom * atom, int *nr_io_errors)
{
	flush_queue_t *fq;

	assert_spin_locked(&(atom->alock));

	if (list_empty_careful(&atom->flush_queues))
		return 0;

	list_for_each_entry(fq, &atom->flush_queues, alink) {
		if (fq_ready(fq)) {
			int ret;

			mark_fq_in_use(fq);
			assert("vs-1247", fq->owner == NULL);
			ON_DEBUG(fq->owner = current);
			ret = finish_fq(fq, nr_io_errors);

			if (*nr_io_errors)
				reiser4_handle_error();

			if (ret) {
				reiser4_fq_put(fq);
				return ret;
			}

			spin_unlock_atom(atom);

			return -E_REPEAT;
		}
	}

	/* All flush queues are in use; atom remains locked */
	return -EBUSY;
}
/* wait for all i/o for the current atom */
int current_atom_finish_all_fq(void)
{
	txn_atom *atom;
	int nr_io_errors = 0;
	int ret = 0;

	do {
		while (1) {
			atom = get_current_atom_locked();
			ret = finish_all_fq(atom, &nr_io_errors);
			if (ret != -EBUSY)
				break;
			reiser4_atom_wait_event(atom);
		}
	} while (ret == -E_REPEAT);

	/* we do not need the locked atom after this function finishes;
	   SUCCESS and -EBUSY are the two return codes for which the atom
	   remains locked after finish_all_fq */
	if (!ret)
		spin_unlock_atom(atom);

	assert_spin_not_locked(&(atom->alock));

	if (ret)
		return ret;

	if (nr_io_errors)
		return RETERR(-EIO);

	return 0;
}
/* change the node->atom field for all jnodes from the given list */
static void
scan_fq_and_update_atom_ref(struct list_head *list, txn_atom *atom)
{
	jnode *cur;

	list_for_each_entry(cur, list, capture_link) {
		spin_lock_jnode(cur);
		cur->atom = atom;
		spin_unlock_jnode(cur);
	}
}
/* support for atom fusion operation */
void reiser4_fuse_fq(txn_atom *to, txn_atom *from)
{
	flush_queue_t *fq;

	assert_spin_locked(&(to->alock));
	assert_spin_locked(&(from->alock));

	list_for_each_entry(fq, &from->flush_queues, alink) {
		scan_fq_and_update_atom_ref(ATOM_FQ_LIST(fq), to);
		spin_lock(&(fq->guard));
		fq->atom = to;
		spin_unlock(&(fq->guard));
	}

	list_splice_init(&from->flush_queues, to->flush_queues.prev);

#if REISER4_DEBUG
	to->num_queued += from->num_queued;
	to->nr_flush_queues += from->nr_flush_queues;
	from->nr_flush_queues = 0;
#endif
}
#if REISER4_DEBUG
int atom_fq_parts_are_clean(txn_atom * atom)
{
	assert("zam-915", atom != NULL);
	return list_empty_careful(&atom->flush_queues);
}
#endif
/* Bio i/o completion routine for reiser4 write operations. */
static void
end_io_handler(struct bio *bio, int err)
{
	int i;
	int nr_errors = 0;
	flush_queue_t *fq;

	assert("zam-958", bio->bi_rw & WRITE);

	if (err == -EOPNOTSUPP)
		set_bit(BIO_EOPNOTSUPP, &bio->bi_flags);

	/* we expect that bio->bi_private is set to NULL or an fq object which
	 * is used for synchronization and error counting. */
	fq = bio->bi_private;
	/* Check all elements of io_vec for correct write completion. */
	for (i = 0; i < bio->bi_vcnt; i += 1) {
		struct page *pg = bio->bi_io_vec[i].bv_page;

		if (!test_bit(BIO_UPTODATE, &bio->bi_flags)) {
			SetPageError(pg);
			nr_errors++;
		}

		{
			/* jnode WRITEBACK ("write is in progress" bit) is
			 * atomically cleared here. */
			jnode *node;

			assert("zam-736", pg != NULL);
			assert("zam-736", PagePrivate(pg));
			node = jprivate(pg);

			JF_CLR(node, JNODE_WRITEBACK);
		}

		end_page_writeback(pg);
		page_cache_release(pg);
	}

	if (fq) {
		/* count i/o errors in the fq object */
		atomic_add(nr_errors, &fq->nr_errors);

		/* If all write requests registered in this "fq" are done we up
		 * the waiter. */
		if (atomic_sub_and_test(bio->bi_vcnt, &fq->nr_submitted))
			wake_up(&fq->wait);
	}

	bio_put(bio);
}
/* Count I/O requests which will be submitted by @bio against the given flush
   queue @fq */
void add_fq_to_bio(flush_queue_t *fq, struct bio *bio)
{
	bio->bi_private = fq;
	bio->bi_end_io = end_io_handler;

	if (fq)
		atomic_add(bio->bi_vcnt, &fq->nr_submitted);
}
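/* A minimal, hypothetical sketch of pairing add_fq_to_bio() with bio
   submission so that fq->nr_submitted tracks in-flight pages; the real
   submission path is write_jnode_list(), called from reiser4_write_fq()
   below, which additionally sets JNODE_WRITEBACK on each jnode. The block
   number @blocknr is assumed to be supplied by the caller. */
static int example_submit_one_page(flush_queue_t *fq, struct page *pg,
				   sector_t blocknr)
{
	struct super_block *super = reiser4_get_current_sb();
	struct bio *bio;

	bio = bio_alloc(GFP_NOIO, 1);
	if (bio == NULL)
		return RETERR(-ENOMEM);

	bio->bi_bdev = super->s_bdev;
	bio->bi_sector = blocknr * (super->s_blocksize >> 9);
	if (bio_add_page(bio, pg, super->s_blocksize, 0) !=
	    super->s_blocksize) {
		bio_put(bio);
		return RETERR(-EIO);
	}

	/* end_io_handler() ends writeback and drops one page reference per
	   io_vec entry, so take them here before submission */
	page_cache_get(pg);
	set_page_writeback(pg);

	add_fq_to_bio(fq, bio);	/* accounts bi_vcnt in fq->nr_submitted */
	submit_bio(WRITE, bio);
	return 0;
}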
/* Move all queued nodes out from @fq->prepped list. */
static void release_prepped_list(flush_queue_t *fq)
{
	txn_atom *atom;

	assert("zam-904", fq_in_use(fq));
	atom = atom_locked_by_fq(fq);

	while (!list_empty(ATOM_FQ_LIST(fq))) {
		jnode *cur;

		cur = list_entry(ATOM_FQ_LIST(fq)->next, jnode, capture_link);
		list_del_init(&cur->capture_link);

		count_dequeued_node(fq);
		spin_lock_jnode(cur);
		assert("nikita-3154", !JF_ISSET(cur, JNODE_OVRWR));
		assert("nikita-3154", JF_ISSET(cur, JNODE_RELOC));
		assert("nikita-3154", JF_ISSET(cur, JNODE_FLUSH_QUEUED));
		JF_CLR(cur, JNODE_FLUSH_QUEUED);

		if (JF_ISSET(cur, JNODE_DIRTY)) {
			list_add_tail(&cur->capture_link,
				      ATOM_DIRTY_LIST(atom,
						      jnode_get_level(cur)));
			ON_DEBUG(count_jnode(atom, cur, FQ_LIST,
					     DIRTY_LIST, 1));
		} else {
			list_add_tail(&cur->capture_link,
				      ATOM_CLEAN_LIST(atom));
			ON_DEBUG(count_jnode(atom, cur, FQ_LIST,
					     CLEAN_LIST, 1));
		}

		spin_unlock_jnode(cur);
	}

	if (--atom->nr_running_queues == 0)
		reiser4_atom_send_event(atom);

	spin_unlock_atom(atom);
}
/* Submit write requests for nodes on the already filled flush queue @fq.

   @fq: flush queue object which contains jnodes we can (and will) write.
   @return: number of submitted blocks (>=0) if success, otherwise -- an error
   code (<0). */
int reiser4_write_fq(flush_queue_t *fq, long *nr_submitted, int flags)
{
	int ret;
	txn_atom *atom;

	while (1) {
		atom = atom_locked_by_fq(fq);
		assert("zam-924", atom);
		/* do not write fq in parallel. */
		if (atom->nr_running_queues == 0
		    || !(flags & WRITEOUT_SINGLE_STREAM))
			break;
		reiser4_atom_wait_event(atom);
	}

	atom->nr_running_queues++;
	spin_unlock_atom(atom);

	ret = write_jnode_list(ATOM_FQ_LIST(fq), fq, nr_submitted, flags);
	release_prepped_list(fq);

	return ret;
}
/* Get a flush queue object for exclusive use by one thread. May require
   several iterations, which is indicated by the -E_REPEAT return code.

   This function does not contain code for obtaining an atom lock because an
   atom lock is obtained in different ways in different parts of reiser4;
   usually it is the current atom, but we also need the possibility of
   getting an fq for the atom of a given jnode. */
static int fq_by_atom_gfp(txn_atom *atom, flush_queue_t **new_fq, gfp_t gfp)
{
	flush_queue_t *fq;

	assert_spin_locked(&(atom->alock));

	fq = list_entry(atom->flush_queues.next, flush_queue_t, alink);
	while (&atom->flush_queues != &fq->alink) {
		spin_lock(&(fq->guard));

		if (fq_ready(fq)) {
			mark_fq_in_use(fq);
			assert("vs-1246", fq->owner == NULL);
			ON_DEBUG(fq->owner = current);
			spin_unlock(&(fq->guard));

			if (*new_fq)
				done_fq(*new_fq);

			*new_fq = fq;

			return 0;
		}

		spin_unlock(&(fq->guard));

		fq = list_entry(fq->alink.next, flush_queue_t, alink);
	}

	/* Use previously allocated fq object */
	if (*new_fq) {
		mark_fq_in_use(*new_fq);
		assert("vs-1248", (*new_fq)->owner == NULL);
		ON_DEBUG((*new_fq)->owner = current);
		attach_fq(atom, *new_fq);

		return 0;
	}

	spin_unlock_atom(atom);

	*new_fq = create_fq(gfp);

	if (*new_fq == NULL)
		return RETERR(-ENOMEM);

	return RETERR(-E_REPEAT);
}
int reiser4_fq_by_atom(txn_atom * atom, flush_queue_t **new_fq)
{
	return fq_by_atom_gfp(atom, new_fq, reiser4_ctx_gfp_mask_get());
}
/* A wrapper around reiser4_fq_by_atom for getting a flush queue
   object for the current atom; on success fq->atom remains locked. */
flush_queue_t *get_fq_for_current_atom(void)
{
	flush_queue_t *fq = NULL;
	txn_atom *atom;
	int ret;

	do {
		atom = get_current_atom_locked();
		ret = reiser4_fq_by_atom(atom, &fq);
	} while (ret == -E_REPEAT);

	if (ret)
		return ERR_PTR(ret);
	return fq;
}
/* Releasing flush queue object after exclusive use */
void reiser4_fq_put_nolock(flush_queue_t *fq)
{
	assert("zam-747", fq->atom != NULL);
	assert("zam-902", list_empty_careful(ATOM_FQ_LIST(fq)));
	mark_fq_ready(fq);
	assert("vs-1245", fq->owner == current);
	ON_DEBUG(fq->owner = NULL);
}
void reiser4_fq_put(flush_queue_t *fq)
{
	txn_atom *atom;

	spin_lock(&(fq->guard));
	atom = atom_locked_by_fq_nolock(fq);

	assert("zam-746", atom != NULL);

	reiser4_fq_put_nolock(fq);
	reiser4_atom_send_event(atom);

	spin_unlock(&(fq->guard));
	spin_unlock_atom(atom);
}
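/* A minimal, hypothetical sketch of the flush queue life cycle built from the
   helpers above: obtain a queue for the current atom, drop the atom lock,
   submit the prepped jnodes and release the queue. In reiser4 proper the
   queue is filled in between by jnode_flush()/queue_jnode(), which take the
   locks they need themselves; that part is elided here. */
static int example_writeout_current_atom(void)
{
	flush_queue_t *fq;
	long nr_submitted = 0;
	int ret;

	fq = get_fq_for_current_atom();
	if (IS_ERR(fq))
		return PTR_ERR(fq);

	/* get_fq_for_current_atom() returns with fq->atom spin-locked;
	   release it before starting I/O */
	spin_unlock_atom(fq->atom);

	/* ... jnode_flush() would queue prepped jnodes on @fq here ... */

	ret = reiser4_write_fq(fq, &nr_submitted, WRITEOUT_SINGLE_STREAM);
	reiser4_fq_put(fq);
	return ret;
}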
/* A part of atom object initialization related to the embedded flush queue
   list head */
void init_atom_fq_parts(txn_atom *atom)
{
	INIT_LIST_HEAD(&atom->flush_queues);
}
#if REISER4_DEBUG

void reiser4_check_fq(const txn_atom *atom)
{
	/* check number of nodes on all atom's flush queues */
	flush_queue_t *fq;
	int count;
	struct list_head *pos;

	count = 0;
	list_for_each_entry(fq, &atom->flush_queues, alink) {
		spin_lock(&(fq->guard));
		/* calculate the number of jnodes on the fq's list of prepped
		   jnodes */
		list_for_each(pos, ATOM_FQ_LIST(fq))
			count++;
		spin_unlock(&(fq->guard));
	}
	if (count != atom->fq)
		warning("", "fq counter %d, real %d\n", atom->fq, count);
}

#endif
/*
 * Local variables:
 * c-indentation-style: "K&R"
 * mode-name: "LC"
 * c-basic-offset: 8
 * tab-width: 8
 * fill-column: 79
 * scroll-step: 1
 * End:
 */