/* Copyright 2001, 2002, 2003 by Hans Reiser, licensing governed by
   reiser4/README */
#include "page_cache.h"

#include <linux/bio.h>
#include <linux/pagemap.h>
#include <linux/blkdev.h>
#include <linux/writeback.h>
/* A flush queue object is an accumulator for keeping jnodes prepared
   by the jnode_flush() function for writing to disk. Those "queued" jnodes are
   kept on the flush queue until memory pressure or atom commit asks
   flush queues to write some or all of their jnodes. */
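
/* An illustrative sketch (not actual fs code) of the typical life cycle
 * implemented below; locking details, error handling and the caller's local
 * variables (node, nr_submitted, flags) are elided:
 *
 *	fq = get_fq_for_current_atom();     <- exclusive use, fq->atom locked
 *	queue_jnode(fq, node);              <- move a prepped jnode onto fq
 *	spin_unlock_atom(fq->atom);
 *	reiser4_write_fq(fq, &nr_submitted, flags);
 *	reiser4_fq_put(fq);                 <- release exclusive use
 */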
/* LOCKING:

   fq->guard spin lock protects fq->atom pointer and nothing else. fq->prepped
   list is protected by the atom spin lock. fq->prepped list uses the following
   locking:

   There are two ways to protect fq->prepped list for read-only list traversal:

   1. the atom is spin-locked.
   2. fq is IN_USE, atom->nr_running_queues increased.

   and one for list modification:

   1. atom is spin-locked and one condition is true: fq is IN_USE or
      atom->nr_running_queues == 0.

   The deadlock-safe order for flush queues and atoms is: first lock atom, then
   lock flush queue, then lock jnode.
*/
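
/* A minimal sketch (not actual fs code) of the lock ordering stated above;
 * some_atom, some_fq and some_jnode are hypothetical local variables:
 *
 *	spin_lock_atom(some_atom);        1) atom first
 *	spin_lock(&some_fq->guard);       2) then the flush queue guard
 *	spin_lock_jnode(some_jnode);      3) then the jnode
 *	...
 *	spin_unlock_jnode(some_jnode);
 *	spin_unlock(&some_fq->guard);
 *	spin_unlock_atom(some_atom);
 */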
#define fq_in_use(fq) ((fq)->state & FQ_IN_USE)
#define fq_ready(fq) (!fq_in_use(fq))

#define mark_fq_in_use(fq) do { (fq)->state |= FQ_IN_USE; } while (0)
#define mark_fq_ready(fq) do { (fq)->state &= ~FQ_IN_USE; } while (0)
/* get lock on atom from locked flush queue object */
static txn_atom *atom_locked_by_fq_nolock(flush_queue_t *fq)
	/* This code is similar to jnode_get_atom(), look at it for the
	 * explanation. */

	assert_spin_locked(&(fq->guard));
	if (spin_trylock_atom(atom))
		break;
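
	/* the trylock failed: pin the atom with a reference so it cannot
	 * disappear while fq->guard is dropped, take the atom lock in the
	 * safe order, and then re-check that fq->atom still points to it */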
	atomic_inc(&atom->refcount);
	spin_unlock(&(fq->guard));
	spin_lock_atom(atom);
	spin_lock(&(fq->guard));

	if (fq->atom == atom) {
		atomic_dec(&atom->refcount);
		break;
	}

	spin_unlock(&(fq->guard));
	atom_dec_and_unlock(atom);
	spin_lock(&(fq->guard));
txn_atom *atom_locked_by_fq(flush_queue_t *fq)
	spin_lock(&(fq->guard));
	atom = atom_locked_by_fq_nolock(fq);
	spin_unlock(&(fq->guard));
static void init_fq(flush_queue_t *fq)
	memset(fq, 0, sizeof *fq);

	atomic_set(&fq->nr_submitted, 0);

	INIT_LIST_HEAD(ATOM_FQ_LIST(fq));

	init_waitqueue_head(&fq->wait);
	spin_lock_init(&fq->guard);
/* slab for flush queues */
static struct kmem_cache *fq_slab;
/**
 * reiser4_init_fqs - create flush queue cache
 *
 * Initializes slab cache of flush queues. It is part of reiser4 module
 * initialization.
 */
int reiser4_init_fqs(void)
	fq_slab = kmem_cache_create("fq",
				    sizeof(flush_queue_t),
				    0, SLAB_HWCACHE_ALIGN, NULL);
	if (fq_slab == NULL)
		return RETERR(-ENOMEM);
/**
 * reiser4_done_fqs - delete flush queue cache
 *
 * This is called on reiser4 module unloading or system shutdown.
 */
void reiser4_done_fqs(void)
	destroy_reiser4_cache(&fq_slab);
/* create new flush queue object */
static flush_queue_t *create_fq(gfp_t gfp)
	fq = kmem_cache_alloc(fq_slab, gfp);
/* adjust atom's and flush queue's counters of queued nodes */
static void count_enqueued_node(flush_queue_t *fq)
	ON_DEBUG(fq->atom->num_queued++);

static void count_dequeued_node(flush_queue_t *fq)
	assert("zam-993", fq->atom->num_queued > 0);
	ON_DEBUG(fq->atom->num_queued--);
/* attach flush queue object to the atom */
static void attach_fq(txn_atom *atom, flush_queue_t *fq)
	assert_spin_locked(&(atom->alock));
	list_add(&fq->alink, &atom->flush_queues);

	ON_DEBUG(atom->nr_flush_queues++);
static void detach_fq(flush_queue_t *fq)
	assert_spin_locked(&(fq->atom->alock));

	spin_lock(&(fq->guard));
	list_del_init(&fq->alink);
	assert("vs-1456", fq->atom->nr_flush_queues > 0);
	ON_DEBUG(fq->atom->nr_flush_queues--);

	spin_unlock(&(fq->guard));
/* destroy flush queue object */
static void done_fq(flush_queue_t *fq)
	assert("zam-763", list_empty_careful(ATOM_FQ_LIST(fq)));
	assert("zam-766", atomic_read(&fq->nr_submitted) == 0);

	kmem_cache_free(fq_slab, fq);
static void mark_jnode_queued(flush_queue_t *fq, jnode * node)
	JF_SET(node, JNODE_FLUSH_QUEUED);
	count_enqueued_node(fq);
/* Putting jnode into the flush queue. Both atom and jnode should be
   spin-locked. */
void queue_jnode(flush_queue_t *fq, jnode * node)
	assert_spin_locked(&(node->guard));
	assert("zam-713", node->atom != NULL);
	assert_spin_locked(&(node->atom->alock));
	assert("zam-716", fq->atom != NULL);
	assert("zam-717", fq->atom == node->atom);
	assert("zam-907", fq_in_use(fq));

	assert("zam-714", JF_ISSET(node, JNODE_DIRTY));
	assert("zam-826", JF_ISSET(node, JNODE_RELOC));
	assert("vs-1481", !JF_ISSET(node, JNODE_FLUSH_QUEUED));
	assert("vs-1481", NODE_LIST(node) != FQ_LIST);

	mark_jnode_queued(fq, node);
	list_move_tail(&node->capture_link, ATOM_FQ_LIST(fq));

	ON_DEBUG(count_jnode(node->atom, node, NODE_LIST(node),
			     FQ_LIST, 1));
/* repeatable process for waiting on i/o completion on a flush queue object */
static int wait_io(flush_queue_t *fq, int *nr_io_errors)
	assert("zam-738", fq->atom != NULL);
	assert_spin_locked(&(fq->atom->alock));
	assert("zam-736", fq_in_use(fq));
	assert("zam-911", list_empty_careful(ATOM_FQ_LIST(fq)));
	if (atomic_read(&fq->nr_submitted) != 0) {
		struct super_block *super;

		spin_unlock_atom(fq->atom);

		assert("nikita-3013", reiser4_schedulable());

		super = reiser4_get_current_sb();

		/* FIXME: this is instead of blk_run_queues() */
		blk_run_address_space(reiser4_get_super_fake(super)->i_mapping);

		if (!(super->s_flags & MS_RDONLY))
			wait_event(fq->wait,
				   atomic_read(&fq->nr_submitted) == 0);
		/* Ask the caller to re-acquire the locks and call this
		   function again. Note: this technique is commonly used in
		   the txnmgr code. */
		return -E_REPEAT;
	}
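
	/* no i/o is in flight for this fq: fold the errors recorded by the
	 * bio completion handler into the caller's error counter */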
	*nr_io_errors += atomic_read(&fq->nr_errors);
/* wait on I/O completion, re-submit dirty nodes to write */
static int finish_fq(flush_queue_t *fq, int *nr_io_errors)
	txn_atom *atom = fq->atom;

	assert("zam-801", atom != NULL);
	assert_spin_locked(&(atom->alock));
	assert("zam-762", fq_in_use(fq));

	ret = wait_io(fq, nr_io_errors);

	reiser4_atom_send_event(atom);
/* wait for all i/o for the given atom to be completed; actually do one
   iteration of that and return -E_REPEAT if more iterations are needed */
static int finish_all_fq(txn_atom * atom, int *nr_io_errors)
	assert_spin_locked(&(atom->alock));

	if (list_empty_careful(&atom->flush_queues))
		return 0;

	list_for_each_entry(fq, &atom->flush_queues, alink) {

		assert("vs-1247", fq->owner == NULL);
		ON_DEBUG(fq->owner = current);
		ret = finish_fq(fq, nr_io_errors);

			reiser4_handle_error();

		spin_unlock_atom(atom);

	/* All flush queues are in use; atom remains locked */
/* wait for all i/o for the current atom */
int current_atom_finish_all_fq(void)
	int nr_io_errors = 0;

	do {
		atom = get_current_atom_locked();
		ret = finish_all_fq(atom, &nr_io_errors);

		reiser4_atom_wait_event(atom);
	} while (ret == -E_REPEAT);

	/* we do not need the locked atom after this function finishes; SUCCESS
	   and -EBUSY are the two return codes for which the atom remains
	   locked after finish_all_fq() */
		spin_unlock_atom(atom);

	assert_spin_not_locked(&(atom->alock));
/* change node->atom field for all jnodes on the given list */
static void scan_fq_and_update_atom_ref(struct list_head *list, txn_atom *atom)
	list_for_each_entry(cur, list, capture_link) {
		spin_lock_jnode(cur);
		cur->atom = atom;
		spin_unlock_jnode(cur);
/* support for atom fusion operation */
void reiser4_fuse_fq(txn_atom *to, txn_atom *from)
	assert_spin_locked(&(to->alock));
	assert_spin_locked(&(from->alock));

	list_for_each_entry(fq, &from->flush_queues, alink) {
		scan_fq_and_update_atom_ref(ATOM_FQ_LIST(fq), to);
		spin_lock(&(fq->guard));
		fq->atom = to;
		spin_unlock(&(fq->guard));
	}
	list_splice_init(&from->flush_queues, to->flush_queues.prev);

	to->num_queued += from->num_queued;
	to->nr_flush_queues += from->nr_flush_queues;
	from->nr_flush_queues = 0;
int atom_fq_parts_are_clean(txn_atom * atom)
	assert("zam-915", atom != NULL);
	return list_empty_careful(&atom->flush_queues);
/* Bio i/o completion routine for reiser4 write operations. */
static void end_io_handler(struct bio *bio, int err)
403 assert("zam-958", bio
->bi_rw
& WRITE
);
405 if (err
== -EOPNOTSUPP
)
406 set_bit(BIO_EOPNOTSUPP
, &bio
->bi_flags
);
	/* we expect that bio->bi_private is set to NULL or to an fq object
	 * which is used for synchronization and error counting. */
	fq = bio->bi_private;
	/* Check all elements of io_vec for correct write completion. */
	for (i = 0; i < bio->bi_vcnt; i += 1) {
		struct page *pg = bio->bi_io_vec[i].bv_page;

		if (!test_bit(BIO_UPTODATE, &bio->bi_flags)) {
			nr_errors++;
		}

		/* jnode WRITEBACK ("write is in progress bit") is
		 * atomically cleared here. */

		assert("zam-736", pg != NULL);
		assert("zam-736", PagePrivate(pg));

		JF_CLR(node, JNODE_WRITEBACK);

		end_page_writeback(pg);
		page_cache_release(pg);
	/* count i/o errors in the fq object */
	atomic_add(nr_errors, &fq->nr_errors);

	/* If all write requests registered in this "fq" are done we wake up
	 * the waiters. */
	if (atomic_sub_and_test(bio->bi_vcnt, &fq->nr_submitted))
/* Count I/O requests which will be submitted by @bio in the given flush
   queue object @fq. */
void add_fq_to_bio(flush_queue_t *fq, struct bio *bio)
	bio->bi_private = fq;
	bio->bi_end_io = end_io_handler;

	if (fq)
		atomic_add(bio->bi_vcnt, &fq->nr_submitted);
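
/* An illustrative sketch (not actual fs code) of how a caller might pair this
 * with bio submission; here "bio" is assumed to be already filled with the
 * pages of the queued jnodes, and submit_bio() is the generic block layer
 * entry point:
 *
 *	add_fq_to_bio(fq, bio);
 *	submit_bio(WRITE, bio);
 *
 * Later end_io_handler() subtracts bio->bi_vcnt from fq->nr_submitted, so
 * wait_io() can sleep until the count drops to zero.
 */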
/* Move all queued nodes out from @fq->prepped list. */
static void release_prepped_list(flush_queue_t *fq)
	assert("zam-904", fq_in_use(fq));
	atom = atom_locked_by_fq(fq);

	while (!list_empty(ATOM_FQ_LIST(fq))) {

		cur = list_entry(ATOM_FQ_LIST(fq)->next, jnode, capture_link);
		list_del_init(&cur->capture_link);

		count_dequeued_node(fq);
		spin_lock_jnode(cur);
		assert("nikita-3154", !JF_ISSET(cur, JNODE_OVRWR));
		assert("nikita-3154", JF_ISSET(cur, JNODE_RELOC));
		assert("nikita-3154", JF_ISSET(cur, JNODE_FLUSH_QUEUED));
		JF_CLR(cur, JNODE_FLUSH_QUEUED);
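
		/* a node that is still dirty goes back to the atom's dirty
		 * list for its tree level; otherwise it goes to the atom's
		 * clean list */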
		if (JF_ISSET(cur, JNODE_DIRTY)) {
			list_add_tail(&cur->capture_link,
				      ATOM_DIRTY_LIST(atom,
						      jnode_get_level(cur)));
			ON_DEBUG(count_jnode(atom, cur, FQ_LIST,
					     DIRTY_LIST, 1));
		} else {
			list_add_tail(&cur->capture_link,
				      ATOM_CLEAN_LIST(atom));
			ON_DEBUG(count_jnode(atom, cur, FQ_LIST,
					     CLEAN_LIST, 1));
		}

		spin_unlock_jnode(cur);
	}

	if (--atom->nr_running_queues == 0)
		reiser4_atom_send_event(atom);

	spin_unlock_atom(atom);
/* Submit write requests for nodes on the already filled flush queue @fq.

   @fq: flush queue object which contains jnodes we can (and will) write.
   @return: number of submitted blocks (>=0) on success, otherwise an error
	    code (<0).
*/
int reiser4_write_fq(flush_queue_t *fq, long *nr_submitted, int flags)
	while (1) {
		atom = atom_locked_by_fq(fq);
		assert("zam-924", atom);
		/* do not write fq in parallel. */
		if (atom->nr_running_queues == 0
		    || !(flags & WRITEOUT_SINGLE_STREAM))
			break;
		reiser4_atom_wait_event(atom);
	}

	atom->nr_running_queues++;
	spin_unlock_atom(atom);

	ret = write_jnode_list(ATOM_FQ_LIST(fq), fq, nr_submitted, flags);
	release_prepped_list(fq);
/* Getting flush queue object for exclusive use by one thread. May require
   several iterations, which is indicated by the -E_REPEAT return code.

   This function does not contain code for obtaining an atom lock because an
   atom lock is obtained in different ways in different parts of reiser4;
   usually it is the current atom, but we also need to be able to get an fq
   for the atom of a given jnode. */
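
/* A sketch (not actual fs code) of the retry pattern callers are expected to
 * follow (compare get_fq_for_current_atom() below); error handling elided:
 *
 *	do {
 *		atom = get_current_atom_locked();
 *		ret = reiser4_fq_by_atom(atom, &fq);
 *	} while (ret == -E_REPEAT);
 *
 * On success the returned fq is marked IN_USE and fq->atom remains locked.
 */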
static int fq_by_atom_gfp(txn_atom *atom, flush_queue_t **new_fq, gfp_t gfp)
	assert_spin_locked(&(atom->alock));

	fq = list_entry(atom->flush_queues.next, flush_queue_t, alink);
	while (&atom->flush_queues != &fq->alink) {
		spin_lock(&(fq->guard));

		if (fq_ready(fq)) {
			mark_fq_in_use(fq);
			assert("vs-1246", fq->owner == NULL);
			ON_DEBUG(fq->owner = current);
			spin_unlock(&(fq->guard));
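
		/* this fq is in use by another thread: drop its guard and try
		 * the next flush queue on the atom's list */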
		spin_unlock(&(fq->guard));

		fq = list_entry(fq->alink.next, flush_queue_t, alink);
	}
	/* Use previously allocated fq object */
	if (*new_fq) {
		mark_fq_in_use(*new_fq);
		assert("vs-1248", (*new_fq)->owner == 0);
		ON_DEBUG((*new_fq)->owner = current);
		attach_fq(atom, *new_fq);

		return 0;
	}

	spin_unlock_atom(atom);

	*new_fq = create_fq(gfp);

	if (*new_fq == NULL)
		return RETERR(-ENOMEM);

	return RETERR(-E_REPEAT);
int reiser4_fq_by_atom(txn_atom * atom, flush_queue_t **new_fq)
	return fq_by_atom_gfp(atom, new_fq, reiser4_ctx_gfp_mask_get());
/* A wrapper around reiser4_fq_by_atom for getting a flush queue
   object for the current atom; on success fq->atom remains locked. */
flush_queue_t *get_fq_for_current_atom(void)
	flush_queue_t *fq = NULL;

	do {
		atom = get_current_atom_locked();
		ret = reiser4_fq_by_atom(atom, &fq);
	} while (ret == -E_REPEAT);
/* Releasing flush queue object after exclusive use */
void reiser4_fq_put_nolock(flush_queue_t *fq)
	assert("zam-747", fq->atom != NULL);
	assert("zam-902", list_empty_careful(ATOM_FQ_LIST(fq)));

	assert("vs-1245", fq->owner == current);
	ON_DEBUG(fq->owner = NULL);
void reiser4_fq_put(flush_queue_t *fq)
	spin_lock(&(fq->guard));
	atom = atom_locked_by_fq_nolock(fq);

	assert("zam-746", atom != NULL);

	reiser4_fq_put_nolock(fq);
	reiser4_atom_send_event(atom);

	spin_unlock(&(fq->guard));
	spin_unlock_atom(atom);
/* A part of atom object initialization related to the embedded flush queue
   list head */
void init_atom_fq_parts(txn_atom *atom)
	INIT_LIST_HEAD(&atom->flush_queues);
void reiser4_check_fq(const txn_atom *atom)
	/* check number of nodes on all atom's flush queues */
	struct list_head *pos;

	list_for_each_entry(fq, &atom->flush_queues, alink) {
		spin_lock(&(fq->guard));
		/* calculate number of jnodes on fq's list of prepped jnodes */
		list_for_each(pos, ATOM_FQ_LIST(fq))
			count++;
		spin_unlock(&(fq->guard));
	}

	if (count != atom->fq)
		warning("", "fq counter %d, real %d\n", atom->fq, count);
 * c-indentation-style: "K&R"