1 /* Copyright 2001, 2002, 2003 by Hans Reiser, licensing governed by
2 * reiser4/README */
4 /* Joshua MacDonald wrote the first draft of this code. */
6 /* ZAM-LONGTERM-FIXME-HANS: The locking in this file is badly designed, and a
7 filesystem scales only as well as its worst locking design. You need to
8 substantially restructure this code. Josh was not as experienced a programmer
9 as you. Particularly review how the locking style differs from what you did
10    for znodes using hi-lo priority locking, and present to me an opinion on
11 whether the differences are well founded. */
13 /* I cannot help but disagree with the sentiment above. Locking of the
14  * transaction manager is _not_ badly designed, and, at the very least, is not
15  * the scaling bottleneck. The scaling bottleneck is _exactly_ hi-lo priority
16  * locking on znodes, especially on the root node of the tree. --nikita,
17 * 2003.10.13 */
19 /* The txnmgr is a set of interfaces that keep track of atoms and transcrash handles. The
20 txnmgr processes capture_block requests and manages the relationship between jnodes and
21 atoms through the various stages of a transcrash, and it also oversees the fusion and
22 capture-on-copy processes. The main difficulty with this task is maintaining a
23 deadlock-free lock ordering between atoms and jnodes/handles. The reason for the
24    difficulty is that jnodes, handles, and atoms contain pointer cycles, and these cycles
25    must be broken. The main requirement is that atom-fusion be deadlock free, so once you
26    hold the atom_lock you may then wait to acquire any jnode or handle lock. This implies
27    that any time you check the atom-pointer of a jnode or handle and then try to lock that
28    atom, you must use trylock() and possibly reverse the order (see the sketch below).
30 This code implements the design documented at:
32 http://namesys.com/txn-doc.html
34 ZAM-FIXME-HANS: update v4.html to contain all of the information present in the above (but updated), and then remove the
35 above document and reference the new. Be sure to provide some credit to Josh. I already have some writings on this
36 topic in v4.html, but they are lacking in details present in the above. Cure that. Remember to write for the bright 12
37 year old --- define all technical terms used.
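/* A minimal sketch of the trylock-and-reverse-order rule described above
   (illustrative only; the real implementations are txnh_get_atom() and
   jnode_get_atom() below, and every helper named here appears in this file):

	while (1) {
		spin_lock_jnode(node);
		atom = node->atom;
		if (atom == NULL || spin_trylock_atom(atom))
			break;			trylock cannot deadlock
		atomic_inc(&atom->refcount);	pin the atom, then drop the
		spin_unlock_jnode(node);	jnode lock and retake both
		spin_lock_atom(atom);		locks in the safe order:
		spin_lock_jnode(node);		atom first, jnode second
		if (node->atom == atom) {
			atomic_dec(&atom->refcount);
			break;
		}
		spin_unlock_jnode(node);	the jnode migrated to another
		atom_dec_and_unlock(atom);	atom via fusion: retry
	}
*/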
41 /* Thoughts on the external transaction interface:
43 In the current code, a TRANSCRASH handle is created implicitly by reiser4_init_context() (which
44 creates state that lasts for the duration of a system call and is called at the start
45 of ReiserFS methods implementing VFS operations), and closed by reiser4_exit_context(),
46 occupying the scope of a single system call. We wish to give certain applications an
47 interface to begin and close (commit) transactions. Since our implementation of
48 transactions does not yet support isolation, allowing an application to open a
49 transaction implies trusting it to later close the transaction. Part of the
50 transaction interface will be aimed at enabling that trust, but the interface for
51 actually using transactions is fairly narrow.
53 BEGIN_TRANSCRASH: Returns a transcrash identifier. It should be possible to translate
54 this identifier into a string that a shell-script could use, allowing you to start a
55 transaction by issuing a command. Once open, the transcrash should be set in the task
56 structure, and there should be options (I suppose) to allow it to be carried across
57 fork/exec. A transcrash has several options:
59 - READ_FUSING or WRITE_FUSING: The default policy is for txn-capture to capture only
60 on writes (WRITE_FUSING) and allow "dirty reads". If the application wishes to
61 capture on reads as well, it should set READ_FUSING.
63 - TIMEOUT: Since a non-isolated transcrash cannot be undone, every transcrash must
64 eventually close (or else the machine must crash). If the application dies an
65 unexpected death with an open transcrash, for example, or if it hangs for a long
66 duration, one solution (to avoid crashing the machine) is to simply close it anyway.
67 This is a dangerous option, but it is one way to solve the problem until isolated
68 transcrashes are available for untrusted applications.
70    It seems to be what databases do, though it is unclear how one avoids creating a
71    resource-starvation vulnerability open to DoS attacks. Guaranteeing that some
72 minimum amount of computational resources are made available would seem more correct
73 than guaranteeing some amount of time. When we again have someone to code the work,
74 this issue should be considered carefully. -Hans
76 RESERVE_BLOCKS: A running transcrash should indicate to the transaction manager how
77 many dirty blocks it expects. The reserve_blocks interface should be called at a point
78 where it is safe for the application to fail, because the system may not be able to
79 grant the allocation and the application must be able to back-out. For this reason,
80 the number of reserve-blocks can also be passed as an argument to BEGIN_TRANSCRASH, but
81 the application may also wish to extend the allocation after beginning its transcrash.
83 CLOSE_TRANSCRASH: The application closes the transcrash when it is finished making
84 modifications that require transaction protection. When isolated transactions are
85 supported the CLOSE operation is replaced by either COMMIT or ABORT. For example, if a
86 RESERVE_BLOCKS call fails for the application, it should "abort" by calling
87 CLOSE_TRANSCRASH, even though it really commits any changes that were made (which is
88 why, for safety, the application should call RESERVE_BLOCKS before making any changes).
90    For actually implementing these out-of-system-call-scoped transcrashes, the
91 reiser4_context has a "txn_handle *trans" pointer that may be set to an open
92 transcrash. Currently there are no dynamically-allocated transcrashes, but there is a
93 "struct kmem_cache *_txnh_slab" created for that purpose in this file.
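/* A purely hypothetical sketch of how an application might drive the
   interface proposed above; none of these calls exist yet and the names,
   arguments and error codes are placeholders only:

	tid = begin_transcrash(WRITE_FUSING);
	if (reserve_blocks(tid, expected_dirty_blocks) < 0) {
		close_transcrash(tid);		back out before modifying
		return -ENOSPC;			anything, as advised above
	}
	... modifications that require transaction protection ...
	close_transcrash(tid);			commit; would become COMMIT or
						ABORT once isolation is supported
*/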
96 /* Extending the other system call interfaces for future transaction features:
98 Specialized applications may benefit from passing flags to the ordinary system call
99 interface such as read(), write(), or stat(). For example, the application specifies
100 WRITE_FUSING by default but wishes to add that a certain read() command should be
101 treated as READ_FUSING. But which read? Is it the directory-entry read, the stat-data
102 read, or the file-data read? These issues are straight-forward, but there are a lot of
103 them and adding the necessary flags-passing code will be tedious.
105 When supporting isolated transactions, there is a corresponding READ_MODIFY_WRITE (RMW)
106 flag, which specifies that although it is a read operation being requested, a
107 write-lock should be taken. The reason is that read-locks are shared while write-locks
108   are exclusive, so taking a read-lock when a later write is known in advance will often
109   lead to deadlock. If a reader knows it will write later, it should issue read
110 requests with the RMW flag set.
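/* Why read-then-upgrade deadlocks (a sketch of the point above): read-locks
   are shared and write-locks are exclusive, so two threads that both plan to
   write can block each other forever:

	thread A: read_lock(X)		succeeds
	thread B: read_lock(X)		succeeds (read-locks are shared)
	thread A: write_lock(X)		blocks: B still holds a read-lock
	thread B: write_lock(X)		blocks: A still holds a read-lock

   With the RMW flag both threads would take the exclusive lock up front, so
   one of them simply waits for the other instead of deadlocking. */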
114 The znode/atom deadlock avoidance.
116 FIXME(Zam): writing of this comment is in progress.
118   The atom's special stage ASTAGE_CAPTURE_WAIT introduces a kind of long-term
119   locking of atoms, which makes the reiser4 locking scheme more complex. It had
120   deadlocks until we implemented deadlock avoidance algorithms. Those deadlocks
121   looked like the following: one stopped thread waits for a long-term lock on a
122   znode, while the thread that owns that lock waits until fusion with another
123   atom is allowed.
125   The source of the deadlocks is the optimization of not capturing index nodes
126   for read. Let's prove it. Suppose we have a dumb node-capturing scheme which
127   unconditionally captures each block before locking it.
129   That scheme has no deadlocks. Let's begin with a thread whose atom's stage is
130   ASTAGE_CAPTURE_WAIT and which waits for a znode lock. That thread can't be
131   waiting for a capture, because its stage allows fusion with any atom except
132   those currently being committed. The atom commit process can't deadlock because
133   the commit procedure does not acquire locks and does not fuse with other
134   atoms. Reiser4 does its capturing right before going to sleep inside the
135   longterm_lock_znode() function, which means the znode we want to lock is
136   already captured and its atom is in the ASTAGE_CAPTURE_WAIT stage. Continuing
137   the analysis, we see that no process in the sequence can be waiting for atom
138   fusion. Thereby there are no deadlocks of the described kind.
140   The capturing optimization makes these deadlocks possible. A thread can wait on
141   a lock whose owner did not capture that node. The lock owner's current atom is
142   then not fused with the first atom and does not reach the ASTAGE_CAPTURE_WAIT
143   state. A deadlock is possible when that atom meets another one which is in
144   ASTAGE_CAPTURE_WAIT already.
146 The deadlock avoidance scheme includes two algorithms:
148   The first algorithm is used when a thread captures a node which is locked but
149   not captured by another thread. Such nodes are marked MISSED_IN_CAPTURE at the
150   moment we skip their capturing. If such a node (marked MISSED_IN_CAPTURE) is
151   being captured by a thread whose current atom is in ASTAGE_CAPTURE_WAIT, the
152   routine which forces all lock owners to join the current atom is executed.
154   The second algorithm does not allow skipping the capture of already captured nodes.
156   Together, both algorithms prevent waiting for a long-term lock without fusing
157   with the atoms of all lock owners, which is the key ingredient of the atom/znode
158   locking deadlocks.
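/* A condensed sketch of the two algorithms above, in terms of names used in
   this file (fuse_not_fused_lock_owners() is declared below; the flag
   manipulation is written informally and is illustrative only):

	when capture of a locked-but-uncaptured node is skipped:
		mark the node MISSED_IN_CAPTURE;

	first algorithm - when a thread whose atom is in ASTAGE_CAPTURE_WAIT
	captures a znode marked MISSED_IN_CAPTURE:
		fuse_not_fused_lock_owners(txnh, znode);

	second algorithm - capture of an already captured node is never
	skipped, so the capturing thread always fuses with (or waits for)
	that node's atom.
*/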
162 * Transactions and mmap(2).
164 * 1. Transactions are not supported for accesses through mmap(2), because
165 * this would effectively amount to user-level transactions whose duration
166 * is beyond control of the kernel.
168 * 2. That said, we still want to preserve some decency with regard to
169  * mmap(2). During a normal write(2) call, the following sequence of events
170  * happens:
172  *     1. page is created;
174  *     2. jnode is created, dirtied and captured into the current atom.
176  *     3. extent is inserted and modified.
178  * Steps (2) and (3) take place under a long-term lock on the twig node.
180  * When a file is accessed through mmap(2), the page is always created
181  * during the page fault.
182  * After this (in reiser4_readpage()->reiser4_readpage_extent()):
184  *     1. if access is made to a non-hole page, a new jnode is created (if
185  *     necessary);
187  *     2. if access is made to a hole page, a jnode is not created (XXX
188  *     not clear why).
190  * Also, even if the page is created by a write page fault, it is not marked
191  * dirty immediately by handle_mm_fault(). Probably this is to avoid races
192  * with page write-out.
194  * The dirty bit set by the hardware is only transferred to the struct page
195  * later, when the page is unmapped (in zap_pte_range(), or
196  * try_to_unmap_one()).
198  * So, with mmap(2) we have to handle the following irksome situations:
200 * 1. there exists modified page (clean or dirty) without jnode
202 * 2. there exists modified page (clean or dirty) with clean jnode
204  *     3. a clean page which is part of an atom can be transparently modified
205  *     at any moment through the mapping without becoming dirty.
207  * (1) and (2) can lead to an out-of-memory situation: ->writepage()
208  * doesn't know what to do with such pages, and ->sync_sb()/->writepages()
209  * don't see them, because these methods operate on atoms.
211  * (3) can lead to loss of data: suppose we have a dirty page whose dirty
212  * jnode is captured by some atom. As part of early flush (for
213  * example) the page was written out. The dirty bit was cleared on both
214  * page and jnode. After this the page is modified through the mapping,
215  * but the kernel doesn't notice and just discards the page and jnode as
216  * part of commit. (XXX actually it doesn't, because to reclaim the page,
217  * ->releasepage() has to be called, and before that the dirty bit will
218  * be transferred to the struct page).
222 #include "debug.h"
223 #include "txnmgr.h"
224 #include "jnode.h"
225 #include "znode.h"
226 #include "block_alloc.h"
227 #include "tree.h"
228 #include "wander.h"
229 #include "ktxnmgrd.h"
230 #include "super.h"
231 #include "page_cache.h"
232 #include "reiser4.h"
233 #include "vfs_ops.h"
234 #include "inode.h"
235 #include "flush.h"
237 #include <asm/atomic.h>
238 #include <linux/types.h>
239 #include <linux/fs.h>
240 #include <linux/mm.h>
241 #include <linux/slab.h>
242 #include <linux/pagemap.h>
243 #include <linux/writeback.h>
244 #include <linux/swap.h> /* for totalram_pages */
246 static void atom_free(txn_atom * atom);
248 static int commit_txnh(txn_handle * txnh);
250 static void wakeup_atom_waitfor_list(txn_atom * atom);
251 static void wakeup_atom_waiting_list(txn_atom * atom);
253 static void capture_assign_txnh_nolock(txn_atom * atom, txn_handle * txnh);
255 static void capture_assign_block_nolock(txn_atom * atom, jnode * node);
257 static void fuse_not_fused_lock_owners(txn_handle * txnh, znode * node);
259 static int capture_init_fusion(jnode * node, txn_handle * txnh,
260 txn_capture mode);
262 static int capture_fuse_wait(txn_handle *, txn_atom *, txn_atom *, txn_capture);
264 static void capture_fuse_into(txn_atom * small, txn_atom * large);
266 void reiser4_invalidate_list(struct list_head *);
268 /* GENERIC STRUCTURES */
270 typedef struct _txn_wait_links txn_wait_links;
272 struct _txn_wait_links {
273 lock_stack *_lock_stack;
274 struct list_head _fwaitfor_link;
275 struct list_head _fwaiting_link;
276 int (*waitfor_cb) (txn_atom * atom, struct _txn_wait_links * wlinks);
277 int (*waiting_cb) (txn_atom * atom, struct _txn_wait_links * wlinks);
280 /* FIXME: In theory, we should be using the slab cache init & destructor
281 methods instead of, e.g., jnode_init, etc. */
282 static struct kmem_cache *_atom_slab = NULL;
283 /* this is for user-visible, cross system-call transactions. */
284 static struct kmem_cache *_txnh_slab = NULL;
287 * init_txnmgr_static - create transaction manager slab caches
289 * Initializes caches of txn-atoms and txn_handle. It is part of reiser4 module
290 * initialization.
292 int init_txnmgr_static(void)
294 assert("jmacd-600", _atom_slab == NULL);
295 assert("jmacd-601", _txnh_slab == NULL);
297 ON_DEBUG(atomic_set(&flush_cnt, 0));
299 _atom_slab = kmem_cache_create("txn_atom", sizeof(txn_atom), 0,
300 SLAB_HWCACHE_ALIGN |
301 SLAB_RECLAIM_ACCOUNT, NULL);
302 if (_atom_slab == NULL)
303 return RETERR(-ENOMEM);
305 _txnh_slab = kmem_cache_create("txn_handle", sizeof(txn_handle), 0,
306 SLAB_HWCACHE_ALIGN, NULL);
307 if (_txnh_slab == NULL) {
308 kmem_cache_destroy(_atom_slab);
309 _atom_slab = NULL;
310 return RETERR(-ENOMEM);
313 return 0;
317 * done_txnmgr_static - delete txn_atom and txn_handle caches
319 * This is called on reiser4 module unloading or system shutdown.
321 void done_txnmgr_static(void)
323 destroy_reiser4_cache(&_atom_slab);
324 destroy_reiser4_cache(&_txnh_slab);
328  * reiser4_init_txnmgr - initialize a new transaction manager
329 * @mgr: pointer to transaction manager embedded in reiser4 super block
331 * This is called on mount. Makes necessary initializations.
333 void reiser4_init_txnmgr(txn_mgr *mgr)
335 assert("umka-169", mgr != NULL);
337 mgr->atom_count = 0;
338 mgr->id_count = 1;
339 INIT_LIST_HEAD(&mgr->atoms_list);
340 spin_lock_init(&mgr->tmgr_lock);
341 mutex_init(&mgr->commit_mutex);
345 * reiser4_done_txnmgr - stop transaction manager
346 * @mgr: pointer to transaction manager embedded in reiser4 super block
348 * This is called on umount. Does sanity checks.
350 void reiser4_done_txnmgr(txn_mgr *mgr)
352 assert("umka-170", mgr != NULL);
353 assert("umka-1701", list_empty_careful(&mgr->atoms_list));
354 assert("umka-1702", mgr->atom_count == 0);
357 /* Initialize a transaction handle. */
358 /* Audited by: umka (2002.06.13) */
359 static void txnh_init(txn_handle * txnh, txn_mode mode)
361 assert("umka-171", txnh != NULL);
363 txnh->mode = mode;
364 txnh->atom = NULL;
365 reiser4_ctx_gfp_mask_set();
366 txnh->flags = 0;
367 spin_lock_init(&txnh->hlock);
368 INIT_LIST_HEAD(&txnh->txnh_link);
371 #if REISER4_DEBUG
372 /* Check if a transaction handle is clean. */
373 static int txnh_isclean(txn_handle * txnh)
375 assert("umka-172", txnh != NULL);
376 return txnh->atom == NULL &&
377 LOCK_CNT_NIL(spin_locked_txnh);
379 #endif
381 /* Initialize an atom. */
382 static void atom_init(txn_atom * atom)
384 int level;
386 assert("umka-173", atom != NULL);
388 memset(atom, 0, sizeof(txn_atom));
390 atom->stage = ASTAGE_FREE;
391 atom->start_time = jiffies;
393 for (level = 0; level < REAL_MAX_ZTREE_HEIGHT + 1; level += 1)
394 INIT_LIST_HEAD(ATOM_DIRTY_LIST(atom, level));
396 INIT_LIST_HEAD(ATOM_CLEAN_LIST(atom));
397 INIT_LIST_HEAD(ATOM_OVRWR_LIST(atom));
398 INIT_LIST_HEAD(ATOM_WB_LIST(atom));
399 INIT_LIST_HEAD(&atom->inodes);
400 spin_lock_init(&(atom->alock));
401 /* list of transaction handles */
402 INIT_LIST_HEAD(&atom->txnh_list);
403 /* link to transaction manager's list of atoms */
404 INIT_LIST_HEAD(&atom->atom_link);
405 INIT_LIST_HEAD(&atom->fwaitfor_list);
406 INIT_LIST_HEAD(&atom->fwaiting_list);
407 blocknr_set_init(&atom->delete_set);
408 blocknr_set_init(&atom->wandered_map);
410 init_atom_fq_parts(atom);
413 #if REISER4_DEBUG
414 /* Check if an atom is clean. */
415 static int atom_isclean(txn_atom * atom)
417 int level;
419 assert("umka-174", atom != NULL);
421 for (level = 0; level < REAL_MAX_ZTREE_HEIGHT + 1; level += 1) {
422 if (!list_empty_careful(ATOM_DIRTY_LIST(atom, level))) {
423 return 0;
427 return atom->stage == ASTAGE_FREE &&
428 atom->txnh_count == 0 &&
429 atom->capture_count == 0 &&
430 atomic_read(&atom->refcount) == 0 &&
431 (&atom->atom_link == atom->atom_link.next &&
432 &atom->atom_link == atom->atom_link.prev) &&
433 list_empty_careful(&atom->txnh_list) &&
434 list_empty_careful(ATOM_CLEAN_LIST(atom)) &&
435 list_empty_careful(ATOM_OVRWR_LIST(atom)) &&
436 list_empty_careful(ATOM_WB_LIST(atom)) &&
437 list_empty_careful(&atom->fwaitfor_list) &&
438 list_empty_careful(&atom->fwaiting_list) &&
439 atom_fq_parts_are_clean(atom);
441 #endif
443 /* Begin a transaction in this context. Currently this uses the reiser4_context's
444 trans_in_ctx, which means that transaction handles are stack-allocated. Eventually
445 this will be extended to allow transaction handles to span several contexts. */
446 /* Audited by: umka (2002.06.13) */
447 void reiser4_txn_begin(reiser4_context * context)
449 assert("jmacd-544", context->trans == NULL);
451 context->trans = &context->trans_in_ctx;
453 /* FIXME_LATER_JMACD Currently there's no way to begin a TXN_READ_FUSING
454 transcrash. Default should be TXN_WRITE_FUSING. Also, the _trans variable is
455 stack allocated right now, but we would like to allow for dynamically allocated
456 transcrashes that span multiple system calls.
458 txnh_init(context->trans, TXN_WRITE_FUSING);
461 /* Finish a transaction handle context. */
462 int reiser4_txn_end(reiser4_context * context)
464 long ret = 0;
465 txn_handle *txnh;
467 assert("umka-283", context != NULL);
468 assert("nikita-3012", reiser4_schedulable());
469 assert("vs-24", context == get_current_context());
470 assert("nikita-2967", lock_stack_isclean(get_current_lock_stack()));
472 txnh = context->trans;
473 if (txnh != NULL) {
474 if (txnh->atom != NULL)
475 ret = commit_txnh(txnh);
476 assert("jmacd-633", txnh_isclean(txnh));
477 context->trans = NULL;
479 return ret;
482 void reiser4_txn_restart(reiser4_context * context)
484 reiser4_txn_end(context);
485 reiser4_preempt_point();
486 reiser4_txn_begin(context);
489 void reiser4_txn_restart_current(void)
491 reiser4_txn_restart(get_current_context());
494 /* TXN_ATOM */
496 /* Get the atom belonging to a txnh, which is not locked. Return txnh locked. Locks atom, if atom
497 is not NULL. This performs the necessary spin_trylock to break the lock-ordering cycle. May
498 return NULL. */
499 static txn_atom *txnh_get_atom(txn_handle * txnh)
501 txn_atom *atom;
503 assert("umka-180", txnh != NULL);
504 assert_spin_not_locked(&(txnh->hlock));
506 while (1) {
507 spin_lock_txnh(txnh);
508 atom = txnh->atom;
510 if (atom == NULL)
511 break;
513 if (spin_trylock_atom(atom))
514 break;
516 atomic_inc(&atom->refcount);
518 spin_unlock_txnh(txnh);
519 spin_lock_atom(atom);
520 spin_lock_txnh(txnh);
522 if (txnh->atom == atom) {
523 atomic_dec(&atom->refcount);
524 break;
527 spin_unlock_txnh(txnh);
528 atom_dec_and_unlock(atom);
531 return atom;
534 /* Get the current atom and spinlock it if current atom present. May return NULL */
535 txn_atom *get_current_atom_locked_nocheck(void)
537 reiser4_context *cx;
538 txn_atom *atom;
539 txn_handle *txnh;
541 cx = get_current_context();
542 assert("zam-437", cx != NULL);
544 txnh = cx->trans;
545 assert("zam-435", txnh != NULL);
547 atom = txnh_get_atom(txnh);
549 spin_unlock_txnh(txnh);
550 return atom;
553 /* Get the atom belonging to a jnode, which is initially locked. Return with
554 both jnode and atom locked. This performs the necessary spin_trylock to
555 break the lock-ordering cycle. Assumes the jnode is already locked, and
556 returns NULL if atom is not set. */
557 txn_atom *jnode_get_atom(jnode * node)
559 txn_atom *atom;
561 assert("umka-181", node != NULL);
563 while (1) {
564 assert_spin_locked(&(node->guard));
566 atom = node->atom;
567 /* node is not in any atom */
568 if (atom == NULL)
569 break;
571 /* If atom is not locked, grab the lock and return */
572 if (spin_trylock_atom(atom))
573 break;
575 		/* At least one jnode belongs to this atom, which guarantees that
576 		 * atom->refcount > 0, so we can safely increment the refcount. */
577 atomic_inc(&atom->refcount);
578 spin_unlock_jnode(node);
580 /* re-acquire spin locks in the right order */
581 spin_lock_atom(atom);
582 spin_lock_jnode(node);
584 /* check if node still points to the same atom. */
585 if (node->atom == atom) {
586 atomic_dec(&atom->refcount);
587 break;
590 /* releasing of atom lock and reference requires not holding
591 * locks on jnodes. */
592 spin_unlock_jnode(node);
594 		/* We are not sure that this atom has extra references besides our
595 		 * own, so we should call the proper function, which may free the
596 		 * atom if the last reference is released. */
597 atom_dec_and_unlock(atom);
599 /* lock jnode again for getting valid node->atom pointer
600 * value. */
601 spin_lock_jnode(node);
604 return atom;
607 /* Returns true if @node is dirty and part of the same atom as one of its neighbors. Used
608 by flush code to indicate whether the next node (in some direction) is suitable for
609 flushing. */
611 same_slum_check(jnode * node, jnode * check, int alloc_check, int alloc_value)
613 int compat;
614 txn_atom *atom;
616 assert("umka-182", node != NULL);
617 assert("umka-183", check != NULL);
619 /* Not sure what this function is supposed to do if supplied with @check that is
620 neither formatted nor unformatted (bitmap or so). */
621 assert("nikita-2373", jnode_is_znode(check)
622 || jnode_is_unformatted(check));
624 /* Need a lock on CHECK to get its atom and to check various state bits.
625 Don't need a lock on NODE once we get the atom lock. */
626 	/* It is not enough to lock two nodes and check (node->atom ==
627 	   check->atom), because the atom could be locked and being fused at that
628 	   moment; jnodes of an atom in that state (being fused) can point to
629 	   different atom objects, even though the atom is logically the same. */
630 spin_lock_jnode(check);
632 atom = jnode_get_atom(check);
634 if (atom == NULL) {
635 compat = 0;
636 } else {
637 compat = (node->atom == atom && JF_ISSET(check, JNODE_DIRTY));
639 if (compat && jnode_is_znode(check)) {
640 compat &= znode_is_connected(JZNODE(check));
643 if (compat && alloc_check) {
644 compat &= (alloc_value == jnode_is_flushprepped(check));
647 spin_unlock_atom(atom);
650 spin_unlock_jnode(check);
652 return compat;
655 /* Decrement the atom's reference count and if it falls to zero, free it. */
656 void atom_dec_and_unlock(txn_atom * atom)
658 txn_mgr *mgr = &get_super_private(reiser4_get_current_sb())->tmgr;
660 assert("umka-186", atom != NULL);
661 assert_spin_locked(&(atom->alock));
662 assert("zam-1039", atomic_read(&atom->refcount) > 0);
664 if (atomic_dec_and_test(&atom->refcount)) {
665 /* take txnmgr lock and atom lock in proper order. */
666 if (!spin_trylock_txnmgr(mgr)) {
667 /* This atom should exist after we re-acquire its
668 * spinlock, so we increment its reference counter. */
669 atomic_inc(&atom->refcount);
670 spin_unlock_atom(atom);
671 spin_lock_txnmgr(mgr);
672 spin_lock_atom(atom);
674 if (!atomic_dec_and_test(&atom->refcount)) {
675 spin_unlock_atom(atom);
676 spin_unlock_txnmgr(mgr);
677 return;
680 assert_spin_locked(&(mgr->tmgr_lock));
681 atom_free(atom);
682 spin_unlock_txnmgr(mgr);
683 } else
684 spin_unlock_atom(atom);
687 /* Create new atom and connect it to given transaction handle. This adds the
688 atom to the transaction manager's list and sets its reference count to 1, an
689 artificial reference which is kept until it commits. We play strange games
690 to avoid allocation under jnode & txnh spinlocks.*/
692 static int atom_begin_and_assign_to_txnh(txn_atom ** atom_alloc, txn_handle * txnh)
694 txn_atom *atom;
695 txn_mgr *mgr;
697 if (REISER4_DEBUG && rofs_tree(current_tree)) {
698 warning("nikita-3366", "Creating atom on rofs");
699 dump_stack();
702 if (*atom_alloc == NULL) {
703 (*atom_alloc) = kmem_cache_alloc(_atom_slab,
704 reiser4_ctx_gfp_mask_get());
706 if (*atom_alloc == NULL)
707 return RETERR(-ENOMEM);
710 /* and, also, txnmgr spin lock should be taken before jnode and txnh
711 locks. */
712 mgr = &get_super_private(reiser4_get_current_sb())->tmgr;
713 spin_lock_txnmgr(mgr);
714 spin_lock_txnh(txnh);
716 /* Check whether new atom still needed */
717 if (txnh->atom != NULL) {
718 /* NOTE-NIKITA probably it is rather better to free
719 * atom_alloc here than thread it up to reiser4_try_capture() */
721 spin_unlock_txnh(txnh);
722 spin_unlock_txnmgr(mgr);
724 return -E_REPEAT;
727 atom = *atom_alloc;
728 *atom_alloc = NULL;
730 atom_init(atom);
732 assert("jmacd-17", atom_isclean(atom));
735 * lock ordering is broken here. It is ok, as long as @atom is new
736 * and inaccessible for others. We can't use spin_lock_atom or
737 * spin_lock(&atom->alock) because they care about locking
738 	 * dependencies. spin_trylock_atom() doesn't.
740 check_me("", spin_trylock_atom(atom));
742 /* add atom to the end of transaction manager's list of atoms */
743 list_add_tail(&atom->atom_link, &mgr->atoms_list);
744 atom->atom_id = mgr->id_count++;
745 mgr->atom_count += 1;
747 /* Release txnmgr lock */
748 spin_unlock_txnmgr(mgr);
750 /* One reference until it commits. */
751 atomic_inc(&atom->refcount);
752 atom->stage = ASTAGE_CAPTURE_FUSE;
753 atom->super = reiser4_get_current_sb();
754 capture_assign_txnh_nolock(atom, txnh);
756 spin_unlock_atom(atom);
757 spin_unlock_txnh(txnh);
759 return -E_REPEAT;
762 /* Return true if an atom is currently "open". */
763 static int atom_isopen(const txn_atom * atom)
765 assert("umka-185", atom != NULL);
767 return atom->stage > 0 && atom->stage < ASTAGE_PRE_COMMIT;
770 /* Return the number of pointers to this atom that must be updated during fusion. This
771 approximates the amount of work to be done. Fusion chooses the atom with fewer
772 pointers to fuse into the atom with more pointers. */
773 static int atom_pointer_count(const txn_atom * atom)
775 assert("umka-187", atom != NULL);
777 /* This is a measure of the amount of work needed to fuse this atom
778 * into another. */
779 return atom->txnh_count + atom->capture_count;
782 /* Called holding the atom lock, this removes the atom from the transaction manager list
783 and frees it. */
784 static void atom_free(txn_atom * atom)
786 txn_mgr *mgr = &get_super_private(reiser4_get_current_sb())->tmgr;
788 assert("umka-188", atom != NULL);
789 assert_spin_locked(&(atom->alock));
791 /* Remove from the txn_mgr's atom list */
792 assert_spin_locked(&(mgr->tmgr_lock));
793 mgr->atom_count -= 1;
794 list_del_init(&atom->atom_link);
796 /* Clean the atom */
797 assert("jmacd-16",
798 (atom->stage == ASTAGE_INVALID || atom->stage == ASTAGE_DONE));
799 atom->stage = ASTAGE_FREE;
801 blocknr_set_destroy(&atom->delete_set);
802 blocknr_set_destroy(&atom->wandered_map);
804 assert("jmacd-16", atom_isclean(atom));
806 spin_unlock_atom(atom);
808 kmem_cache_free(_atom_slab, atom);
811 static int atom_is_dotard(const txn_atom * atom)
813 return time_after(jiffies, atom->start_time +
814 get_current_super_private()->tmgr.atom_max_age);
817 static int atom_can_be_committed(txn_atom * atom)
819 assert_spin_locked(&(atom->alock));
820 assert("zam-885", atom->txnh_count > atom->nr_waiters);
821 return atom->txnh_count == atom->nr_waiters + 1;
824 /* Return true if an atom should commit now. This is determined by aging, atom
825 size or atom flags. */
826 static int atom_should_commit(const txn_atom * atom)
828 assert("umka-189", atom != NULL);
829 return
830 (atom->flags & ATOM_FORCE_COMMIT) ||
831 ((unsigned)atom_pointer_count(atom) >
832 get_current_super_private()->tmgr.atom_max_size)
833 || atom_is_dotard(atom);
836 /* return 1 if current atom exists and requires commit. */
837 int current_atom_should_commit(void)
839 txn_atom *atom;
840 int result = 0;
842 atom = get_current_atom_locked_nocheck();
843 if (atom) {
844 result = atom_should_commit(atom);
845 spin_unlock_atom(atom);
847 return result;
850 static int atom_should_commit_asap(const txn_atom * atom)
852 unsigned int captured;
853 unsigned int pinnedpages;
855 assert("nikita-3309", atom != NULL);
857 captured = (unsigned)atom->capture_count;
858 pinnedpages = (captured >> PAGE_CACHE_SHIFT) * sizeof(znode);
860 return (pinnedpages > (totalram_pages >> 3)) || (atom->flushed > 100);
863 static jnode *find_first_dirty_in_list(struct list_head *head, int flags)
865 jnode *first_dirty;
867 list_for_each_entry(first_dirty, head, capture_link) {
868 if (!(flags & JNODE_FLUSH_COMMIT)) {
870 			 * skip jnodes which "heard banshee" or have active
871 			 * I/O
873 if (JF_ISSET(first_dirty, JNODE_HEARD_BANSHEE) ||
874 JF_ISSET(first_dirty, JNODE_WRITEBACK))
875 continue;
877 return first_dirty;
879 return NULL;
882 /* Get first dirty node from the atom's dirty_nodes[n] lists; return NULL if atom has no dirty
883 nodes on atom's lists */
884 jnode *find_first_dirty_jnode(txn_atom * atom, int flags)
886 jnode *first_dirty;
887 tree_level level;
889 assert_spin_locked(&(atom->alock));
891 /* The flush starts from LEAF_LEVEL (=1). */
892 for (level = 1; level < REAL_MAX_ZTREE_HEIGHT + 1; level += 1) {
893 if (list_empty_careful(ATOM_DIRTY_LIST(atom, level)))
894 continue;
896 first_dirty =
897 find_first_dirty_in_list(ATOM_DIRTY_LIST(atom, level),
898 flags);
899 if (first_dirty)
900 return first_dirty;
903 /* znode-above-root is on the list #0. */
904 return find_first_dirty_in_list(ATOM_DIRTY_LIST(atom, 0), flags);
907 static void dispatch_wb_list(txn_atom * atom, flush_queue_t * fq)
909 jnode *cur;
911 assert("zam-905", atom_is_protected(atom));
913 cur = list_entry(ATOM_WB_LIST(atom)->next, jnode, capture_link);
914 while (ATOM_WB_LIST(atom) != &cur->capture_link) {
915 jnode *next = list_entry(cur->capture_link.next, jnode, capture_link);
917 spin_lock_jnode(cur);
918 if (!JF_ISSET(cur, JNODE_WRITEBACK)) {
919 if (JF_ISSET(cur, JNODE_DIRTY)) {
920 queue_jnode(fq, cur);
921 } else {
922 /* move jnode to atom's clean list */
923 list_move_tail(&cur->capture_link,
924 ATOM_CLEAN_LIST(atom));
927 spin_unlock_jnode(cur);
929 cur = next;
933 /* Scan current atom->writeback_nodes list, re-submit dirty and !writeback
934 * jnodes to disk. */
935 static int submit_wb_list(void)
937 int ret;
938 flush_queue_t *fq;
940 fq = get_fq_for_current_atom();
941 if (IS_ERR(fq))
942 return PTR_ERR(fq);
944 dispatch_wb_list(fq->atom, fq);
945 spin_unlock_atom(fq->atom);
947 ret = reiser4_write_fq(fq, NULL, 1);
948 reiser4_fq_put(fq);
950 return ret;
953 /* Wait completion of all writes, re-submit atom writeback list if needed. */
954 static int current_atom_complete_writes(void)
956 int ret;
958 	/* Each jnode on that list was modified and dirtied while it already had
959 	 * an i/o request running. After i/o completion we have to resubmit
960 	 * them to disk again. */
961 ret = submit_wb_list();
962 if (ret < 0)
963 return ret;
965 /* Wait all i/o completion */
966 ret = current_atom_finish_all_fq();
967 if (ret)
968 return ret;
970 /* Scan wb list again; all i/o should be completed, we re-submit dirty
971 * nodes to disk */
972 ret = submit_wb_list();
973 if (ret < 0)
974 return ret;
976 /* Wait all nodes we just submitted */
977 return current_atom_finish_all_fq();
980 #if REISER4_DEBUG
982 static void reiser4_info_atom(const char *prefix, const txn_atom * atom)
984 if (atom == NULL) {
985 printk("%s: no atom\n", prefix);
986 return;
989 printk("%s: refcount: %i id: %i flags: %x txnh_count: %i"
990 " capture_count: %i stage: %x start: %lu, flushed: %i\n", prefix,
991 atomic_read(&atom->refcount), atom->atom_id, atom->flags,
992 atom->txnh_count, atom->capture_count, atom->stage,
993 atom->start_time, atom->flushed);
996 #else /* REISER4_DEBUG */
998 static inline void reiser4_info_atom(const char *prefix, const txn_atom * atom) {}
1000 #endif /* REISER4_DEBUG */
1002 #define TOOMANYFLUSHES (1 << 13)
1004 /* Called with the atom locked and no open "active" transaction handles except
1005    ours, this function calls flush_current_atom() until all dirty nodes are
1006    processed. Then it initiates commit processing.
1008    Called by the single remaining open "active" txnh, which is closing. Other
1009    open txnhs belong to processes which wait for atom commit in the commit_txnh()
1010    routine. They are counted as "waiters" in atom->nr_waiters. Therefore, as
1011    long as we hold the atom lock, none of the jnodes can be captured and/or
1012    locked.
1014 Return value is an error code if commit fails.
1016 static int commit_current_atom(long *nr_submitted, txn_atom ** atom)
1018 reiser4_super_info_data *sbinfo = get_current_super_private();
1019 long ret = 0;
1020 /* how many times jnode_flush() was called as a part of attempt to
1021 * commit this atom. */
1022 int flushiters;
1024 assert("zam-888", atom != NULL && *atom != NULL);
1025 assert_spin_locked(&((*atom)->alock));
1026 assert("zam-887", get_current_context()->trans->atom == *atom);
1027 assert("jmacd-151", atom_isopen(*atom));
1029 assert("nikita-3184",
1030 get_current_super_private()->delete_mutex_owner != current);
1032 for (flushiters = 0;; ++flushiters) {
1033 ret =
1034 flush_current_atom(JNODE_FLUSH_WRITE_BLOCKS |
1035 JNODE_FLUSH_COMMIT,
1036 LONG_MAX /* nr_to_write */ ,
1037 nr_submitted, atom, NULL);
1038 if (ret != -E_REPEAT)
1039 break;
1041 		/* if the atom's dirty list contains a znode which is
1042 		   HEARD_BANSHEE and is locked, we have to allow the lock owner to
1043 		   continue and uncapture that znode */
1044 reiser4_preempt_point();
1046 *atom = get_current_atom_locked();
1047 if (flushiters > TOOMANYFLUSHES && IS_POW(flushiters)) {
1048 warning("nikita-3176",
1049 "Flushing like mad: %i", flushiters);
1050 reiser4_info_atom("atom", *atom);
1051 DEBUGON(flushiters > (1 << 20));
1055 if (ret)
1056 return ret;
1058 assert_spin_locked(&((*atom)->alock));
1060 if (!atom_can_be_committed(*atom)) {
1061 spin_unlock_atom(*atom);
1062 return RETERR(-E_REPEAT);
1065 if ((*atom)->capture_count == 0)
1066 goto done;
1068 	/* Up to this point we have been flushing, and after flush is called we
1069 	   return -E_REPEAT. Now we can commit. We cannot return -E_REPEAT
1070 	   at this point; commit should be successful. */
1071 reiser4_atom_set_stage(*atom, ASTAGE_PRE_COMMIT);
1072 ON_DEBUG(((*atom)->committer = current));
1073 spin_unlock_atom(*atom);
1075 ret = current_atom_complete_writes();
1076 if (ret)
1077 return ret;
1079 assert("zam-906", list_empty(ATOM_WB_LIST(*atom)));
1081 /* isolate critical code path which should be executed by only one
1082 * thread using tmgr mutex */
1083 mutex_lock(&sbinfo->tmgr.commit_mutex);
1085 ret = reiser4_write_logs(nr_submitted);
1086 if (ret < 0)
1087 reiser4_panic("zam-597", "write log failed (%ld)\n", ret);
1089 	/* The atom->ovrwr_nodes list is processed with the commit mutex held
1090 	   because of bitmap nodes, which are captured in a special way in
1091 	   reiser4_pre_commit_hook_bitmap(); that path does not include
1092 	   capture_fuse_wait() as the capturing of other nodes does -- the commit
1093 	   mutex is used for transaction isolation instead. */
1094 reiser4_invalidate_list(ATOM_OVRWR_LIST(*atom));
1095 mutex_unlock(&sbinfo->tmgr.commit_mutex);
1097 reiser4_invalidate_list(ATOM_CLEAN_LIST(*atom));
1098 reiser4_invalidate_list(ATOM_WB_LIST(*atom));
1099 assert("zam-927", list_empty(&(*atom)->inodes));
1101 spin_lock_atom(*atom);
1102 done:
1103 reiser4_atom_set_stage(*atom, ASTAGE_DONE);
1104 ON_DEBUG((*atom)->committer = NULL);
1106 /* Atom's state changes, so wake up everybody waiting for this
1107 event. */
1108 wakeup_atom_waiting_list(*atom);
1110 /* Decrement the "until commit" reference, at least one txnh (the caller) is
1111 still open. */
1112 atomic_dec(&(*atom)->refcount);
1114 assert("jmacd-1070", atomic_read(&(*atom)->refcount) > 0);
1115 assert("jmacd-1062", (*atom)->capture_count == 0);
1116 BUG_ON((*atom)->capture_count != 0);
1117 assert_spin_locked(&((*atom)->alock));
1119 return ret;
1122 /* TXN_TXNH */
1125 * force_commit_atom - commit current atom and wait commit completion
1126 * @txnh:
1128  * Commits the current atom and waits for commit completion; the current atom and
1129  * @txnh have to be spinlocked before the call; this function unlocks them on exit.
1131 int force_commit_atom(txn_handle *txnh)
1133 txn_atom *atom;
1135 assert("zam-837", txnh != NULL);
1136 assert_spin_locked(&(txnh->hlock));
1137 assert("nikita-2966", lock_stack_isclean(get_current_lock_stack()));
1139 atom = txnh->atom;
1141 assert("zam-834", atom != NULL);
1142 assert_spin_locked(&(atom->alock));
1145 * Set flags for atom and txnh: forcing atom commit and waiting for
1146 * commit completion
1148 txnh->flags |= TXNH_WAIT_COMMIT;
1149 atom->flags |= ATOM_FORCE_COMMIT;
1151 spin_unlock_txnh(txnh);
1152 spin_unlock_atom(atom);
1154 /* commit is here */
1155 reiser4_txn_restart_current();
1156 return 0;
1159 /* Called to force commit of any outstanding atoms. @commit_all_atoms controls
1160  * whether we commit all atoms, including new ones which are created after this
1161  * function is called. */
1162 int txnmgr_force_commit_all(struct super_block *super, int commit_all_atoms)
1164 int ret;
1165 txn_atom *atom;
1166 txn_mgr *mgr;
1167 txn_handle *txnh;
1168 unsigned long start_time = jiffies;
1169 reiser4_context *ctx = get_current_context();
1171 assert("nikita-2965", lock_stack_isclean(get_current_lock_stack()));
1172 assert("nikita-3058", reiser4_commit_check_locks());
1174 reiser4_txn_restart_current();
1176 mgr = &get_super_private(super)->tmgr;
1178 txnh = ctx->trans;
1180 again:
1182 spin_lock_txnmgr(mgr);
1184 list_for_each_entry(atom, &mgr->atoms_list, atom_link) {
1185 spin_lock_atom(atom);
1187 		/* Commit any atom which can be committed. If @commit_all_atoms
1188 		 * is not set, we commit only atoms which were created before
1189 		 * this call started. */
1190 if (commit_all_atoms
1191 || time_before_eq(atom->start_time, start_time)) {
1192 if (atom->stage <= ASTAGE_POST_COMMIT) {
1193 spin_unlock_txnmgr(mgr);
1195 if (atom->stage < ASTAGE_PRE_COMMIT) {
1196 spin_lock_txnh(txnh);
1197 /* Add force-context txnh */
1198 capture_assign_txnh_nolock(atom, txnh);
1199 ret = force_commit_atom(txnh);
1200 if (ret)
1201 return ret;
1202 } else
1203 /* wait atom commit */
1204 reiser4_atom_wait_event(atom);
1206 goto again;
1210 spin_unlock_atom(atom);
1213 #if REISER4_DEBUG
1214 if (commit_all_atoms) {
1215 reiser4_super_info_data *sbinfo = get_super_private(super);
1216 spin_lock_reiser4_super(sbinfo);
1217 assert("zam-813",
1218 sbinfo->blocks_fake_allocated_unformatted == 0);
1219 assert("zam-812", sbinfo->blocks_fake_allocated == 0);
1220 spin_unlock_reiser4_super(sbinfo);
1222 #endif
1224 spin_unlock_txnmgr(mgr);
1226 return 0;
1229 /* check whether commit_some_atoms() can commit @atom. Locking is up to the
1230 * caller */
1231 static int atom_is_committable(txn_atom * atom)
1233 return
1234 atom->stage < ASTAGE_PRE_COMMIT &&
1235 atom->txnh_count == atom->nr_waiters && atom_should_commit(atom);
1238 /* called periodically from ktxnmgrd to commit old atoms. Releases ktxnmgrd spin
1239 * lock at exit */
1240 int commit_some_atoms(txn_mgr * mgr)
1242 int ret = 0;
1243 txn_atom *atom;
1244 txn_handle *txnh;
1245 reiser4_context *ctx;
1246 struct list_head *pos, *tmp;
1248 ctx = get_current_context();
1249 assert("nikita-2444", ctx != NULL);
1251 txnh = ctx->trans;
1252 spin_lock_txnmgr(mgr);
1255 	 * this is to avoid a gcc complaint that atom might be used
1256 	 * uninitialized
1258 atom = NULL;
1260 /* look for atom to commit */
1261 list_for_each_safe(pos, tmp, &mgr->atoms_list) {
1262 atom = list_entry(pos, txn_atom, atom_link);
1264 * first test without taking atom spin lock, whether it is
1265 * eligible for committing at all
1267 if (atom_is_committable(atom)) {
1268 /* now, take spin lock and re-check */
1269 spin_lock_atom(atom);
1270 if (atom_is_committable(atom))
1271 break;
1272 spin_unlock_atom(atom);
1276 ret = (&mgr->atoms_list == pos);
1277 spin_unlock_txnmgr(mgr);
1279 if (ret) {
1280 /* nothing found */
1281 spin_unlock(&mgr->daemon->guard);
1282 return 0;
1285 spin_lock_txnh(txnh);
1287 BUG_ON(atom == NULL);
1288 /* Set the atom to force committing */
1289 atom->flags |= ATOM_FORCE_COMMIT;
1291 /* Add force-context txnh */
1292 capture_assign_txnh_nolock(atom, txnh);
1294 spin_unlock_txnh(txnh);
1295 spin_unlock_atom(atom);
1297 /* we are about to release daemon spin lock, notify daemon it
1298 has to rescan atoms */
1299 mgr->daemon->rescan = 1;
1300 spin_unlock(&mgr->daemon->guard);
1301 reiser4_txn_restart_current();
1302 return 0;
1305 static int txn_try_to_fuse_small_atom(txn_mgr * tmgr, txn_atom * atom)
1307 int atom_stage;
1308 txn_atom *atom_2;
1309 int repeat;
1311 assert("zam-1051", atom->stage < ASTAGE_PRE_COMMIT);
1313 atom_stage = atom->stage;
1314 repeat = 0;
1316 if (!spin_trylock_txnmgr(tmgr)) {
1317 atomic_inc(&atom->refcount);
1318 spin_unlock_atom(atom);
1319 spin_lock_txnmgr(tmgr);
1320 spin_lock_atom(atom);
1321 repeat = 1;
1322 if (atom->stage != atom_stage) {
1323 spin_unlock_txnmgr(tmgr);
1324 atom_dec_and_unlock(atom);
1325 return -E_REPEAT;
1327 atomic_dec(&atom->refcount);
1330 list_for_each_entry(atom_2, &tmgr->atoms_list, atom_link) {
1331 if (atom == atom_2)
1332 continue;
1334 * if trylock does not succeed we just do not fuse with that
1335 * atom.
1337 if (spin_trylock_atom(atom_2)) {
1338 if (atom_2->stage < ASTAGE_PRE_COMMIT) {
1339 spin_unlock_txnmgr(tmgr);
1340 capture_fuse_into(atom_2, atom);
1341 				/* all locks are lost; we can only repeat here */
1342 return -E_REPEAT;
1344 spin_unlock_atom(atom_2);
1347 atom->flags |= ATOM_CANCEL_FUSION;
1348 spin_unlock_txnmgr(tmgr);
1349 if (repeat) {
1350 spin_unlock_atom(atom);
1351 return -E_REPEAT;
1353 return 0;
1356 /* Calls jnode_flush() for the current atom if it exists; if not, just takes another
1357    atom and calls jnode_flush() for it. If the current transaction handle already
1358    has an assigned atom (the current atom), we have to close the current transaction
1359    before switching to another atom, or do something with the current atom. This
1360    code tries to flush the current atom.
1362    flush_some_atom() is called as part of the memory-clearing process. It is
1363    invoked from balance_dirty_pages(), pdflushd, and entd.
1365    If we can flush no nodes, the atom is committed, because this frees memory.
1367    If the atom is too large or too old, it is committed as well.
1370 flush_some_atom(jnode * start, long *nr_submitted, const struct writeback_control *wbc,
1371 int flags)
1373 reiser4_context *ctx = get_current_context();
1374 txn_mgr *tmgr = &get_super_private(ctx->super)->tmgr;
1375 txn_handle *txnh = ctx->trans;
1376 txn_atom *atom;
1377 int ret;
1379 BUG_ON(wbc->nr_to_write == 0);
1380 BUG_ON(*nr_submitted != 0);
1381 assert("zam-1042", txnh != NULL);
1382 repeat:
1383 if (txnh->atom == NULL) {
1384 /* current atom is not available, take first from txnmgr */
1385 spin_lock_txnmgr(tmgr);
1387 /* traverse the list of all atoms */
1388 list_for_each_entry(atom, &tmgr->atoms_list, atom_link) {
1389 /* lock atom before checking its state */
1390 spin_lock_atom(atom);
1393 			 * we need an atom which is not being committed and
1394 			 * which has no flushers (jnode_flush() adds one flusher
1395 			 * at the beginning and subtracts one at the end).
1397 if (atom->stage < ASTAGE_PRE_COMMIT &&
1398 atom->nr_flushers == 0) {
1399 spin_lock_txnh(txnh);
1400 capture_assign_txnh_nolock(atom, txnh);
1401 spin_unlock_txnh(txnh);
1403 goto found;
1406 spin_unlock_atom(atom);
1410 		 * Write throttling is the case where no atom can be
1411 		 * flushed/committed.
1413 if (!current_is_pdflush() && !wbc->nonblocking) {
1414 list_for_each_entry(atom, &tmgr->atoms_list, atom_link) {
1415 spin_lock_atom(atom);
1416 /* Repeat the check from the above. */
1417 if (atom->stage < ASTAGE_PRE_COMMIT
1418 && atom->nr_flushers == 0) {
1419 spin_lock_txnh(txnh);
1420 capture_assign_txnh_nolock(atom, txnh);
1421 spin_unlock_txnh(txnh);
1423 goto found;
1425 if (atom->stage <= ASTAGE_POST_COMMIT) {
1426 spin_unlock_txnmgr(tmgr);
1428 					 * we just wait until the atom's flusher
1429 					 * makes progress in flushing or
1430 					 * committing the atom
1432 reiser4_atom_wait_event(atom);
1433 goto repeat;
1435 spin_unlock_atom(atom);
1438 spin_unlock_txnmgr(tmgr);
1439 return 0;
1440 found:
1441 spin_unlock_txnmgr(tmgr);
1442 } else
1443 atom = get_current_atom_locked();
1445 BUG_ON(atom->super != ctx->super);
1446 assert("vs-35", atom->super == ctx->super);
1447 if (start) {
1448 spin_lock_jnode(start);
1449 ret = (atom == start->atom) ? 1 : 0;
1450 spin_unlock_jnode(start);
1451 if (ret == 0)
1452 start = NULL;
1454 ret = flush_current_atom(flags, wbc->nr_to_write, nr_submitted, &atom, start);
1455 if (ret == 0) {
1456 		/* flush_current_atom() returns 0 only if it submitted nothing
1457 		   for write */
1458 BUG_ON(*nr_submitted != 0);
1459 if (*nr_submitted == 0 || atom_should_commit_asap(atom)) {
1460 if (atom->capture_count < tmgr->atom_min_size &&
1461 !(atom->flags & ATOM_CANCEL_FUSION)) {
1462 ret = txn_try_to_fuse_small_atom(tmgr, atom);
1463 if (ret == -E_REPEAT) {
1464 reiser4_preempt_point();
1465 goto repeat;
1468 		/* if early flushing could not make more nodes clean,
1469 		 * or the atom is too old/large,
1470 		 * we force the current atom to commit */
1471 		/* wait for commit completion, but only if this
1472 		 * wouldn't stall pdflushd and the ent thread. */
1473 if (!wbc->nonblocking && !ctx->entd)
1474 txnh->flags |= TXNH_WAIT_COMMIT;
1475 atom->flags |= ATOM_FORCE_COMMIT;
1477 spin_unlock_atom(atom);
1478 } else if (ret == -E_REPEAT) {
1479 if (*nr_submitted == 0) {
1480 			/* let others who hamper flushing (hold long-term locks,
1481 			   for instance) free the way for flush */
1482 reiser4_preempt_point();
1483 goto repeat;
1485 ret = 0;
1488 if (*nr_submitted > wbc->nr_to_write)
1489 warning("", "asked for %ld, written %ld\n", wbc->nr_to_write, *nr_submitted);
1491 reiser4_txn_restart(ctx);
1493 return ret;
1496 /* Remove processed nodes from the atom's clean list (thereby removing them from the transaction). */
1497 void reiser4_invalidate_list(struct list_head *head)
1499 while (!list_empty(head)) {
1500 jnode *node;
1502 node = list_entry(head->next, jnode, capture_link);
1503 spin_lock_jnode(node);
1504 reiser4_uncapture_block(node);
1505 jput(node);
1509 static void init_wlinks(txn_wait_links * wlinks)
1511 wlinks->_lock_stack = get_current_lock_stack();
1512 INIT_LIST_HEAD(&wlinks->_fwaitfor_link);
1513 INIT_LIST_HEAD(&wlinks->_fwaiting_link);
1514 wlinks->waitfor_cb = NULL;
1515 wlinks->waiting_cb = NULL;
1518 /* Add ourselves to the atom's waitfor list and wait for somebody to wake us up */
1519 void reiser4_atom_wait_event(txn_atom * atom)
1521 txn_wait_links _wlinks;
1523 assert_spin_locked(&(atom->alock));
1524 assert("nikita-3156",
1525 lock_stack_isclean(get_current_lock_stack()) ||
1526 atom->nr_running_queues > 0);
1528 init_wlinks(&_wlinks);
1529 list_add_tail(&_wlinks._fwaitfor_link, &atom->fwaitfor_list);
1530 atomic_inc(&atom->refcount);
1531 spin_unlock_atom(atom);
1533 reiser4_prepare_to_sleep(_wlinks._lock_stack);
1534 reiser4_go_to_sleep(_wlinks._lock_stack);
1536 spin_lock_atom(atom);
1537 list_del(&_wlinks._fwaitfor_link);
1538 atom_dec_and_unlock(atom);
1541 void reiser4_atom_set_stage(txn_atom * atom, txn_stage stage)
1543 assert("nikita-3535", atom != NULL);
1544 assert_spin_locked(&(atom->alock));
1545 assert("nikita-3536", stage <= ASTAGE_INVALID);
1546 /* Excelsior! */
1547 assert("nikita-3537", stage >= atom->stage);
1548 if (atom->stage != stage) {
1549 atom->stage = stage;
1550 reiser4_atom_send_event(atom);
1554 /* wake all threads which wait for an event */
1555 void reiser4_atom_send_event(txn_atom * atom)
1557 assert_spin_locked(&(atom->alock));
1558 wakeup_atom_waitfor_list(atom);
1561 /* Informs the txn manager code that the owner of this txn_handle should wait for atom
1562    commit completion (for example, because it is doing fsync(2)) */
1563 static int should_wait_commit(txn_handle * h)
1565 return h->flags & TXNH_WAIT_COMMIT;
1568 typedef struct commit_data {
1569 txn_atom *atom;
1570 txn_handle *txnh;
1571 long nr_written;
1572 	/* as an optimization we start committing the atom by first trying to
1573 	 * flush it a few times without switching into ASTAGE_CAPTURE_WAIT. This
1574 	 * reduces stalls due to other threads waiting for an atom in the
1575 	 * ASTAGE_CAPTURE_WAIT stage. ->preflush is a counter of these
1576 	 * preliminary flushes. */
1577 int preflush;
1578 /* have we waited on atom. */
1579 int wait;
1580 int failed;
1581 int wake_ktxnmgrd_up;
1582 } commit_data;
1585 * Called from commit_txnh() repeatedly, until either error happens, or atom
1586 * commits successfully.
1588 static int try_commit_txnh(commit_data * cd)
1590 int result;
1592 assert("nikita-2968", lock_stack_isclean(get_current_lock_stack()));
1594 /* Get the atom and txnh locked. */
1595 cd->atom = txnh_get_atom(cd->txnh);
1596 assert("jmacd-309", cd->atom != NULL);
1597 spin_unlock_txnh(cd->txnh);
1599 if (cd->wait) {
1600 cd->atom->nr_waiters--;
1601 cd->wait = 0;
1604 if (cd->atom->stage == ASTAGE_DONE)
1605 return 0;
1607 if (cd->failed)
1608 return 0;
1610 if (atom_should_commit(cd->atom)) {
1611 /* if atom is _very_ large schedule it for commit as soon as
1612 * possible. */
1613 if (atom_should_commit_asap(cd->atom)) {
1615 			 * When the atom is in PRE_COMMIT or a later stage, the
1616 			 * following invariant (encoded in atom_can_be_committed())
1617 			 * holds: there is exactly one non-waiter transaction
1618 			 * handle opened on this atom. When a thread wants to
1619 			 * wait until the atom commits (for example sync()) it
1620 			 * waits on the atom event after increasing
1621 			 * atom->nr_waiters (see below in this function). It
1622 			 * cannot be guaranteed that the atom has already committed
1623 			 * after receiving the event, so the loop has to be
1624 			 * restarted. But if the atom switched into the PRE_COMMIT
1625 			 * stage and became too large, we cannot change its
1626 			 * state back to CAPTURE_WAIT (the atom stage can only
1627 			 * increase monotonically), hence this check.
1629 if (cd->atom->stage < ASTAGE_CAPTURE_WAIT)
1630 reiser4_atom_set_stage(cd->atom,
1631 ASTAGE_CAPTURE_WAIT);
1632 cd->atom->flags |= ATOM_FORCE_COMMIT;
1634 if (cd->txnh->flags & TXNH_DONT_COMMIT) {
1636 			 * this thread (transaction handle, that is) doesn't
1637 			 * want to commit the atom. Notify waiters that the handle
1638 			 * is closed. This can happen, for example, when we are
1639 			 * under a VFS directory lock and don't want to commit the
1640 			 * atom right now to avoid stalling other threads
1641 			 * working in the same directory.
1644 /* Wake the ktxnmgrd up if the ktxnmgrd is needed to
1645 * commit this atom: no atom waiters and only one
1646 * (our) open transaction handle. */
1647 cd->wake_ktxnmgrd_up =
1648 cd->atom->txnh_count == 1 &&
1649 cd->atom->nr_waiters == 0;
1650 reiser4_atom_send_event(cd->atom);
1651 result = 0;
1652 } else if (!atom_can_be_committed(cd->atom)) {
1653 if (should_wait_commit(cd->txnh)) {
1654 /* sync(): wait for commit */
1655 cd->atom->nr_waiters++;
1656 cd->wait = 1;
1657 reiser4_atom_wait_event(cd->atom);
1658 result = RETERR(-E_REPEAT);
1659 } else {
1660 result = 0;
1662 } else if (cd->preflush > 0 && !is_current_ktxnmgrd()) {
1664 * optimization: flush atom without switching it into
1665 * ASTAGE_CAPTURE_WAIT.
1667 * But don't do this for ktxnmgrd, because ktxnmgrd
1668 * should never block on atom fusion.
1670 result = flush_current_atom(JNODE_FLUSH_WRITE_BLOCKS,
1671 LONG_MAX, &cd->nr_written,
1672 &cd->atom, NULL);
1673 if (result == 0) {
1674 spin_unlock_atom(cd->atom);
1675 cd->preflush = 0;
1676 result = RETERR(-E_REPEAT);
1677 			} else	/* Atom wasn't flushed
1678 				 * completely. Rinse. Repeat. */
1679 --cd->preflush;
1680 } else {
1681 			/* We change the atom state to ASTAGE_CAPTURE_WAIT to
1682 			   prevent atom fusion and count ourselves as an active
1683 			   flusher */
1684 reiser4_atom_set_stage(cd->atom, ASTAGE_CAPTURE_WAIT);
1685 cd->atom->flags |= ATOM_FORCE_COMMIT;
1687 result =
1688 commit_current_atom(&cd->nr_written, &cd->atom);
1689 if (result != 0 && result != -E_REPEAT)
1690 cd->failed = 1;
1692 } else
1693 result = 0;
1695 #if REISER4_DEBUG
1696 if (result == 0)
1697 assert_spin_locked(&(cd->atom->alock));
1698 #endif
1700 /* perfectly valid assertion, except that when atom/txnh is not locked
1701 * fusion can take place, and cd->atom points nowhere. */
1703 assert("jmacd-1028", ergo(result != 0, spin_atom_is_not_locked(cd->atom)));
1705 return result;
1708 /* Called to commit a transaction handle. This decrements the atom's number of open
1709    handles, and if it is the last handle to commit and the atom should commit, initiates
1710    atom commit. If commit does not fail, returns the number of written blocks */
1711 static int commit_txnh(txn_handle * txnh)
1713 commit_data cd;
1714 assert("umka-192", txnh != NULL);
1716 memset(&cd, 0, sizeof cd);
1717 cd.txnh = txnh;
1718 cd.preflush = 10;
1720 /* calls try_commit_txnh() until either atom commits, or error
1721 * happens */
1722 while (try_commit_txnh(&cd) != 0)
1723 reiser4_preempt_point();
1725 spin_lock_txnh(txnh);
1727 cd.atom->txnh_count -= 1;
1728 txnh->atom = NULL;
1729 /* remove transaction handle from atom's list of transaction handles */
1730 list_del_init(&txnh->txnh_link);
1732 spin_unlock_txnh(txnh);
1733 atom_dec_and_unlock(cd.atom);
1734 	/* if we don't want to do the commit in the current thread (TXNH_DONT_COMMIT
1735 	 * is set, probably because it takes time), we have that work done
1736 	 * asynchronously by the ktxnmgrd daemon. */
1737 if (cd.wake_ktxnmgrd_up)
1738 ktxnmgrd_kick(&get_current_super_private()->tmgr);
1740 return 0;
1743 /* TRY_CAPTURE */
1745 /* This routine attempts a single block-capture request. It may return -E_REPEAT if some
1746 condition indicates that the request should be retried, and it may block if the
1747 txn_capture mode does not include the TXN_CAPTURE_NONBLOCKING request flag.
1749 This routine encodes the basic logic of block capturing described by:
1751 http://namesys.com/v4/v4.html
1753 Our goal here is to ensure that any two blocks that contain dependent modifications
1754 should commit at the same time. This function enforces this discipline by initiating
1755 fusion whenever a transaction handle belonging to one atom requests to read or write a
1756 block belonging to another atom (TXN_CAPTURE_WRITE or TXN_CAPTURE_READ_ATOMIC).
1758 In addition, this routine handles the initial assignment of atoms to blocks and
1759 transaction handles. These are possible outcomes of this function:
1761 1. The block and handle are already part of the same atom: return immediate success
1763 2. The block is assigned but the handle is not: call capture_assign_txnh to assign
1764 the handle to the block's atom.
1766 3. The handle is assigned but the block is not: call capture_assign_block to assign
1767 the block to the handle's atom.
1769 4. Both handle and block are assigned, but to different atoms: call capture_init_fusion
1770 to fuse atoms.
1772 5. Neither block nor handle are assigned: create a new atom and assign them both.
1774 6. A read request for a non-captured block: return immediate success.
1776 This function acquires and releases the handle's spinlock. It is called
1777 under the jnode lock, and if the return value is 0 it returns with the jnode lock still
1778 held. If the return is -E_REPEAT or some other error condition, the jnode lock is
1779 released. The external interface (reiser4_try_capture) manages re-acquiring the jnode
1780 lock in the failure case.
1782 static int try_capture_block(
1783 txn_handle * txnh, jnode * node, txn_capture mode,
1784 txn_atom ** atom_alloc)
1786 txn_atom *block_atom;
1787 txn_atom *txnh_atom;
1789 /* Should not call capture for READ_NONCOM requests, handled in reiser4_try_capture. */
1790 assert("jmacd-567", CAPTURE_TYPE(mode) != TXN_CAPTURE_READ_NONCOM);
1792 /* FIXME-ZAM-HANS: FIXME_LATER_JMACD Should assert that atom->tree ==
1793 * node->tree somewhere. */
1794 assert("umka-194", txnh != NULL);
1795 assert("umka-195", node != NULL);
1797 /* The jnode is already locked! Being called from reiser4_try_capture(). */
1798 assert_spin_locked(&(node->guard));
1799 block_atom = node->atom;
1801 /* Get txnh spinlock, this allows us to compare txn_atom pointers but it doesn't
1802 let us touch the atoms themselves. */
1803 spin_lock_txnh(txnh);
1804 txnh_atom = txnh->atom;
1805 /* The capture process continues into one of four branches depending on
1806 which of the two atoms (the block's atom (node->atom) and the handle's
1807 atom (txnh->atom)) exist. */
1808 if (txnh_atom == NULL) {
1809 if (block_atom == NULL) {
1810 spin_unlock_txnh(txnh);
1811 spin_unlock_jnode(node);
1812 /* assign empty atom to the txnh and repeat */
1813 return atom_begin_and_assign_to_txnh(atom_alloc, txnh);
1814 } else {
1815 atomic_inc(&block_atom->refcount);
1816 /* node spin-lock isn't needed anymore */
1817 spin_unlock_jnode(node);
1818 if (!spin_trylock_atom(block_atom)) {
1819 spin_unlock_txnh(txnh);
1820 spin_lock_atom(block_atom);
1821 spin_lock_txnh(txnh);
1823 /* re-check state after getting txnh and the node
1824 * atom spin-locked */
1825 if (node->atom != block_atom || txnh->atom != NULL) {
1826 spin_unlock_txnh(txnh);
1827 atom_dec_and_unlock(block_atom);
1828 return RETERR(-E_REPEAT);
1830 atomic_dec(&block_atom->refcount);
1831 if (block_atom->stage > ASTAGE_CAPTURE_WAIT ||
1832 (block_atom->stage == ASTAGE_CAPTURE_WAIT &&
1833 block_atom->txnh_count != 0))
1834 return capture_fuse_wait(txnh, block_atom, NULL, mode);
1835 capture_assign_txnh_nolock(block_atom, txnh);
1836 spin_unlock_txnh(txnh);
1837 spin_unlock_atom(block_atom);
1838 return RETERR(-E_REPEAT);
1840 } else {
1841 /* It is time to perform a deadlock prevention check over the
1842 node we want to capture. It is possible this node was locked
1843 for read without being captured. The optimization which
1844 allows this helps to keep atoms independent as long as
1845 possible, but it may cause lock/fuse deadlock problems.
1847 A number of similar deadlock situations with locked but not
1848 captured nodes were found. In each situation there are two
1849 or more threads: one of them does flushing while another one
1850 does routine balancing or tree lookup. The flushing thread
1851 (F) sleeps in a long term locking request for node (N), while
1852 another thread (A) sleeps trying to capture some node already
1853 belonging to the atom of F, and F's atom is in a state which
1854 prevents immediate fusion.
1856 Deadlocks of this kind cannot happen if node N was properly
1857 captured by thread A. The F thread fuses atoms before locking,
1858 therefore the current atom of thread F and the current atom of
1859 thread A become the same atom and thread A may proceed. This
1860 does not work if node N was not captured, because the fusion
1861 of atoms does not happen.
1863 The following scheme solves the deadlock: if
1864 longterm_lock_znode() locks and does not capture a znode, that
1865 znode is marked as MISSED_IN_CAPTURE. A node marked this way
1866 is processed by the code below, which restores the missed
1867 capture and fuses the current atoms of all the node's lock
1868 owners by calling the fuse_not_fused_lock_owners() function. */
1869 if (JF_ISSET(node, JNODE_MISSED_IN_CAPTURE)) {
1870 JF_CLR(node, JNODE_MISSED_IN_CAPTURE);
1871 if (jnode_is_znode(node) && znode_is_locked(JZNODE(node))) {
1872 spin_unlock_txnh(txnh);
1873 spin_unlock_jnode(node);
1874 fuse_not_fused_lock_owners(txnh, JZNODE(node));
1875 return RETERR(-E_REPEAT);
1878 if (block_atom == NULL) {
1879 atomic_inc(&txnh_atom->refcount);
1880 spin_unlock_txnh(txnh);
1881 if (!spin_trylock_atom(txnh_atom)) {
1882 spin_unlock_jnode(node);
1883 spin_lock_atom(txnh_atom);
1884 spin_lock_jnode(node);
1886 if (txnh->atom != txnh_atom || node->atom != NULL
1887 || JF_ISSET(node, JNODE_IS_DYING)) {
1888 spin_unlock_jnode(node);
1889 atom_dec_and_unlock(txnh_atom);
1890 return RETERR(-E_REPEAT);
1892 atomic_dec(&txnh_atom->refcount);
1893 capture_assign_block_nolock(txnh_atom, node);
1894 spin_unlock_atom(txnh_atom);
1895 } else {
1896 if (txnh_atom != block_atom) {
1897 if (mode & TXN_CAPTURE_DONT_FUSE) {
1898 spin_unlock_txnh(txnh);
1899 spin_unlock_jnode(node);
1900 /* we are in a "no-fusion" mode and @node is
1901 * already part of a different transaction. */
1902 return RETERR(-E_NO_NEIGHBOR);
1904 return capture_init_fusion(node, txnh, mode);
1906 spin_unlock_txnh(txnh);
1909 return 0;
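/* A minimal illustrative sketch (under #if 0, never compiled) of the branch
 * structure of try_capture_block() above, reduced to a decision table over
 * the two atom pointers. The enum and classify_capture() are hypothetical
 * and exist only for exposition; the "already the same atom" case is in
 * fact short-circuited by reiser4_try_capture() below, and a read of a
 * non-captured block never reaches this function because
 * build_capture_mode() returns 0 for it. */
#if 0
enum capture_case {
	CAPTURE_NOTHING_TO_DO,	/* block and handle share an atom */
	CAPTURE_ASSIGN_TXNH,	/* block captured, handle is not */
	CAPTURE_ASSIGN_BLOCK,	/* handle has an atom, block does not */
	CAPTURE_FUSE,		/* two different atoms must be fused */
	CAPTURE_NEW_ATOM	/* neither side has an atom yet */
};

static enum capture_case classify_capture(txn_atom *txnh_atom,
					  txn_atom *block_atom)
{
	if (txnh_atom == NULL)
		return block_atom == NULL ?
			CAPTURE_NEW_ATOM : CAPTURE_ASSIGN_TXNH;
	if (block_atom == NULL)
		return CAPTURE_ASSIGN_BLOCK;
	return txnh_atom == block_atom ?
		CAPTURE_NOTHING_TO_DO : CAPTURE_FUSE;
}
#endif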
1912 static txn_capture
1913 build_capture_mode(jnode * node, znode_lock_mode lock_mode, txn_capture flags)
1915 txn_capture cap_mode;
1917 assert_spin_locked(&(node->guard));
1919 /* FIXME_JMACD No way to set TXN_CAPTURE_READ_MODIFY yet. */
1921 if (lock_mode == ZNODE_WRITE_LOCK) {
1922 cap_mode = TXN_CAPTURE_WRITE;
1923 } else if (node->atom != NULL) {
1924 cap_mode = TXN_CAPTURE_WRITE;
1925 } else if (0 && /* txnh->mode == TXN_READ_FUSING && */
1926 jnode_get_level(node) == LEAF_LEVEL) {
1927 /* NOTE-NIKITA TXN_READ_FUSING is not currently used */
1928 /* We only need a READ_FUSING capture at the leaf level. This
1929 is because the internal levels of the tree (twigs included)
1930 are redundant from the point of view of the user that asked
1931 for a read-fusing transcrash. The user only wants to read-fuse
1932 atoms due to reading uncommitted data that another user has
1933 written. It is the file system that reads/writes the
1934 internal tree levels; the user only reads/writes leaves. */
1935 cap_mode = TXN_CAPTURE_READ_ATOMIC;
1936 } else {
1937 /* In this case (read lock at a non-leaf) there's no reason to
1938 * capture. */
1939 /* cap_mode = TXN_CAPTURE_READ_NONCOM; */
1940 return 0;
1943 cap_mode |= (flags & (TXN_CAPTURE_NONBLOCKING | TXN_CAPTURE_DONT_FUSE));
1944 assert("nikita-3186", cap_mode != 0);
1945 return cap_mode;
1948 /* This is the external interface to try_capture_block(); it calls
1949 try_capture_block() repeatedly as long as -E_REPEAT is returned.
1951 @node: node to capture,
1952 @lock_mode: read or write lock is used in capture mode calculation,
1953 @flags: see txn_capture flags enumeration,
1956 @return: 0 - node was successfully captured, -E_REPEAT - capture request
1957 cannot be processed immediately as it was requested in flags,
1958 < 0 - other errors.
1960 int reiser4_try_capture(jnode *node, znode_lock_mode lock_mode,
1961 txn_capture flags)
1963 txn_atom *atom_alloc = NULL;
1964 txn_capture cap_mode;
1965 txn_handle *txnh = get_current_context()->trans;
1966 int ret;
1968 assert_spin_locked(&(node->guard));
1970 repeat:
1971 if (JF_ISSET(node, JNODE_IS_DYING))
1972 return RETERR(-EINVAL);
1973 if (node->atom != NULL && txnh->atom == node->atom)
1974 return 0;
1975 cap_mode = build_capture_mode(node, lock_mode, flags);
1976 if (cap_mode == 0 ||
1977 (!(cap_mode & TXN_CAPTURE_WTYPES) && node->atom == NULL)) {
1978 /* Mark this node as "MISSED". It helps in further deadlock
1979 * analysis */
1980 if (jnode_is_znode(node))
1981 JF_SET(node, JNODE_MISSED_IN_CAPTURE);
1982 return 0;
1984 /* Repeat try_capture as long as -E_REPEAT is returned. */
1985 ret = try_capture_block(txnh, node, cap_mode, &atom_alloc);
1986 /* Regardless of non_blocking:
1988 If ret == 0 then jnode is still locked.
1989 If ret != 0 then jnode is unlocked.
1991 #if REISER4_DEBUG
1992 if (ret == 0)
1993 assert_spin_locked(&(node->guard));
1994 else
1995 assert_spin_not_locked(&(node->guard));
1996 #endif
1997 assert_spin_not_locked(&(txnh->guard));
1999 if (ret == -E_REPEAT) {
2000 /* E_REPEAT implies all locks were released, therefore we need
2001 to take the jnode's lock again. */
2002 spin_lock_jnode(node);
2004 /* Although this may appear to be a busy loop, it is not.
2005 There are several conditions that cause E_REPEAT to be
2006 returned by the call to try_capture_block, all cases
2007 indicating some kind of state change that means you should
2008 retry the request and will get a different result. In some
2009 cases this could be avoided with some extra code, but
2010 generally it is done because the necessary locks were
2011 released as a result of the operation and repeating is the
2012 simplest thing to do (less bug potential). The cases are:
2013 atom fusion returns E_REPEAT after it completes (jnode and
2014 txnh were unlocked); race conditions in assign_block,
2015 assign_txnh, and init_fusion return E_REPEAT (trylock
2016 failure); after going to sleep in capture_fuse_wait
2017 (request was blocked but may now succeed). I'm not quite
2018 sure how capture_copy works yet, but it may also return
2019 E_REPEAT. When the request is legitimately blocked, the
2020 requestor goes to sleep in fuse_wait, so this is not a busy
2021 loop. */
2022 /* NOTE-NIKITA: still don't understand:
2024 try_capture_block->capture_assign_txnh->spin_trylock_atom->E_REPEAT
2026 looks like busy loop?
2028 goto repeat;
2031 /* free extra atom object that was possibly allocated by
2032 try_capture_block().
2034 Do this before acquiring jnode spin lock to
2035 minimize time spent under lock. --nikita */
2036 if (atom_alloc != NULL) {
2037 kmem_cache_free(_atom_slab, atom_alloc);
2040 if (ret != 0) {
2041 if (ret == -E_BLOCK) {
2042 assert("nikita-3360",
2043 cap_mode & TXN_CAPTURE_NONBLOCKING);
2044 ret = -E_REPEAT;
2047 /* Failure means jnode is not locked. FIXME_LATER_JMACD May
2048 want to fix the above code to avoid releasing the lock and
2049 re-acquiring it, but there are cases where failure occurs
2050 when the lock is not held, and those cases would need to be
2051 modified to re-take the lock. */
2052 spin_lock_jnode(node);
2055 /* Jnode is still locked. */
2056 assert_spin_locked(&(node->guard));
2057 return ret;
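/* A minimal illustrative sketch (under #if 0, never compiled) of how a
 * caller might use the TXN_CAPTURE_NONBLOCKING flag. With that flag a
 * blocked request comes back as -E_BLOCK, which the code above converts to
 * -E_REPEAT, so the caller only ever sees 0, -E_REPEAT, or a hard error,
 * and the jnode spin lock is held on return in every case.
 * try_capture_nowait() is a hypothetical wrapper. */
#if 0
static int try_capture_nowait(jnode *node, znode_lock_mode mode)
{
	int ret;

	spin_lock_jnode(node);
	ret = reiser4_try_capture(node, mode, TXN_CAPTURE_NONBLOCKING);
	/* 0: captured; -E_REPEAT: would have blocked or state changed;
	 * anything else: a real error. The jnode lock is still held. */
	spin_unlock_jnode(node);
	return ret;
}
#endif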
2060 static void release_two_atoms(txn_atom *one, txn_atom *two)
2062 spin_unlock_atom(one);
2063 atom_dec_and_unlock(two);
2064 spin_lock_atom(one);
2065 atom_dec_and_unlock(one);
2068 /* This function sets up a call to try_capture_block and repeats as long as -E_REPEAT is
2069 returned by that routine. The txn_capture request mode is computed here depending on
2070 the transaction handle's type and the lock request. This is called from the depths of
2071 the lock manager with the jnode lock held and it always returns with the jnode lock
2072 held.
2075 /* fuse all 'active' atoms of lock owners of given node. */
2076 static void fuse_not_fused_lock_owners(txn_handle * txnh, znode * node)
2078 lock_handle *lh;
2079 int repeat;
2080 txn_atom *atomh, *atomf;
2081 reiser4_context *me = get_current_context();
2082 reiser4_context *ctx = NULL;
2084 assert_spin_not_locked(&(ZJNODE(node)->guard));
2085 assert_spin_not_locked(&(txnh->hlock));
2087 repeat:
2088 repeat = 0;
2089 atomh = txnh_get_atom(txnh);
2090 spin_unlock_txnh(txnh);
2091 assert("zam-692", atomh != NULL);
2093 spin_lock_zlock(&node->lock);
2094 /* inspect list of lock owners */
2095 list_for_each_entry(lh, &node->lock.owners, owners_link) {
2096 ctx = get_context_by_lock_stack(lh->owner);
2097 if (ctx == me)
2098 continue;
2099 /* below we use two assumptions to avoid additional spin-locks
2100 in checking the condition:
2102 1) if the lock stack holds a lock, the transaction must be
2103 open, i.e. ctx->trans != NULL;
2105 2) reading the well-aligned ctx->trans->atom pointer is atomic; if it
2106 equals the address of the spin-locked atomh, we take it that
2107 the atoms are the same and nothing has to be captured. */
2108 if (atomh != ctx->trans->atom) {
2109 reiser4_wake_up(lh->owner);
2110 repeat = 1;
2111 break;
2114 if (repeat) {
2115 if (!spin_trylock_txnh(ctx->trans)) {
2116 spin_unlock_zlock(&node->lock);
2117 spin_unlock_atom(atomh);
2118 goto repeat;
2120 atomf = ctx->trans->atom;
2121 if (atomf == NULL) {
2122 capture_assign_txnh_nolock(atomh, ctx->trans);
2123 /* release the zlock _after_ assigning the atom to the
2124 * transaction handle, otherwise the lock owner thread
2125 * may unlock all znodes, exit the kernel context, and here
2126 * we would access an invalid transaction handle. */
2127 spin_unlock_zlock(&node->lock);
2128 spin_unlock_atom(atomh);
2129 spin_unlock_txnh(ctx->trans);
2130 goto repeat;
2132 assert("zam-1059", atomf != atomh);
2133 spin_unlock_zlock(&node->lock);
2134 atomic_inc(&atomh->refcount);
2135 atomic_inc(&atomf->refcount);
2136 spin_unlock_txnh(ctx->trans);
2137 if (atomf > atomh) {
2138 spin_lock_atom_nested(atomf);
2139 } else {
2140 spin_unlock_atom(atomh);
2141 spin_lock_atom(atomf);
2142 spin_lock_atom_nested(atomh);
2144 if (atomh == atomf || !atom_isopen(atomh) || !atom_isopen(atomf)) {
2145 release_two_atoms(atomf, atomh);
2146 goto repeat;
2148 atomic_dec(&atomh->refcount);
2149 atomic_dec(&atomf->refcount);
2150 capture_fuse_into(atomf, atomh);
2151 goto repeat;
2153 spin_unlock_zlock(&node->lock);
2154 spin_unlock_atom(atomh);
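/* A minimal illustrative sketch (under #if 0, never compiled) of the
 * back-off pattern used above: while other locks are held, an additional
 * lock may only be try-locked, and a trylock failure means "drop everything
 * and redo the whole computation from the top". lock_inner_or_restart() is
 * a hypothetical name; the real code recomputes its state (lock owners,
 * atoms) after every restart rather than spinning blindly. */
#if 0
static void lock_inner_or_restart(txn_atom *outer, txn_handle *inner)
{
again:
	spin_lock_atom(outer);
	if (!spin_trylock_txnh(inner)) {
		/* cannot wait for the inner lock while holding the outer
		 * one; back off completely and start over */
		spin_unlock_atom(outer);
		goto again;
	}
	/* ... both locks held, do the work ... */
	spin_unlock_txnh(inner);
	spin_unlock_atom(outer);
}
#endif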
2157 /* This is the interface to capture unformatted nodes via their struct page
2158 reference. Currently it is only used in reiser4_invalidatepage */
2159 int try_capture_page_to_invalidate(struct page *pg)
2161 int ret;
2162 jnode *node;
2164 assert("umka-292", pg != NULL);
2165 assert("nikita-2597", PageLocked(pg));
2167 if (IS_ERR(node = jnode_of_page(pg))) {
2168 return PTR_ERR(node);
2171 spin_lock_jnode(node);
2172 unlock_page(pg);
2174 ret = reiser4_try_capture(node, ZNODE_WRITE_LOCK, 0);
2175 spin_unlock_jnode(node);
2176 jput(node);
2177 lock_page(pg);
2178 return ret;
2181 /* This informs the transaction manager when a node is deleted. Add the block to the
2182 atom's delete set and uncapture the block.
2184 VS-FIXME-HANS: this E_REPEAT paradigm clutters the code and creates a need for
2185 explanations. find all the functions that use it, and unless there is some very
2186 good reason to use it (I have not noticed one so far and I doubt it exists, but maybe somewhere somehow....),
2187 move the loop to inside the function.
2189 VS-FIXME-HANS: can this code be at all streamlined? In particular, can you lock and unlock the jnode fewer times?
2191 void reiser4_uncapture_page(struct page *pg)
2193 jnode *node;
2194 txn_atom *atom;
2196 assert("umka-199", pg != NULL);
2197 assert("nikita-3155", PageLocked(pg));
2199 clear_page_dirty_for_io(pg);
2201 reiser4_wait_page_writeback(pg);
2203 node = jprivate(pg);
2204 BUG_ON(node == NULL);
2206 spin_lock_jnode(node);
2208 atom = jnode_get_atom(node);
2209 if (atom == NULL) {
2210 assert("jmacd-7111", !JF_ISSET(node, JNODE_DIRTY));
2211 spin_unlock_jnode(node);
2212 return;
2215 /* We can remove a jnode from the transaction even if it is on a flush
2216 * queue's prepped list; we only need to be sure that the flush queue is
2217 * not being written by reiser4_write_fq(). reiser4_write_fq() does not
2218 * use the atom spin lock for protection of the prepped nodes list;
2219 * instead it increments the atom's nr_running_queues counter for the
2220 * time when the prepped list is not protected by the spin lock. Here we
2221 * check this counter if we want to remove the jnode from a flush queue
2222 * and, if the counter is not zero, wait for all reiser4_write_fq() calls
2223 * for this atom to complete. This is not a significant overhead. */
2224 while (JF_ISSET(node, JNODE_FLUSH_QUEUED) && atom->nr_running_queues) {
2225 spin_unlock_jnode(node);
2227 * at this moment we want to wait for "atom event", viz. wait
2228 * until @node can be removed from flush queue. But
2229 * reiser4_atom_wait_event() cannot be called with page locked,
2230 * because it deadlocks with jnode_extent_write(). Unlock page,
2231 * after making sure (through page_cache_get()) that it cannot
2232 * be released from memory.
2234 page_cache_get(pg);
2235 unlock_page(pg);
2236 reiser4_atom_wait_event(atom);
2237 lock_page(pg);
2239 * the page may have been detached by ->writepage()->releasepage().
2241 reiser4_wait_page_writeback(pg);
2242 spin_lock_jnode(node);
2243 page_cache_release(pg);
2244 atom = jnode_get_atom(node);
2245 /* VS-FIXME-HANS: improve the commenting in this function */
2246 if (atom == NULL) {
2247 spin_unlock_jnode(node);
2248 return;
2251 reiser4_uncapture_block(node);
2252 spin_unlock_atom(atom);
2253 jput(node);
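/* A minimal illustrative sketch (under #if 0, never compiled) of the wait
 * loop above in its generic form: the atom event cannot be waited for with
 * the page locked, so the page is pinned, unlocked, and re-locked after the
 * wait. wait_with_page_unlocked() is a hypothetical name, and the sketch
 * omits the writeback wait and the jnode re-locking done by the real
 * loop. */
#if 0
static void wait_with_page_unlocked(struct page *pg, txn_atom *atom)
{
	/* keep the page in memory while its lock is dropped */
	page_cache_get(pg);
	unlock_page(pg);
	reiser4_atom_wait_event(atom);
	lock_page(pg);
	page_cache_release(pg);
}
#endif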
2256 /* this is used in extent's kill hook to uncapture and unhash jnodes attached to
2257 * inode's tree of jnodes */
2258 void reiser4_uncapture_jnode(jnode * node)
2260 txn_atom *atom;
2262 assert_spin_locked(&(node->guard));
2263 assert("", node->pg == 0);
2265 atom = jnode_get_atom(node);
2266 if (atom == NULL) {
2267 assert("jmacd-7111", !JF_ISSET(node, JNODE_DIRTY));
2268 spin_unlock_jnode(node);
2269 return;
2272 reiser4_uncapture_block(node);
2273 spin_unlock_atom(atom);
2274 jput(node);
2277 /* No-locking version of assign_txnh. Sets the transaction handle's atom pointer,
2278 increases atom refcount and txnh_count, adds to txnh_list. */
2279 static void capture_assign_txnh_nolock(txn_atom *atom, txn_handle *txnh)
2281 assert("umka-200", atom != NULL);
2282 assert("umka-201", txnh != NULL);
2284 assert_spin_locked(&(txnh->hlock));
2285 assert_spin_locked(&(atom->alock));
2286 assert("jmacd-824", txnh->atom == NULL);
2287 assert("nikita-3540", atom_isopen(atom));
2288 BUG_ON(txnh->atom != NULL);
2290 atomic_inc(&atom->refcount);
2291 txnh->atom = atom;
2292 reiser4_ctx_gfp_mask_set();
2293 list_add_tail(&txnh->txnh_link, &atom->txnh_list);
2294 atom->txnh_count += 1;
2297 /* No-locking version of assign_block. Sets the block's atom pointer, references the
2298 block, adds it to the clean or dirty capture_jnode list, increments capture_count. */
2299 static void capture_assign_block_nolock(txn_atom *atom, jnode *node)
2301 assert("umka-202", atom != NULL);
2302 assert("umka-203", node != NULL);
2303 assert_spin_locked(&(node->guard));
2304 assert_spin_locked(&(atom->alock));
2305 assert("jmacd-323", node->atom == NULL);
2306 BUG_ON(!list_empty_careful(&node->capture_link));
2307 assert("nikita-3470", !JF_ISSET(node, JNODE_DIRTY));
2309 /* Pointer from jnode to atom is not counted in atom->refcount. */
2310 node->atom = atom;
2312 list_add_tail(&node->capture_link, ATOM_CLEAN_LIST(atom));
2313 atom->capture_count += 1;
2314 /* reference to jnode is acquired by atom. */
2315 jref(node);
2317 ON_DEBUG(count_jnode(atom, node, NOT_CAPTURED, CLEAN_LIST, 1));
2319 LOCK_CNT_INC(t_refs);
2322 /* common code for dirtying both unformatted jnodes and formatted znodes. */
2323 static void do_jnode_make_dirty(jnode * node, txn_atom * atom)
2325 assert_spin_locked(&(node->guard));
2326 assert_spin_locked(&(atom->alock));
2327 assert("jmacd-3981", !JF_ISSET(node, JNODE_DIRTY));
2329 JF_SET(node, JNODE_DIRTY);
2331 get_current_context()->nr_marked_dirty++;
2333 /* We grab2flush_reserve one additional block only if the node was
2334 not CREATED and jnode_flush did not sort it into either the
2335 relocate set or the overwrite set. If the node is in the overwrite
2336 or relocate set we assume that the atom's flush reserved counter was
2337 already adjusted. */
2338 if (!JF_ISSET(node, JNODE_CREATED) && !JF_ISSET(node, JNODE_RELOC)
2339 && !JF_ISSET(node, JNODE_OVRWR) && jnode_is_leaf(node)
2340 && !jnode_is_cluster_page(node)) {
2341 assert("vs-1093", !reiser4_blocknr_is_fake(&node->blocknr));
2342 assert("vs-1506", *jnode_get_block(node) != 0);
2343 grabbed2flush_reserved_nolock(atom, (__u64) 1);
2344 JF_SET(node, JNODE_FLUSH_RESERVED);
2347 if (!JF_ISSET(node, JNODE_FLUSH_QUEUED)) {
2348 /* Sometimes a node is set dirty before being captured -- the case
2349 for new jnodes. In that case the jnode will be added to the
2350 appropriate list in capture_assign_block_nolock. Another reason not
2351 to re-link the jnode here is that it is already on a flush queue
2352 (see flush.c for details). */
2355 int level = jnode_get_level(node);
2357 assert("nikita-3152", !JF_ISSET(node, JNODE_OVRWR));
2358 assert("zam-654", atom->stage < ASTAGE_PRE_COMMIT);
2359 assert("nikita-2607", 0 <= level);
2360 assert("nikita-2606", level <= REAL_MAX_ZTREE_HEIGHT);
2362 /* move node to atom's dirty list */
2363 list_move_tail(&node->capture_link, ATOM_DIRTY_LIST(atom, level));
2364 ON_DEBUG(count_jnode
2365 (atom, node, NODE_LIST(node), DIRTY_LIST, 1));
2369 /* Set the dirty status for this (spin locked) jnode. */
2370 void jnode_make_dirty_locked(jnode * node)
2372 assert("umka-204", node != NULL);
2373 assert_spin_locked(&(node->guard));
2375 if (REISER4_DEBUG && rofs_jnode(node)) {
2376 warning("nikita-3365", "Dirtying jnode on rofs");
2377 dump_stack();
2380 /* Fast check for already dirty node */
2381 if (!JF_ISSET(node, JNODE_DIRTY)) {
2382 txn_atom *atom;
2384 atom = jnode_get_atom(node);
2385 assert("vs-1094", atom);
2386 /* Check jnode dirty status again because node spin lock might
2387 * be released inside jnode_get_atom(). */
2388 if (likely(!JF_ISSET(node, JNODE_DIRTY)))
2389 do_jnode_make_dirty(node, atom);
2390 spin_unlock_atom(atom);
2394 /* Set the dirty status for this znode. */
2395 void znode_make_dirty(znode * z)
2397 jnode *node;
2398 struct page *page;
2400 assert("umka-204", z != NULL);
2401 assert("nikita-3290", znode_above_root(z) || znode_is_loaded(z));
2402 assert("nikita-3560", znode_is_write_locked(z));
2404 node = ZJNODE(z);
2405 /* znode is longterm locked, we can check dirty bit without spinlock */
2406 if (JF_ISSET(node, JNODE_DIRTY)) {
2407 /* znode is dirty already. All we have to do is to change znode version */
2408 z->version = znode_build_version(jnode_get_tree(node));
2409 return;
2412 spin_lock_jnode(node);
2413 jnode_make_dirty_locked(node);
2414 page = jnode_page(node);
2415 if (page != NULL) {
2416 /* this is useful assertion (allows one to check that no
2417 * modifications are lost due to update of in-flight page),
2418 * but it requires locking on page to check PG_writeback
2419 * bit. */
2420 /* assert("nikita-3292",
2421 !PageWriteback(page) || ZF_ISSET(z, JNODE_WRITEBACK)); */
2422 page_cache_get(page);
2424 /* jnode lock is not needed for the rest of
2425 * znode_make_dirty(). */
2426 spin_unlock_jnode(node);
2427 /* reiser4 file write code calls set_page_dirty for
2428 * unformatted nodes, for formatted nodes we do it here. */
2429 reiser4_set_page_dirty_internal(page);
2430 page_cache_release(page);
2431 /* bump version counter in znode */
2432 z->version = znode_build_version(jnode_get_tree(node));
2433 } else {
2434 assert("zam-596", znode_above_root(JZNODE(node)));
2435 spin_unlock_jnode(node);
2438 assert("nikita-1900", znode_is_write_locked(z));
2439 assert("jmacd-9777", node->atom != NULL);
2442 int reiser4_sync_atom(txn_atom * atom)
2444 int result;
2445 txn_handle *txnh;
2447 txnh = get_current_context()->trans;
2449 result = 0;
2450 if (atom != NULL) {
2451 if (atom->stage < ASTAGE_PRE_COMMIT) {
2452 spin_lock_txnh(txnh);
2453 capture_assign_txnh_nolock(atom, txnh);
2454 result = force_commit_atom(txnh);
2455 } else if (atom->stage < ASTAGE_POST_COMMIT) {
2456 /* wait atom commit */
2457 reiser4_atom_wait_event(atom);
2458 /* try once more */
2459 result = RETERR(-E_REPEAT);
2460 } else
2461 spin_unlock_atom(atom);
2463 return result;
2466 #if REISER4_DEBUG
2468 /* move jnode from one list to another;
2469 call this after atom->capture_count is updated */
2470 void
2471 count_jnode(txn_atom * atom, jnode * node, atom_list old_list,
2472 atom_list new_list, int check_lists)
2474 struct list_head *pos;
2476 assert("zam-1018", atom_is_protected(atom));
2477 assert_spin_locked(&(node->guard));
2478 assert("", NODE_LIST(node) == old_list);
2480 switch (NODE_LIST(node)) {
2481 case NOT_CAPTURED:
2482 break;
2483 case DIRTY_LIST:
2484 assert("", atom->dirty > 0);
2485 atom->dirty--;
2486 break;
2487 case CLEAN_LIST:
2488 assert("", atom->clean > 0);
2489 atom->clean--;
2490 break;
2491 case FQ_LIST:
2492 assert("", atom->fq > 0);
2493 atom->fq--;
2494 break;
2495 case WB_LIST:
2496 assert("", atom->wb > 0);
2497 atom->wb--;
2498 break;
2499 case OVRWR_LIST:
2500 assert("", atom->ovrwr > 0);
2501 atom->ovrwr--;
2502 break;
2503 default:
2504 impossible("", "");
2507 switch (new_list) {
2508 case NOT_CAPTURED:
2509 break;
2510 case DIRTY_LIST:
2511 atom->dirty++;
2512 break;
2513 case CLEAN_LIST:
2514 atom->clean++;
2515 break;
2516 case FQ_LIST:
2517 atom->fq++;
2518 break;
2519 case WB_LIST:
2520 atom->wb++;
2521 break;
2522 case OVRWR_LIST:
2523 atom->ovrwr++;
2524 break;
2525 default:
2526 impossible("", "");
2528 ASSIGN_NODE_LIST(node, new_list);
2529 if (0 && check_lists) {
2530 int count;
2531 tree_level level;
2533 count = 0;
2535 /* flush queue list */
2536 /* reiser4_check_fq(atom); */
2538 /* dirty list */
2539 count = 0;
2540 for (level = 0; level < REAL_MAX_ZTREE_HEIGHT + 1; level += 1) {
2541 list_for_each(pos, ATOM_DIRTY_LIST(atom, level))
2542 count++;
2544 if (count != atom->dirty)
2545 warning("", "dirty counter %d, real %d\n", atom->dirty,
2546 count);
2548 /* clean list */
2549 count = 0;
2550 list_for_each(pos, ATOM_CLEAN_LIST(atom))
2551 count++;
2552 if (count != atom->clean)
2553 warning("", "clean counter %d, real %d\n", atom->clean,
2554 count);
2556 /* wb list */
2557 count = 0;
2558 list_for_each(pos, ATOM_WB_LIST(atom))
2559 count++;
2560 if (count != atom->wb)
2561 warning("", "wb counter %d, real %d\n", atom->wb,
2562 count);
2564 /* overwrite list */
2565 count = 0;
2566 list_for_each(pos, ATOM_OVRWR_LIST(atom))
2567 count++;
2569 if (count != atom->ovrwr)
2570 warning("", "ovrwr counter %d, real %d\n", atom->ovrwr,
2571 count);
2573 assert("vs-1624", atom->num_queued == atom->fq);
2574 if (atom->capture_count !=
2575 atom->dirty + atom->clean + atom->ovrwr + atom->wb + atom->fq) {
2576 printk
2577 ("count %d, dirty %d clean %d ovrwr %d wb %d fq %d\n",
2578 atom->capture_count, atom->dirty, atom->clean, atom->ovrwr,
2579 atom->wb, atom->fq);
2580 assert("vs-1622",
2581 atom->capture_count ==
2582 atom->dirty + atom->clean + atom->ovrwr + atom->wb +
2583 atom->fq);
2587 #endif
2589 /* Make node OVRWR and put it on the atom's overwrite list; the atom lock and jnode
2590 * lock should be taken before calling this function. */
2591 void jnode_make_wander_nolock(jnode * node)
2593 txn_atom *atom;
2595 assert("nikita-2431", node != NULL);
2596 assert("nikita-2432", !JF_ISSET(node, JNODE_RELOC));
2597 assert("nikita-3153", JF_ISSET(node, JNODE_DIRTY));
2598 assert("zam-897", !JF_ISSET(node, JNODE_FLUSH_QUEUED));
2599 assert("nikita-3367", !reiser4_blocknr_is_fake(jnode_get_block(node)));
2601 atom = node->atom;
2603 assert("zam-895", atom != NULL);
2604 assert("zam-894", atom_is_protected(atom));
2606 JF_SET(node, JNODE_OVRWR);
2607 /* move node to atom's overwrite list */
2608 list_move_tail(&node->capture_link, ATOM_OVRWR_LIST(atom));
2609 ON_DEBUG(count_jnode(atom, node, DIRTY_LIST, OVRWR_LIST, 1));
2612 /* Same as jnode_make_wander_nolock, but all necessary locks are taken inside
2613 * this function. */
2614 void jnode_make_wander(jnode * node)
2616 txn_atom *atom;
2618 spin_lock_jnode(node);
2619 atom = jnode_get_atom(node);
2620 assert("zam-913", atom != NULL);
2621 assert("zam-914", !JF_ISSET(node, JNODE_RELOC));
2623 jnode_make_wander_nolock(node);
2624 spin_unlock_atom(atom);
2625 spin_unlock_jnode(node);
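/* A minimal illustrative sketch (under #if 0, never compiled) of the
 * "_nolock plus locking wrapper" convention that jnode_make_wander_nolock()
 * and jnode_make_wander() follow above, as do several other functions in
 * this file. my_op()/my_op_nolock() are hypothetical names. */
#if 0
/* the caller already holds the jnode and atom locks */
static void my_op_nolock(jnode *node)
{
	assert_spin_locked(&(node->guard));
	/* ... the real work ... */
}

/* convenience wrapper that takes the locks itself */
static void my_op(jnode *node)
{
	txn_atom *atom;

	spin_lock_jnode(node);
	atom = jnode_get_atom(node);	/* returns with the atom locked */
	my_op_nolock(node);
	spin_unlock_atom(atom);
	spin_unlock_jnode(node);
}
#endif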
2628 /* this just sets RELOC bit */
2629 static void jnode_make_reloc_nolock(flush_queue_t * fq, jnode * node)
2631 assert_spin_locked(&(node->guard));
2632 assert("zam-916", JF_ISSET(node, JNODE_DIRTY));
2633 assert("zam-917", !JF_ISSET(node, JNODE_RELOC));
2634 assert("zam-918", !JF_ISSET(node, JNODE_OVRWR));
2635 assert("zam-920", !JF_ISSET(node, JNODE_FLUSH_QUEUED));
2636 assert("nikita-3367", !reiser4_blocknr_is_fake(jnode_get_block(node)));
2637 jnode_set_reloc(node);
2640 /* Make znode RELOC and put it on flush queue */
2641 void znode_make_reloc(znode * z, flush_queue_t * fq)
2643 jnode *node;
2644 txn_atom *atom;
2646 node = ZJNODE(z);
2647 spin_lock_jnode(node);
2649 atom = jnode_get_atom(node);
2650 assert("zam-919", atom != NULL);
2652 jnode_make_reloc_nolock(fq, node);
2653 queue_jnode(fq, node);
2655 spin_unlock_atom(atom);
2656 spin_unlock_jnode(node);
2660 /* Make unformatted node RELOC and put it on flush queue */
2661 void unformatted_make_reloc(jnode *node, flush_queue_t *fq)
2663 assert("vs-1479", jnode_is_unformatted(node));
2665 jnode_make_reloc_nolock(fq, node);
2666 queue_jnode(fq, node);
2669 int reiser4_capture_super_block(struct super_block *s)
2671 int result;
2672 znode *uber;
2673 lock_handle lh;
2675 init_lh(&lh);
2676 result = get_uber_znode(reiser4_get_tree(s),
2677 ZNODE_WRITE_LOCK, ZNODE_LOCK_LOPRI, &lh);
2678 if (result)
2679 return result;
2681 uber = lh.node;
2682 /* Grabbing one block for superblock */
2683 result = reiser4_grab_space_force((__u64) 1, BA_RESERVED);
2684 if (result != 0) {
done_lh(&lh);
2685 return result;
}
2687 znode_make_dirty(uber);
2689 done_lh(&lh);
2690 return 0;
2693 /* Wakeup every handle on the atom's WAITFOR list */
2694 static void wakeup_atom_waitfor_list(txn_atom * atom)
2696 txn_wait_links *wlinks;
2698 assert("umka-210", atom != NULL);
2700 /* atom is locked */
2701 list_for_each_entry(wlinks, &atom->fwaitfor_list, _fwaitfor_link) {
2702 if (wlinks->waitfor_cb == NULL ||
2703 wlinks->waitfor_cb(atom, wlinks))
2704 /* Wake up. */
2705 reiser4_wake_up(wlinks->_lock_stack);
2709 /* Wakeup every handle on the atom's WAITING list */
2710 static void wakeup_atom_waiting_list(txn_atom * atom)
2712 txn_wait_links *wlinks;
2714 assert("umka-211", atom != NULL);
2716 /* atom is locked */
2717 list_for_each_entry(wlinks, &atom->fwaiting_list, _fwaiting_link) {
2718 if (wlinks->waiting_cb == NULL ||
2719 wlinks->waiting_cb(atom, wlinks))
2720 /* Wake up. */
2721 reiser4_wake_up(wlinks->_lock_stack);
2725 /* helper function used by capture_fuse_wait() to avoid "spurious wake-ups" */
2726 static int wait_for_fusion(txn_atom * atom, txn_wait_links * wlinks)
2728 assert("nikita-3330", atom != NULL);
2729 assert_spin_locked(&(atom->alock));
2731 /* atom->txnh_count == 1 is for waking waiters up if we are releasing
2732 * last transaction handle. */
2733 return atom->stage != ASTAGE_CAPTURE_WAIT || atom->txnh_count == 1;
2736 /* The general purpose of this function is to wait on the first of two possible events.
2737 The situation is that a handle (and its atom atomh) is blocked trying to capture a
2738 block (i.e., node) but the node's atom (atomf) is in the CAPTURE_WAIT state. The
2739 handle's atom (atomh) is not in the CAPTURE_WAIT state. However, atomh could fuse with
2740 another atom or, due to age, enter the CAPTURE_WAIT state itself, at which point it
2741 needs to unblock the handle to avoid deadlock. When the txnh is unblocked it will
2742 proceed and fuse the two atoms in the CAPTURE_WAIT state.
2744 In other words, if either atomh or atomf changes state, the handle will be awakened;
2745 thus there are two lists per atom: WAITING and WAITFOR.
2747 This is also called by capture_assign_txnh with (atomh == NULL) to wait for atomf to
2748 close when the handle is not assigned to an atom of its own.
2750 Lock ordering in this method: all four locks are held: JNODE_LOCK, TXNH_LOCK,
2751 BOTH_ATOM_LOCKS. Result: all four locks are released.
2753 static int capture_fuse_wait(txn_handle * txnh, txn_atom * atomf,
2754 txn_atom * atomh, txn_capture mode)
2756 int ret;
2757 txn_wait_links wlinks;
2759 assert("umka-213", txnh != NULL);
2760 assert("umka-214", atomf != NULL);
2762 if ((mode & TXN_CAPTURE_NONBLOCKING) != 0) {
2763 spin_unlock_txnh(txnh);
2764 spin_unlock_atom(atomf);
2766 if (atomh) {
2767 spin_unlock_atom(atomh);
2770 return RETERR(-E_BLOCK);
2773 /* Initialize the waiting list links. */
2774 init_wlinks(&wlinks);
2776 /* Add txnh to atomf's waitfor list, unlock atomf. */
2777 list_add_tail(&wlinks._fwaitfor_link, &atomf->fwaitfor_list);
2778 wlinks.waitfor_cb = wait_for_fusion;
2779 atomic_inc(&atomf->refcount);
2780 spin_unlock_atom(atomf);
2782 if (atomh) {
2783 /* Add txnh to atomh's waiting list, unlock atomh. */
2784 list_add_tail(&wlinks._fwaiting_link, &atomh->fwaiting_list);
2785 atomic_inc(&atomh->refcount);
2786 spin_unlock_atom(atomh);
2789 /* Go to sleep. */
2790 spin_unlock_txnh(txnh);
2792 ret = reiser4_prepare_to_sleep(wlinks._lock_stack);
2793 if (ret == 0) {
2794 reiser4_go_to_sleep(wlinks._lock_stack);
2795 ret = RETERR(-E_REPEAT);
2798 /* Remove from the waitfor list. */
2799 spin_lock_atom(atomf);
2801 list_del(&wlinks._fwaitfor_link);
2802 atom_dec_and_unlock(atomf);
2804 if (atomh) {
2805 /* Remove from the waiting list. */
2806 spin_lock_atom(atomh);
2807 list_del(&wlinks._fwaiting_link);
2808 atom_dec_and_unlock(atomh);
2810 return ret;
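/* A minimal illustrative sketch (under #if 0, never compiled) of the
 * txn_wait_links protocol implemented above: register on the atom's wait
 * list with a wake-up predicate, sleep, then unregister under the atom
 * lock. wait_on_atom() is a hypothetical condensed version; it omits the
 * second (atomh) list handled by capture_fuse_wait(). */
#if 0
static int wait_on_atom(txn_atom *atom)	/* called with @atom locked */
{
	int ret;
	txn_wait_links wlinks;

	init_wlinks(&wlinks);
	wlinks.waitfor_cb = wait_for_fusion;
	list_add_tail(&wlinks._fwaitfor_link, &atom->fwaitfor_list);
	atomic_inc(&atom->refcount);
	spin_unlock_atom(atom);

	ret = reiser4_prepare_to_sleep(wlinks._lock_stack);
	if (ret == 0) {
		reiser4_go_to_sleep(wlinks._lock_stack);
		ret = RETERR(-E_REPEAT);
	}

	/* unregister under the atom lock again */
	spin_lock_atom(atom);
	list_del(&wlinks._fwaitfor_link);
	atom_dec_and_unlock(atom);
	return ret;
}
#endif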
2813 static void lock_two_atoms(txn_atom * one, txn_atom * two)
2815 assert("zam-1067", one != two);
2817 /* lock the atom with lesser address first */
2818 if (one < two) {
2819 spin_lock_atom(one);
2820 spin_lock_atom_nested(two);
2821 } else {
2822 spin_lock_atom(two);
2823 spin_lock_atom_nested(one);
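/* A minimal illustrative sketch (under #if 0, never compiled): lock_two_atoms()
 * above avoids ABBA deadlock by always locking the atom with the lower
 * address first. The same idea for two plain kernel spinlocks, with a
 * hypothetical function name, looks like this. */
#if 0
static void lock_pair_in_address_order(spinlock_t *a, spinlock_t *b)
{
	/* every thread orders any given pair the same way, so two threads
	 * can never each hold one lock of the pair while waiting for the
	 * other */
	if (a < b) {
		spin_lock(a);
		spin_lock_nested(b, SINGLE_DEPTH_NESTING);
	} else {
		spin_lock(b);
		spin_lock_nested(a, SINGLE_DEPTH_NESTING);
	}
}
#endif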
2827 /* Perform the necessary work to prepare for fusing two atoms, which involves
2828 * acquiring two atom locks in the proper order. If the node's atom is
2829 * blocking fusion (i.e., it is in the CAPTURE_WAIT stage) and the handle's
2830 * atom is not, then the handle's request is put to sleep. If the node's atom
2831 * is committing, then the node can be copy-on-captured. Otherwise, pick the
2832 * atom with fewer pointers to be fused into the atom with more pointers and
2833 * call capture_fuse_into().
2835 static int capture_init_fusion(jnode *node, txn_handle *txnh, txn_capture mode)
2837 txn_atom * txnh_atom = txnh->atom;
2838 txn_atom * block_atom = node->atom;
2840 atomic_inc(&txnh_atom->refcount);
2841 atomic_inc(&block_atom->refcount);
2843 spin_unlock_txnh(txnh);
2844 spin_unlock_jnode(node);
2846 lock_two_atoms(txnh_atom, block_atom);
2848 if (txnh->atom != txnh_atom || node->atom != block_atom ) {
2849 release_two_atoms(txnh_atom, block_atom);
2850 return RETERR(-E_REPEAT);
2853 atomic_dec(&txnh_atom->refcount);
2854 atomic_dec(&block_atom->refcount);
2856 assert ("zam-1066", atom_isopen(txnh_atom));
2858 if (txnh_atom->stage >= block_atom->stage ||
2859 (block_atom->stage == ASTAGE_CAPTURE_WAIT && block_atom->txnh_count == 0)) {
2860 capture_fuse_into(txnh_atom, block_atom);
2861 return RETERR(-E_REPEAT);
2863 spin_lock_txnh(txnh);
2864 return capture_fuse_wait(txnh, block_atom, txnh_atom, mode);
2867 /* This function splices together two jnode lists (small and large) and sets all jnodes in
2868 the small list to point to the large atom. Returns the length of the list. */
2869 static int
2870 capture_fuse_jnode_lists(txn_atom *large, struct list_head *large_head,
2871 struct list_head *small_head)
2873 int count = 0;
2874 jnode *node;
2876 assert("umka-218", large != NULL);
2877 assert("umka-219", large_head != NULL);
2878 assert("umka-220", small_head != NULL);
2879 /* small atom should be locked also. */
2880 assert_spin_locked(&(large->alock));
2882 /* For every jnode on small's capture list... */
2883 list_for_each_entry(node, small_head, capture_link) {
2884 count += 1;
2886 /* With the jnode lock held, update atom pointer. */
2887 spin_lock_jnode(node);
2888 node->atom = large;
2889 spin_unlock_jnode(node);
2892 /* Splice the lists. */
2893 list_splice_init(small_head, large_head->prev);
2895 return count;
2898 /* This function splices together two txnh lists (small and large) and sets all txn handles in
2899 the small list to point to the large atom. Returns the length of the list. */
2900 static int
2901 capture_fuse_txnh_lists(txn_atom *large, struct list_head *large_head,
2902 struct list_head *small_head)
2904 int count = 0;
2905 txn_handle *txnh;
2907 assert("umka-221", large != NULL);
2908 assert("umka-222", large_head != NULL);
2909 assert("umka-223", small_head != NULL);
2911 /* Adjust every txnh to the new atom. */
2912 list_for_each_entry(txnh, small_head, txnh_link) {
2913 count += 1;
2915 /* With the txnh lock held, update atom pointer. */
2916 spin_lock_txnh(txnh);
2917 txnh->atom = large;
2918 spin_unlock_txnh(txnh);
2921 /* Splice the txn_handle list. */
2922 list_splice_init(small_head, large_head->prev);
2924 return count;
2927 /* This function fuses two atoms. The captured nodes and handles belonging to SMALL are
2928 added to LARGE and their ->atom pointers are all updated. The associated counts are
2929 updated as well, and any waiting handles belonging to either are awakened. Finally the
2930 smaller atom's refcount is decremented.
2932 static void capture_fuse_into(txn_atom * small, txn_atom * large)
2934 int level;
2935 unsigned zcount = 0;
2936 unsigned tcount = 0;
2938 assert("umka-224", small != NULL);
2939 assert("umka-225", large != NULL);
2941 assert_spin_locked(&(large->alock));
2942 assert_spin_locked(&(small->alock));
2944 assert("jmacd-201", atom_isopen(small));
2945 assert("jmacd-202", atom_isopen(large));
2947 /* Splice and update the per-level dirty jnode lists */
2948 for (level = 0; level < REAL_MAX_ZTREE_HEIGHT + 1; level += 1) {
2949 zcount +=
2950 capture_fuse_jnode_lists(large,
2951 ATOM_DIRTY_LIST(large, level),
2952 ATOM_DIRTY_LIST(small, level));
2955 /* Splice and update the [clean,dirty] jnode and txnh lists */
2956 zcount +=
2957 capture_fuse_jnode_lists(large, ATOM_CLEAN_LIST(large),
2958 ATOM_CLEAN_LIST(small));
2959 zcount +=
2960 capture_fuse_jnode_lists(large, ATOM_OVRWR_LIST(large),
2961 ATOM_OVRWR_LIST(small));
2962 zcount +=
2963 capture_fuse_jnode_lists(large, ATOM_WB_LIST(large),
2964 ATOM_WB_LIST(small));
2965 zcount +=
2966 capture_fuse_jnode_lists(large, &large->inodes, &small->inodes);
2967 tcount +=
2968 capture_fuse_txnh_lists(large, &large->txnh_list,
2969 &small->txnh_list);
2971 /* Check our accounting. */
2972 assert("jmacd-1063",
2973 zcount + small->num_queued == small->capture_count);
2974 assert("jmacd-1065", tcount == small->txnh_count);
2976 /* sum numbers of waiting threads */
2977 large->nr_waiters += small->nr_waiters;
2978 small->nr_waiters = 0;
2980 /* splice flush queues */
2981 reiser4_fuse_fq(large, small);
2983 /* update the debug-only per-list jnode counters of the atom */
2984 ON_DEBUG(large->dirty += small->dirty;
2985 small->dirty = 0;
2986 large->clean += small->clean;
2987 small->clean = 0;
2988 large->ovrwr += small->ovrwr;
2989 small->ovrwr = 0;
2990 large->wb += small->wb;
2991 small->wb = 0;
2992 large->fq += small->fq;
2993 small->fq = 0;);
2995 /* count flushers in result atom */
2996 large->nr_flushers += small->nr_flushers;
2997 small->nr_flushers = 0;
2999 /* update counts of flushed nodes */
3000 large->flushed += small->flushed;
3001 small->flushed = 0;
3003 /* Transfer list counts to large. */
3004 large->txnh_count += small->txnh_count;
3005 large->capture_count += small->capture_count;
3007 /* Add all txnh references to large. */
3008 atomic_add(small->txnh_count, &large->refcount);
3009 atomic_sub(small->txnh_count, &small->refcount);
3011 /* Reset small counts */
3012 small->txnh_count = 0;
3013 small->capture_count = 0;
3015 /* Assign the oldest start_time, merge flags. */
3016 large->start_time = min(large->start_time, small->start_time);
3017 large->flags |= small->flags;
3019 /* Merge blocknr sets. */
3020 blocknr_set_merge(&small->delete_set, &large->delete_set);
3021 blocknr_set_merge(&small->wandered_map, &large->wandered_map);
3023 /* Merge allocated/deleted file counts */
3024 large->nr_objects_deleted += small->nr_objects_deleted;
3025 large->nr_objects_created += small->nr_objects_created;
3027 small->nr_objects_deleted = 0;
3028 small->nr_objects_created = 0;
3030 /* Merge allocated blocks counts */
3031 large->nr_blocks_allocated += small->nr_blocks_allocated;
3033 large->nr_running_queues += small->nr_running_queues;
3034 small->nr_running_queues = 0;
3036 /* Merge blocks reserved for overwrite set. */
3037 large->flush_reserved += small->flush_reserved;
3038 small->flush_reserved = 0;
3040 if (large->stage < small->stage) {
3041 /* Large only needs to notify if it has changed state. */
3042 reiser4_atom_set_stage(large, small->stage);
3043 wakeup_atom_waiting_list(large);
3046 reiser4_atom_set_stage(small, ASTAGE_INVALID);
3048 /* Notify any waiters--small needs to unload its wait lists. Waiters
3049 actually remove themselves from the list before returning from the
3050 fuse_wait function. */
3051 wakeup_atom_waiting_list(small);
3053 /* Unlock atoms */
3054 spin_unlock_atom(large);
3055 atom_dec_and_unlock(small);
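/* A minimal illustrative sketch (under #if 0, never compiled) of the
 * accounting invariant that capture_fuse_into() preserves and that the
 * assertion "vs-1622" in count_jnode() checks: every captured jnode sits on
 * exactly one per-atom list, so the per-list counters must add up to
 * capture_count. atom_counts_are_consistent() is a hypothetical check, and
 * the per-list counters only exist in REISER4_DEBUG builds. */
#if 0
static int atom_counts_are_consistent(const txn_atom *atom)
{
	return atom->capture_count ==
		atom->dirty + atom->clean + atom->ovrwr +
		atom->wb + atom->fq;
}
#endif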
3058 /* TXNMGR STUFF */
3060 /* Release a block from the atom, reversing the effects of being captured;
3061 do not release the atom's reference to the jnode here because spin-locks
3062 are held. Currently this is only called when the atom commits.
3064 NOTE: this function does not release the (journal) reference to the jnode,
3065 due to locking optimizations; you should call jput() somewhere after
3066 calling reiser4_uncapture_block(). */
3067 void reiser4_uncapture_block(jnode * node)
3069 txn_atom *atom;
3071 assert("umka-226", node != NULL);
3072 atom = node->atom;
3073 assert("umka-228", atom != NULL);
3075 assert("jmacd-1021", node->atom == atom);
3076 assert_spin_locked(&(node->guard));
3077 assert("jmacd-1023", atom_is_protected(atom));
3079 JF_CLR(node, JNODE_DIRTY);
3080 JF_CLR(node, JNODE_RELOC);
3081 JF_CLR(node, JNODE_OVRWR);
3082 JF_CLR(node, JNODE_CREATED);
3083 JF_CLR(node, JNODE_WRITEBACK);
3084 JF_CLR(node, JNODE_REPACK);
3086 list_del_init(&node->capture_link);
3087 if (JF_ISSET(node, JNODE_FLUSH_QUEUED)) {
3088 assert("zam-925", atom_isopen(atom));
3089 assert("vs-1623", NODE_LIST(node) == FQ_LIST);
3090 ON_DEBUG(atom->num_queued--);
3091 JF_CLR(node, JNODE_FLUSH_QUEUED);
3093 atom->capture_count -= 1;
3094 ON_DEBUG(count_jnode(atom, node, NODE_LIST(node), NOT_CAPTURED, 1));
3095 node->atom = NULL;
3097 spin_unlock_jnode(node);
3098 LOCK_CNT_DEC(t_refs);
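/* A minimal illustrative sketch (under #if 0, never compiled) of the calling
 * convention described in the comment above reiser4_uncapture_block(): the
 * caller enters with the jnode and atom locks held, the function drops the
 * jnode lock, and the caller still has to unlock the atom and call jput().
 * This is exactly what reiser4_uncapture_jnode() does above;
 * drop_capture_and_reference() is a hypothetical condensed name. */
#if 0
static void drop_capture_and_reference(jnode *node, txn_atom *atom)
{
	/* entered with both node->guard and atom->alock held */
	reiser4_uncapture_block(node);	/* drops the jnode lock */
	spin_unlock_atom(atom);
	jput(node);	/* drop the reference the atom was holding */
}
#endif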
3101 /* Unconditional insert of jnode into atom's overwrite list. Currently used in
3102 bitmap-based allocator code for adding modified bitmap blocks to the
3103 transaction. @atom and @node are spin-locked */
3104 void insert_into_atom_ovrwr_list(txn_atom * atom, jnode * node)
3106 assert("zam-538", atom_is_protected(atom));
3107 assert_spin_locked(&(node->guard));
3108 assert("zam-899", JF_ISSET(node, JNODE_OVRWR));
3109 assert("zam-543", node->atom == NULL);
3110 assert("vs-1433", !jnode_is_unformatted(node) && !jnode_is_znode(node));
3112 list_add(&node->capture_link, ATOM_OVRWR_LIST(atom));
3113 jref(node);
3114 node->atom = atom;
3115 atom->capture_count++;
3116 ON_DEBUG(count_jnode(atom, node, NODE_LIST(node), OVRWR_LIST, 1));
3119 static int count_deleted_blocks_actor(txn_atom * atom,
3120 const reiser4_block_nr * a,
3121 const reiser4_block_nr * b, void *data)
3123 reiser4_block_nr *counter = data;
3125 assert("zam-995", data != NULL);
3126 assert("zam-996", a != NULL);
3127 if (b == NULL)
3128 *counter += 1;
3129 else
3130 *counter += *b;
3131 return 0;
3134 reiser4_block_nr txnmgr_count_deleted_blocks(void)
3136 reiser4_block_nr result;
3137 txn_mgr *tmgr = &get_super_private(reiser4_get_current_sb())->tmgr;
3138 txn_atom *atom;
3140 result = 0;
3142 spin_lock_txnmgr(tmgr);
3143 list_for_each_entry(atom, &tmgr->atoms_list, atom_link) {
3144 spin_lock_atom(atom);
3145 if (atom_isopen(atom))
3146 blocknr_set_iterator(
3147 atom, &atom->delete_set,
3148 count_deleted_blocks_actor, &result, 0);
3149 spin_unlock_atom(atom);
3151 spin_unlock_txnmgr(tmgr);
3153 return result;
3157 * Local variables:
3158 * c-indentation-style: "K&R"
3159 * mode-name: "LC"
3160 * c-basic-offset: 8
3161 * tab-width: 8
3162 * fill-column: 79
3163 * End: