/* Copyright 2001, 2002, 2003 by Hans Reiser, licensing governed by
 * reiser4/README */

/* Traditional deadlock avoidance is achieved by acquiring all locks in a single
   order. V4 balances the tree from the bottom up, and searches the tree from
   the top down, and that is really the way we want it, so tradition won't work
   for us.

   Instead we have two lock orderings, a high priority lock ordering, and a low
   priority lock ordering. Each node in the tree has a lock in its znode.

   Suppose we have a set of processes which lock (R/W) tree nodes. Each process
   has a set (maybe empty) of already locked nodes ("process locked set"). Each
   process may have a pending lock request to a node locked by another process.
   Note: we lock and unlock, but do not transfer locks: it is possible
   transferring locks instead would save some bus locking....

   Deadlock occurs when we have a loop constructed from process locked sets and
   lock request vectors.

   NOTE: The reiser4 "tree" is a tree on disk, but its cached representation in
   memory is extended with "znodes" with which we connect nodes with their left
   and right neighbors using sibling pointers stored in the znodes. When we
   perform balancing operations we often go from left to right and from right to
   left.

   +-P1-+                   +-P3-+
   |+--+|   V1              |+--+|
   ||N1|| ------->          ||N3||
   |+--+|                   |+--+|
   +----+                   +----+
     ^                        |
     |V2                      |V3
     |                        v
   +---------P2---------+
   |+--+            +--+|
   ||N2|  --------  |N4||
   |+--+            +--+|
   +--------------------+
   We solve this by ensuring that only low priority processes lock in top to
   bottom order and from right to left, and high priority processes lock from
   bottom to top and left to right.

   ZAM-FIXME-HANS: order not just node locks in this way, order atom locks, and
   kill those damn busy loops.
   ANSWER(ZAM): atom locks (which are introduced by the ASTAGE_CAPTURE_WAIT atom
   stage) cannot be ordered that way. There are no rules about which nodes can
   belong to the atom and which cannot. We cannot define what is the right or
   left direction, or what is top or bottom. We can take the immediate parent or
   a side neighbor of one node, but nobody guarantees that, say, a left neighbor
   node is not a far right neighbor for other nodes from the same atom. This
   breaks the deadlock avoidance rules, so hi-lo priority locking cannot be
   applied to atom locks.

   How does it help to avoid deadlocks?

   Suppose we have a deadlock with n processes. Processes from one priority
   class never deadlock because they take locks in one consistent
   order.

   So, any possible deadlock loop must have low priority as well as high
   priority processes. There are no other lock priority levels except low and
   high. We know that any deadlock loop contains at least one node locked by a
   low priority process and requested by a high priority process. If this
   situation is caught and resolved it is sufficient to avoid deadlocks.

   V4 DEADLOCK PREVENTION ALGORITHM IMPLEMENTATION.

   The deadlock prevention algorithm is based on comparing
   priorities of node owners (processes which keep znode locked) and
   requesters (processes which want to acquire a lock on znode). We
   implement a scheme where low-priority owners yield locks to
   high-priority requesters. We created a signal passing system that
   is used to ask low-priority processes to yield one or more locked
   znodes.

   The condition when a znode needs to change its owners is described by the
   following formula:

   #############################################
   #                                           #
   # (number of high-priority requesters) > 0  #
   #                 AND                       #
   # (number of high-priority owners) == 0     #
   #                                           #
   #############################################

   Note that a low-priority process delays releasing a node if another
   high-priority process owns this node. So, slightly more strictly speaking,
   to have a deadlock capable cycle you must have a loop in which a high
   priority process is waiting on a low priority process to yield a node, which
   is slightly different from saying a high priority process is waiting on a
   node owned by a low priority process.

   It is enough to avoid deadlocks if we prevent any low-priority process from
   falling asleep if its locked set contains a node which satisfies the
   deadlock condition.
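
   To make the formula concrete, here is the same condition written as an
   illustrative predicate (a restatement of check_deadlock_condition() defined
   later in this file; must_yield() is not a real helper, and the fields are
   the zlock fields used throughout this file):

	static inline int must_yield(const zlock * lock)
	{
		return lock->nr_hipri_requests > 0 &&
		       lock->nr_hipri_owners == 0;
	}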
   That condition is implicitly or explicitly checked in all places where new
   high-priority requests may be added to or removed from the node's request
   queue, or a high-priority process takes or releases a lock on the node. The
   main goal of these checks is to never miss the moment when a node gets
   "wrong owners", and to send "must-yield-this-lock" signals to its low-pri
   owners at that time.

   The information about received signals is stored in the per-process
   structure (lock stack) and analyzed before a low-priority process goes to
   sleep, but after a "fast" attempt to lock a node fails. Any signal wakes the
   sleeping process up and forces it to re-check the lock status and the
   received signal info. If "must-yield-this-lock" signals were received, the
   locking primitive (longterm_lock_znode()) fails with the -E_DEADLOCK error
   code.
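
   A minimal caller-side sketch of how that error code is handled: on
   -E_DEADLOCK the caller drops all long-term locks it holds and restarts the
   whole operation. This is illustrative only; real callers go through
   higher-level tree search helpers, and release_held_locks() is a hypothetical
   stand-in for dropping whatever long-term locks the caller has accumulated:

	lock_handle lh;
	int ret;

	do {
		ret = longterm_lock_znode(&lh, node, ZNODE_WRITE_LOCK,
					  ZNODE_LOCK_LOPRI);
		if (ret == -E_DEADLOCK)
			release_held_locks();
	} while (ret == -E_DEADLOCK);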
   V4 LOCKING DRAWBACKS

   If we have already balanced on one level, and we are propagating our changes
   upward to a higher level, it could be very messy to surrender all locks on
   the lower level because we put so much computational work into it, and
   reverting them to their state before they were locked might be very complex.
   We also don't want to acquire all locks before performing balancing because
   that would either be almost as much work as the balancing, or it would be
   too conservative and lock too much. We want balancing to be done only at
   high priority. Yet, we might want to go to the left one node and use some
   of its empty space... So we make one attempt at getting the node to the left
   using try_lock, and if it fails we do without it, because we didn't really
   need it, it was only nice to have.

   LOCK STRUCTURES DESCRIPTION

   The following data structures are used in the reiser4 locking
   implementation:

   All fields related to long-term locking are stored in znode->lock.

   The lock stack is a per-thread object. It owns all znodes locked by the
   thread. One znode may be locked by several threads in the case of read
   locks, or one znode may be write-locked several times by one thread. The
   special link objects (lock handles) support the n<->m relation between
   znodes and lock owners.

   <Thread 1>                          <Thread 2>

   +---------+                         +---------+
   |   LS1   |                         |   LS2   |
   +---------+                         +---------+
        |------------+                      |----------+
        v            v                      v          v
   +---------+  +---------+           +---------+  +---------+
   |   LH1   |  |   LH2   |           |   LH3   |  |   LH4   |
   +---------+  +---------+           +---------+  +---------+
        ^            ^                     ^            ^
        |            +----------+----------+            |
        v                       v                       v
   +---------+             +---------+             +---------+
   |   Z1    |             |   Z2    |             |   Z3    |
   +---------+             +---------+             +---------+

   Thread 1 locked znodes Z1 and Z2, thread 2 locked znodes Z2 and Z3. The
   picture above shows that lock stack LS1 has a list of two lock handles, LH1
   and LH2, and lock stack LS2 has a list with lock handles LH3 and LH4 on it.
   Znode Z1 is locked by only one thread and has only one lock handle, LH1, on
   its list; the situation is similar for Z3, which is locked by thread 2 only.
   Z2 is locked (for read) twice by different threads, so two lock handles are
   on its list. Each lock handle represents a single locking relation between a
   znode and a thread. Locking a znode means establishing such a relation by
   adding a new lock handle to the lock stack's list of lock handles and to the
   znode's list. The lock stack links all lock handles for all znodes locked by
   that lock stack. The znode's list groups all lock handles for all lock
   stacks which locked the znode.

   Yet another relation may exist between a znode and lock owners. If the lock
   procedure cannot immediately take a lock on an object, it adds the lock
   owner to the special `requestors' list belonging to the znode. That list
   represents a queue of pending lock requests. Because one lock owner may
   request only one lock object at a time, this is a 1->n relation between
   lock objects and lock owners, implemented as described above. Full
   information (priority, pointers to the lock and link objects) about each
   lock request is stored in the lock owner structure, in the `request' field.
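
   For orientation, the objects described above fit together roughly like this
   (a paraphrase of how the fields are used in this file, not the authoritative
   definitions, which live in the locking headers and contain more members):

	lock_handle			one locking relation
		owner		- the thread (lock stack) side
		node		- the znode side
		signaled	- "must yield this lock" mark
		locks_link	- entry in owner->locks
		owners_link	- entry in node->lock.owners

	lock_stack			per-thread lock owner
		locks		- list of all lock handles this thread holds
		requestors_link	- this thread's entry in a znode's
				  requestors queue
		request		- the single pending lock request
				  (mode, handle, node, ret_code)
		nr_signaled	- count of pending "yield" signals
		curpri		- current priority (high or low)

	zlock (znode->lock)
		guard		- spinlock protecting the fields below
		owners		- lock handles of the current owners
		requestors	- lock stacks waiting for the lock
		nr_readers	- >0: read locked, <0: write locked
		nr_hipri_owners, nr_hipri_requests, nr_hipri_write_requests
				- high-priority bookkeeping counters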
   SHORT_TERM LOCKING

   This is a list of primitive operations over lock stacks / lock handles /
   znodes, and locking descriptions for them (a condensed sketch of primitive 3
   follows the list).

   1. locking / unlocking, which is done by two list insertions/deletions: one
      to/from the znode's list of lock handles, the other to/from the lock
      stack's list of lock handles. The first insertion is protected by the
      znode->lock.guard spinlock. The list owned by the lock stack can be
      modified only by the thread which owns the lock stack, and nobody else
      can modify/read it; there is nothing to protect with a spinlock or
      anything else.

   2. adding/removing a lock request to/from the znode's requestors list. The
      rule is that the znode->lock.guard spinlock should be taken for this.

   3. we can traverse the list of lock handles and use references to the lock
      stacks which locked a given znode if the znode->lock.guard spinlock is
      taken.

   4. If a lock stack is associated with a znode as a lock requestor or lock
      owner, its existence is guaranteed by the znode->lock.guard spinlock.
      Some of its (the lock stack's) fields should be protected from being
      accessed in parallel by two or more threads. Please look at the
      lock_stack structure definition for info on how those fields are
      protected. */
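/* A condensed sketch of primitive 3 above (illustrative only: it mirrors what
 * wake_up_all_lopri_owners() below does, and process_owner() is a hypothetical
 * callback, not a function defined anywhere in reiser4). Owner pointers stay
 * valid only while the guard is held:
 *
 *	spin_lock_zlock(&node->lock);
 *	list_for_each_entry(handle, &node->lock.owners, owners_link)
 *		process_owner(handle->owner);
 *	spin_unlock_zlock(&node->lock);
 */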
/* Znode lock and capturing intertwining. */
/* In the current implementation we capture formatted nodes before locking
   them. Take a look at longterm_lock_znode(): the reiser4_try_capture()
   request precedes the locking request. The longterm_lock_znode() function
   unconditionally captures the znode before even checking the locking
   conditions.

   Another variant is to capture the znode after locking it. It was not tested,
   but at least one deadlock condition is supposed to be there. One thread has
   locked a znode (Node-1) and calls reiser4_try_capture() for it.
   reiser4_try_capture() sleeps because the znode's atom is in the CAPTURE_WAIT
   state. The second thread is a flushing thread whose current atom is the atom
   Node-1 belongs to. The second thread wants to lock Node-1 and sleeps because
   Node-1 is locked by the first thread. The described situation is a
   deadlock. */
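/* What taking and releasing a long-term lock looks like from a caller's point
 * of view (a minimal sketch; init_lh() is assumed to be the lock handle
 * initializer declared in the locking header, and error handling is trimmed):
 *
 *	lock_handle lh;
 *	int ret;
 *
 *	init_lh(&lh);
 *	ret = longterm_lock_znode(&lh, node, ZNODE_READ_LOCK,
 *				  ZNODE_LOCK_LOPRI);
 *	if (ret == 0) {
 *		... read the node ...
 *		longterm_unlock_znode(&lh);
 *	}
 *
 * The capture step (reiser4_try_capture()) happens inside
 * longterm_lock_znode() itself, as described above.
 */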
#include "debug.h"
#include "txnmgr.h"
#include "znode.h"
#include "jnode.h"
#include "tree.h"
#include "plugin/node/node.h"
#include "super.h"

#include <linux/spinlock.h>

#if REISER4_DEBUG
static int request_is_deadlock_safe(znode *, znode_lock_mode,
				    znode_lock_request);
#endif

/* Returns a lock owner associated with current thread */
lock_stack *get_current_lock_stack(void)
{
	return &get_current_context()->stack;
}

/* Wakes up all low priority owners informing them about possible deadlock */
static void wake_up_all_lopri_owners(znode * node)
{
	lock_handle *handle;

	assert_spin_locked(&(node->lock.guard));
	list_for_each_entry(handle, &node->lock.owners, owners_link) {
		assert("nikita-1832", handle->node == node);
		/* count this signal in owner->nr_signaled */
		if (!handle->signaled) {
			handle->signaled = 1;
			atomic_inc(&handle->owner->nr_signaled);
			/* Wake up a single process */
			reiser4_wake_up(handle->owner);
		}
	}
}
/* Adds a lock to a lock owner, which means creating a link to the lock and
   putting the link into the two lists all links are on (the doubly linked list
   that forms the lock_stack, and the doubly linked list of links attached
   to a lock).
 */
static inline void
link_object(lock_handle * handle, lock_stack * owner, znode * node)
{
	assert("jmacd-810", handle->owner == NULL);
	assert_spin_locked(&(node->lock.guard));

	handle->owner = owner;
	handle->node = node;

	assert("reiser4-4",
	       ergo(list_empty_careful(&owner->locks), owner->nr_locks == 0));

	/* add lock handle to the end of lock_stack's list of locks */
	list_add_tail(&handle->locks_link, &owner->locks);
	ON_DEBUG(owner->nr_locks++);
	reiser4_ctx_gfp_mask_set();

	/* add lock handle to the head of znode's list of owners */
	list_add(&handle->owners_link, &node->lock.owners);
	handle->signaled = 0;
}
/* Breaks a relation between a lock and its owner */
static inline void unlink_object(lock_handle * handle)
{
	assert("zam-354", handle->owner != NULL);
	assert("nikita-1608", handle->node != NULL);
	assert_spin_locked(&(handle->node->lock.guard));
	assert("nikita-1829", handle->owner == get_current_lock_stack());
	assert("reiser4-5", handle->owner->nr_locks > 0);

	/* remove lock handle from lock_stack's list of locks */
	list_del(&handle->locks_link);
	ON_DEBUG(handle->owner->nr_locks--);
	reiser4_ctx_gfp_mask_set();
	assert("reiser4-6",
	       ergo(list_empty_careful(&handle->owner->locks),
		    handle->owner->nr_locks == 0));
	/* remove lock handle from znode's list of owners */
	list_del(&handle->owners_link);
	/* indicates that lock handle is free now */
	handle->node = NULL;
#if REISER4_DEBUG
	INIT_LIST_HEAD(&handle->locks_link);
	INIT_LIST_HEAD(&handle->owners_link);
	handle->owner = NULL;
#endif
}
/* Actually locks an object knowing that we are able to do this */
static void lock_object(lock_stack * owner)
{
	struct lock_request *request;
	znode *node;

	request = &owner->request;
	node = request->node;
	assert_spin_locked(&(node->lock.guard));
	if (request->mode == ZNODE_READ_LOCK) {
		node->lock.nr_readers++;
	} else {
		/* check that we didn't switch from read to write lock */
		assert("nikita-1840", node->lock.nr_readers <= 0);
		/* We allow recursive locking; a node can be locked several
		   times for write by the same process */
		node->lock.nr_readers--;
	}

	link_object(request->handle, owner, node);

	if (owner->curpri) {
		node->lock.nr_hipri_owners++;
	}
}

/* Check for recursive write locking */
static int recursive(lock_stack * owner)
{
	int ret;
	znode *node;
	lock_handle *lh;

	node = owner->request.node;

	/* Owners list is not empty for a locked node */
	assert("zam-314", !list_empty_careful(&node->lock.owners));
	assert("nikita-1841", owner == get_current_lock_stack());
	assert_spin_locked(&(node->lock.guard));

	lh = list_entry(node->lock.owners.next, lock_handle, owners_link);
	ret = (lh->owner == owner);

	/* Recursive read locking should be done the usual way */
	assert("zam-315", !ret || owner->request.mode == ZNODE_WRITE_LOCK);
	/* mixing of read/write locks is not allowed */
	assert("zam-341", !ret || znode_is_wlocked(node));

	return ret;
}
#if REISER4_DEBUG
/* Returns true if the lock is held by the calling thread. */
int znode_is_any_locked(const znode * node)
{
	lock_handle *handle;
	lock_stack *stack;
	int ret;

	if (!znode_is_locked(node)) {
		return 0;
	}

	stack = get_current_lock_stack();

	spin_lock_stack(stack);

	ret = 0;

	list_for_each_entry(handle, &stack->locks, locks_link) {
		if (handle->node == node) {
			ret = 1;
			break;
		}
	}

	spin_unlock_stack(stack);

	return ret;
}

#endif

/* Returns true if a write lock is held by the calling thread. */
int znode_is_write_locked(const znode * node)
{
	lock_stack *stack;
	lock_handle *handle;

	assert("jmacd-8765", node != NULL);

	if (!znode_is_wlocked(node)) {
		return 0;
	}

	stack = get_current_lock_stack();

	/*
	 * When znode is write locked, all owner handles point to the same lock
	 * stack. Get pointer to lock stack from the first lock handle from
	 * znode's owner list
	 */
	handle = list_entry(node->lock.owners.next, lock_handle, owners_link);

	return (handle->owner == stack);
}
/* This "deadlock" condition is the essential part of reiser4 locking
   implementation. This condition is checked explicitly by calling
   check_deadlock_condition() or implicitly in all places where znode lock
   state (set of owners and request queue) is changed. Locking code is
   designed to use this condition to trigger the procedure of passing an object
   from low priority owner(s) to high priority one(s).

   The procedure results in passing an event (setting the lock_handle->signaled
   flag), counting this event in the nr_signaled field of the owner's lock
   stack object, and waking up the owner's process.
 */
static inline int check_deadlock_condition(znode * node)
{
	assert_spin_locked(&(node->lock.guard));
	return node->lock.nr_hipri_requests > 0
	    && node->lock.nr_hipri_owners == 0;
}

static int check_livelock_condition(znode * node, znode_lock_mode mode)
{
	zlock * lock = &node->lock;

	return mode == ZNODE_READ_LOCK &&
		lock->nr_readers >= 0 && lock->nr_hipri_write_requests > 0;
}

/* checks lock/request compatibility */
static int can_lock_object(lock_stack * owner)
{
	znode *node = owner->request.node;

	assert_spin_locked(&(node->lock.guard));

	/* See if the node is disconnected. */
	if (unlikely(ZF_ISSET(node, JNODE_IS_DYING)))
		return RETERR(-EINVAL);

	/* Do not ever try to take a lock if we are going in low priority
	   direction and a node has a high priority request without high
	   priority owners. */
	if (unlikely(!owner->curpri && check_deadlock_condition(node)))
		return RETERR(-E_REPEAT);
	if (unlikely(owner->curpri &&
		     check_livelock_condition(node, owner->request.mode)))
		return RETERR(-E_REPEAT);
	if (unlikely(!is_lock_compatible(node, owner->request.mode)))
		return RETERR(-E_REPEAT);
	return 0;
}
/* Sets a high priority for the process. It clears "signaled" flags because a
   znode locked by a high-priority process can't satisfy our "deadlock
   condition". */
static void set_high_priority(lock_stack * owner)
{
	assert("nikita-1846", owner == get_current_lock_stack());
	/* Do nothing if current priority is already high */
	if (!owner->curpri) {
		/* We don't need locking for the owner->locks list, because this
		 * function is only called with the lock stack of the current
		 * thread, and no other thread can play with owner->locks list
		 * and/or change ->node pointers of lock handles in this list.
		 *
		 * (Interrupts also are not involved.)
		 */
		lock_handle *item = list_entry(owner->locks.next, lock_handle,
					       locks_link);
		while (&owner->locks != &item->locks_link) {
			znode *node = item->node;

			spin_lock_zlock(&node->lock);

			node->lock.nr_hipri_owners++;

			/* we can safely set signaled to zero, because the
			   previous statement (nr_hipri_owners++) guarantees
			   that signaled will never be set again. */
			item->signaled = 0;
			spin_unlock_zlock(&node->lock);

			item = list_entry(item->locks_link.next, lock_handle,
					  locks_link);
		}
		owner->curpri = 1;
		atomic_set(&owner->nr_signaled, 0);
	}
}

/* Sets a low priority for the process. */
static void set_low_priority(lock_stack * owner)
{
	assert("nikita-3075", owner == get_current_lock_stack());
	/* Do nothing if current priority is already low */
	if (owner->curpri) {
		/* scan all locks (lock handles) held by @owner, which is
		   actually the current thread, and check whether we are
		   reaching deadlock possibility anywhere.
		 */
		lock_handle *handle = list_entry(owner->locks.next,
						 lock_handle, locks_link);
		while (&owner->locks != &handle->locks_link) {
			znode *node = handle->node;
			spin_lock_zlock(&node->lock);
			/* this thread just was a hipri owner of @node, so
			   nr_hipri_owners has to be greater than zero. */
			assert("nikita-1835", node->lock.nr_hipri_owners > 0);
			node->lock.nr_hipri_owners--;
			/* If we have a deadlock condition, adjust the
			   nr_signaled field. It is enough to set the "signaled"
			   flag only for the current process; other low-pri
			   owners will be signaled and woken up after the
			   current process unlocks this object and any
			   high-priority requestor takes control. */
			if (check_deadlock_condition(node)
			    && !handle->signaled) {
				handle->signaled = 1;
				atomic_inc(&owner->nr_signaled);
			}
			spin_unlock_zlock(&node->lock);
			handle = list_entry(handle->locks_link.next,
					    lock_handle, locks_link);
		}
		owner->curpri = 0;
	}
}
static void remove_lock_request(lock_stack * requestor)
{
	zlock * lock = &requestor->request.node->lock;

	if (requestor->curpri) {
		assert("nikita-1838", lock->nr_hipri_requests > 0);
		lock->nr_hipri_requests--;
		if (requestor->request.mode == ZNODE_WRITE_LOCK)
			lock->nr_hipri_write_requests--;
	}
	list_del(&requestor->requestors_link);
}

static void invalidate_all_lock_requests(znode * node)
{
	lock_stack *requestor, *tmp;

	assert_spin_locked(&(node->lock.guard));

	list_for_each_entry_safe(requestor, tmp, &node->lock.requestors,
				 requestors_link) {
		remove_lock_request(requestor);
		requestor->request.ret_code = -EINVAL;
		reiser4_wake_up(requestor);
		requestor->request.mode = ZNODE_NO_LOCK;
	}
}

static void dispatch_lock_requests(znode * node)
{
	lock_stack *requestor, *tmp;

	assert_spin_locked(&(node->lock.guard));

	list_for_each_entry_safe(requestor, tmp, &node->lock.requestors,
				 requestors_link) {
		if (znode_is_write_locked(node))
			break;
		if (!can_lock_object(requestor)) {
			lock_object(requestor);
			remove_lock_request(requestor);
			requestor->request.ret_code = 0;
			reiser4_wake_up(requestor);
			requestor->request.mode = ZNODE_NO_LOCK;
		}
	}
}
/* release long-term lock, acquired by longterm_lock_znode() */
void longterm_unlock_znode(lock_handle * handle)
{
	znode *node = handle->node;
	lock_stack *oldowner = handle->owner;
	int hipri;
	int readers;
	int rdelta;
	int youdie;

	/*
	 * this is time-critical and highly optimized code. Modify carefully.
	 */

	assert("jmacd-1021", handle != NULL);
	assert("jmacd-1022", handle->owner != NULL);
	assert("nikita-1392", LOCK_CNT_GTZ(long_term_locked_znode));

	assert("zam-130", oldowner == get_current_lock_stack());

	LOCK_CNT_DEC(long_term_locked_znode);

	/*
	 * to minimize amount of operations performed under lock, pre-compute
	 * all variables used within critical section. This makes code
	 * obscure.
	 */

	/* was this lock of hi or lo priority */
	hipri = oldowner->curpri ? 1 : 0;
	/* number of readers */
	readers = node->lock.nr_readers;
	/* +1 if write lock, -1 if read lock */
	rdelta = (readers > 0) ? -1 : +1;
	/* true if node is to die and write lock is released */
	youdie = ZF_ISSET(node, JNODE_HEARD_BANSHEE) && (readers < 0);

	spin_lock_zlock(&node->lock);

	assert("zam-101", znode_is_locked(node));

	/* Adjust a number of high priority owners of this lock */
	assert("nikita-1836", node->lock.nr_hipri_owners >= hipri);
	node->lock.nr_hipri_owners -= hipri;

	/* Handle znode deallocation on last write-lock release. */
	if (znode_is_wlocked_once(node)) {
		if (youdie) {
			forget_znode(handle);
			assert("nikita-2191", znode_invariant(node));
			zput(node);
			return;
		}
	}

	if (handle->signaled)
		atomic_dec(&oldowner->nr_signaled);

	/* Unlocking means owner<->object link deletion */
	unlink_object(handle);

	/* This is enough to be sure whether an object is completely
	   unlocked. */
	node->lock.nr_readers += rdelta;

	/* If the node is locked it must have an owners list. Likewise, if
	   the node is unlocked it must have an empty owners list. */
	assert("zam-319", equi(znode_is_locked(node),
			       !list_empty_careful(&node->lock.owners)));

#if REISER4_DEBUG
	if (!znode_is_locked(node))
		++node->times_locked;
#endif

	/* If there are pending lock requests we wake up a requestor */
	if (!znode_is_wlocked(node))
		dispatch_lock_requests(node);
	if (check_deadlock_condition(node))
		wake_up_all_lopri_owners(node);
	spin_unlock_zlock(&node->lock);

	/* minus one reference from handle->node */
	assert("nikita-2190", znode_invariant(node));
	ON_DEBUG(check_lock_data());
	ON_DEBUG(check_lock_node_data(node));
	zput(node);
}
/* final portion of longterm-lock */
static int
lock_tail(lock_stack * owner, int ok, znode_lock_mode mode)
{
	znode *node = owner->request.node;

	assert_spin_locked(&(node->lock.guard));

	/* If we broke with (ok == 0) it means we can_lock, now do it. */
	if (ok == 0) {
		lock_object(owner);
		owner->request.mode = 0;
		/* count a reference from lockhandle->node

		   znode was already referenced at the entry to this function,
		   hence taking spin-lock here is not necessary (see comment
		   in the zref()).
		 */
		zref(node);

		LOCK_CNT_INC(long_term_locked_znode);
	}
	spin_unlock_zlock(&node->lock);
	ON_DEBUG(check_lock_data());
	ON_DEBUG(check_lock_node_data(node));
	return ok;
}
/*
 * version of longterm_lock_znode() optimized for the most common case: read
 * lock without any special flags. This is the kind of lock that any tree
 * traversal takes on the root node of the tree, which is very frequent.
 */
static int longterm_lock_tryfast(lock_stack * owner)
{
	int result;
	znode *node;
	zlock *lock;

	node = owner->request.node;
	lock = &node->lock;

	assert("nikita-3340", reiser4_schedulable());
	assert("nikita-3341", request_is_deadlock_safe(node,
							ZNODE_READ_LOCK,
							ZNODE_LOCK_LOPRI));
	spin_lock_zlock(lock);
	result = can_lock_object(owner);
	spin_unlock_zlock(lock);

	if (likely(result != -EINVAL)) {
		spin_lock_znode(node);
		result = reiser4_try_capture(ZJNODE(node), ZNODE_READ_LOCK, 0);
		spin_unlock_znode(node);
		spin_lock_zlock(lock);
		if (unlikely(result != 0)) {
			owner->request.mode = 0;
		} else {
			result = can_lock_object(owner);
			if (unlikely(result == -E_REPEAT)) {
				/* fall back to longterm_lock_znode() */
				spin_unlock_zlock(lock);
				return 1;
			}
		}
		return lock_tail(owner, result, ZNODE_READ_LOCK);
	} else
		return 1;
}
/* locks given lock object */
int longterm_lock_znode(
			/* local link object (allocated by lock owner thread,
			 * usually on its own stack) */
			lock_handle * handle,
			/* znode we want to lock. */
			znode * node,
			/* {ZNODE_READ_LOCK, ZNODE_WRITE_LOCK}; */
			znode_lock_mode mode,
			/* {0, -EINVAL, -E_DEADLOCK}, see return codes
			 * description. */
			znode_lock_request request) {
	int ret;
	int hipri = (request & ZNODE_LOCK_HIPRI) != 0;
	int non_blocking = 0;
	int has_atom;
	txn_capture cap_flags;
	zlock *lock;
	txn_handle *txnh;
	tree_level level;

	/* Get current process context */
	lock_stack *owner = get_current_lock_stack();

	/* Check that the lock handle is initialized and isn't already being
	 * used. */
	assert("jmacd-808", handle->owner == NULL);
	assert("nikita-3026", reiser4_schedulable());
	assert("nikita-3219", request_is_deadlock_safe(node, mode, request));
	assert("zam-1056", atomic_read(&ZJNODE(node)->x_count) > 0);
	/* long term locks are not allowed in the VM contexts (->writepage(),
	 * prune_{d,i}cache()).
	 *
	 * FIXME this doesn't work due to unused-dentry-with-unlinked-inode
	 * bug caused by d_splice_alias() only working for directories.
	 */
	assert("nikita-3547", 1 || ((current->flags & PF_MEMALLOC) == 0));
	assert("zam-1055", mode != ZNODE_NO_LOCK);

	cap_flags = 0;
	if (request & ZNODE_LOCK_NONBLOCK) {
		cap_flags |= TXN_CAPTURE_NONBLOCKING;
		non_blocking = 1;
	}

	if (request & ZNODE_LOCK_DONT_FUSE)
		cap_flags |= TXN_CAPTURE_DONT_FUSE;

	/* If we are changing our process priority we must adjust the number
	   of high priority owners for each znode that we already lock */
	if (hipri) {
		set_high_priority(owner);
	} else {
		set_low_priority(owner);
	}

	level = znode_get_level(node);

	/* Fill request structure with our values. */
	owner->request.mode = mode;
	owner->request.handle = handle;
	owner->request.node = node;

	txnh = get_current_context()->trans;
	lock = &node->lock;

	if (mode == ZNODE_READ_LOCK && request == 0) {
		ret = longterm_lock_tryfast(owner);
		if (ret <= 0)
			return ret;
	}

	has_atom = (txnh->atom != NULL);

	/* Synchronize on node's zlock guard lock. */
	spin_lock_zlock(lock);

	if (znode_is_locked(node) &&
	    mode == ZNODE_WRITE_LOCK && recursive(owner))
		return lock_tail(owner, 0, mode);

	for (;;) {
		/* Check the lock's availability: if it is unavailable we get
		   E_REPEAT, 0 indicates "can_lock", otherwise the node is
		   invalid. */
		ret = can_lock_object(owner);

		if (unlikely(ret == -EINVAL)) {
			/* @node is dying. Leave it alone. */
			break;
		}

		if (unlikely(ret == -E_REPEAT && non_blocking)) {
			/* either locking of @node by the current thread will
			 * lead to a deadlock, or lock modes are
			 * incompatible. */
			break;
		}

		assert("nikita-1844", (ret == 0)
		       || ((ret == -E_REPEAT) && !non_blocking));
		/* If we can get the lock... Try to capture first before
		   taking the lock. */

		/* first handle commonest case where node and txnh are already
		 * in the same atom. */
		/* safe to do without taking locks, because:
		 *
		 * 1. read of aligned word is atomic with respect to writes to
		 * this word
		 *
		 * 2. false negatives are handled in reiser4_try_capture().
		 *
		 * 3. false positives are impossible.
		 *
		 * PROOF: left as an exercise to the curious reader.
		 *
		 * Just kidding. Here is one:
		 *
		 * At the time T0 txnh->atom is stored in txnh_atom.
		 *
		 * At the time T1 node->atom is stored in node_atom.
		 *
		 * At the time T2 we observe that
		 *
		 *     txnh_atom != NULL && node_atom == txnh_atom.
		 *
		 * Imagine that at this moment we acquire node and txnh spin
		 * lock in this order. Suppose that under spin lock we have
		 *
		 *     node->atom != txnh->atom,                       (S1)
		 *
		 * at the time T3.
		 *
		 * txnh->atom != NULL still, because txnh is open by the
		 * current thread.
		 *
		 * Suppose node->atom == NULL, that is, node was un-captured
		 * between T1 and T3. But un-capturing of a formatted node is
		 * always preceded by the call to reiser4_invalidate_lock(),
		 * which marks the znode as JNODE_IS_DYING under the zlock spin
		 * lock. Contradiction, because can_lock_object() above checks
		 * for JNODE_IS_DYING. Hence, node->atom != NULL at T3.
		 *
		 * Suppose that node->atom != node_atom, that is, the atom node
		 * belongs to was fused into another atom: node_atom was fused
		 * into node->atom. Atom of txnh was equal to node_atom at T2,
		 * which means that under spin lock, txnh->atom == node->atom,
		 * because txnh->atom can only follow the fusion
		 * chain. Contradicts S1.
		 *
		 * The same for the hypothesis txnh->atom != txnh_atom. Hence,
		 * node->atom == node_atom == txnh_atom == txnh->atom. Again
		 * contradicts S1. Hence S1 is false. QED.
		 */

		if (likely(has_atom && ZJNODE(node)->atom == txnh->atom)) {
			;
		} else {
			/*
			 * unlock zlock spin lock here. It is possible for
			 * longterm_unlock_znode() to sneak in here, but there
			 * is no harm: reiser4_invalidate_lock() will mark the
			 * znode as JNODE_IS_DYING and this will be noted by
			 * can_lock_object() below.
			 */
			spin_unlock_zlock(lock);
			spin_lock_znode(node);
			ret = reiser4_try_capture(ZJNODE(node), mode, cap_flags);
			spin_unlock_znode(node);
			spin_lock_zlock(lock);
			if (unlikely(ret != 0)) {
				/* In the failure case, the txnmgr releases
				   the znode's lock (or in some cases, it was
				   released a while ago). There's no need to
				   reacquire it, so we should return here and
				   avoid releasing the lock. */
				owner->request.mode = 0;
				break;
			}

			/* Check the lock's availability again -- this is
			   because under some circumstances the capture code
			   has to release and reacquire the znode spinlock. */
			ret = can_lock_object(owner);
		}

		/* This time, a return of (ret == 0) means we can lock, so we
		   should break out of the loop. */
		if (likely(ret != -E_REPEAT || non_blocking))
			break;

		/* Lock is unavailable, we have to wait. */
		ret = reiser4_prepare_to_sleep(owner);
		if (unlikely(ret != 0))
			break;

		assert_spin_locked(&(node->lock.guard));
		if (hipri) {
			/* If we are going in the high priority direction then
			   increase the high priority requests counter for the
			   node */
			lock->nr_hipri_requests++;
			if (mode == ZNODE_WRITE_LOCK)
				lock->nr_hipri_write_requests++;
			/* If there are no high priority owners for a node,
			   then immediately wake up low priority owners, so
			   they can detect possible deadlock */
			if (lock->nr_hipri_owners == 0)
				wake_up_all_lopri_owners(node);
		}
		list_add_tail(&owner->requestors_link, &lock->requestors);

		/* Ok, here we have prepared a lock request, so unlock
		   a znode ... */
		spin_unlock_zlock(lock);
		/* ... and sleep */
		reiser4_go_to_sleep(owner);
		if (owner->request.mode == ZNODE_NO_LOCK)
			goto request_is_done;
		spin_lock_zlock(lock);
		if (owner->request.mode == ZNODE_NO_LOCK) {
			spin_unlock_zlock(lock);
request_is_done:
			if (owner->request.ret_code == 0) {
				LOCK_CNT_INC(long_term_locked_znode);
				zref(node);
			}
			return owner->request.ret_code;
		}
		remove_lock_request(owner);
	}

	return lock_tail(owner, ret, mode);
}
/* lock object invalidation means changing of lock object state to `INVALID'
   and waiting for all other processes to cancel their lock requests. */
void reiser4_invalidate_lock(lock_handle * handle	/* path to lock
							 * owner and lock
							 * object is being
							 * invalidated. */ )
{
	znode *node = handle->node;
	lock_stack *owner = handle->owner;

	assert("zam-325", owner == get_current_lock_stack());
	assert("zam-103", znode_is_write_locked(node));
	assert("nikita-1393", !ZF_ISSET(node, JNODE_LEFT_CONNECTED));
	assert("nikita-1793", !ZF_ISSET(node, JNODE_RIGHT_CONNECTED));
	assert("nikita-1394", ZF_ISSET(node, JNODE_HEARD_BANSHEE));
	assert("nikita-3097", znode_is_wlocked_once(node));
	assert_spin_locked(&(node->lock.guard));

	if (handle->signaled)
		atomic_dec(&owner->nr_signaled);

	ZF_SET(node, JNODE_IS_DYING);
	unlink_object(handle);
	node->lock.nr_readers = 0;

	invalidate_all_lock_requests(node);
	spin_unlock_zlock(&node->lock);
}
/* Initializes lock_stack. */
void init_lock_stack(lock_stack * owner	/* pointer to allocated
					 * structure. */ )
{
	INIT_LIST_HEAD(&owner->locks);
	INIT_LIST_HEAD(&owner->requestors_link);
	spin_lock_init(&owner->sguard);
	owner->curpri = 1;
	init_waitqueue_head(&owner->wait);
}

/* Initializes lock object. */
void reiser4_init_lock(zlock * lock	/* pointer to an allocated,
					 * uninitialized lock object
					 * structure. */ )
{
	memset(lock, 0, sizeof(zlock));
	spin_lock_init(&lock->guard);
	INIT_LIST_HEAD(&lock->requestors);
	INIT_LIST_HEAD(&lock->owners);
}
/* Transfer a lock handle (presumably so that variables can be moved between
   stack and heap locations). */
static void
move_lh_internal(lock_handle * new, lock_handle * old, int unlink_old)
{
	znode *node = old->node;
	lock_stack *owner = old->owner;
	int signaled;

	/* locks_list, modified by link_object() is not protected by
	   anything. This is valid because only current thread ever modifies
	   locks_list of its lock_stack.
	 */
	assert("nikita-1827", owner == get_current_lock_stack());
	assert("nikita-1831", new->owner == NULL);

	spin_lock_zlock(&node->lock);

	signaled = old->signaled;
	if (unlink_old) {
		unlink_object(old);
	} else {
		if (node->lock.nr_readers > 0) {
			node->lock.nr_readers += 1;
		} else {
			node->lock.nr_readers -= 1;
		}
		if (signaled) {
			atomic_inc(&owner->nr_signaled);
		}
		if (owner->curpri) {
			node->lock.nr_hipri_owners += 1;
		}
		LOCK_CNT_INC(long_term_locked_znode);

		zref(node);
	}

	link_object(new, owner, node);
	new->signaled = signaled;

	spin_unlock_zlock(&node->lock);
}

void move_lh(lock_handle * new, lock_handle * old)
{
	move_lh_internal(new, old, /*unlink_old */ 1);
}

void copy_lh(lock_handle * new, lock_handle * old)
{
	move_lh_internal(new, old, /*unlink_old */ 0);
}
/* after getting -E_DEADLOCK we unlock znodes until this function returns
   false */
int reiser4_check_deadlock(void)
{
	lock_stack *owner = get_current_lock_stack();
	return atomic_read(&owner->nr_signaled) != 0;
}

/* Before going to sleep we re-check "release lock" requests which might have
   come from threads with high lock priorities. */
int reiser4_prepare_to_sleep(lock_stack * owner)
{
	assert("nikita-1847", owner == get_current_lock_stack());

	/* We return -E_DEADLOCK if one or more "give me the lock" messages are
	 * counted in nr_signaled */
	if (unlikely(atomic_read(&owner->nr_signaled) != 0)) {
		assert("zam-959", !owner->curpri);
		return RETERR(-E_DEADLOCK);
	}
	return 0;
}

/* Wakes up a single thread */
void __reiser4_wake_up(lock_stack * owner)
{
	atomic_set(&owner->wakeup, 1);
	wake_up(&owner->wait);
}

/* Puts a thread to sleep */
void reiser4_go_to_sleep(lock_stack * owner)
{
	/* Well, we might sleep here, so holding of any spinlocks is no-no */
	assert("nikita-3027", reiser4_schedulable());

	wait_event(owner->wait, atomic_read(&owner->wakeup));
	atomic_set(&owner->wakeup, 0);
}

int lock_stack_isclean(lock_stack * owner)
{
	if (list_empty_careful(&owner->locks)) {
		assert("zam-353", atomic_read(&owner->nr_signaled) == 0);
		return 1;
	}

	return 0;
}
#if REISER4_DEBUG

/*
 * debugging functions
 */

static void list_check(struct list_head *head)
{
	struct list_head *pos;

	list_for_each(pos, head)
		assert("", (pos->prev != NULL && pos->next != NULL &&
			    pos->prev->next == pos && pos->next->prev == pos));
}

/* check consistency of locking data-structures hanging off the @stack */
static void check_lock_stack(lock_stack * stack)
{
	spin_lock_stack(stack);
	/* check that stack->locks is not corrupted */
	list_check(&stack->locks);
	spin_unlock_stack(stack);
}

/* check consistency of locking data structures */
void check_lock_data(void)
{
	check_lock_stack(&get_current_context()->stack);
}

/* check consistency of locking data structures for @node */
void check_lock_node_data(znode * node)
{
	spin_lock_zlock(&node->lock);
	list_check(&node->lock.owners);
	list_check(&node->lock.requestors);
	spin_unlock_zlock(&node->lock);
}

/* check that the given lock request is deadlock safe. This check is, of
 * course, not exhaustive. */
static int
request_is_deadlock_safe(znode * node, znode_lock_mode mode,
			 znode_lock_request request)
{
	lock_stack *owner;

	owner = get_current_lock_stack();
	/*
	 * check that a hipri lock request is not issued when there are locked
	 * nodes at the higher levels.
	 */
	if (request & ZNODE_LOCK_HIPRI && !(request & ZNODE_LOCK_NONBLOCK) &&
	    znode_get_level(node) != 0) {
		lock_handle *item;

		list_for_each_entry(item, &owner->locks, locks_link) {
			znode *other;

			other = item->node;

			if (znode_get_level(other) == 0)
				continue;
			if (znode_get_level(other) > znode_get_level(node))
				return 0;
		}
	}
	return 1;
}

#endif
/* return pointer to static storage with name of lock_mode. For
   debugging */
const char *lock_mode_name(znode_lock_mode lock /* lock mode to get name of */ )
{
	if (lock == ZNODE_READ_LOCK)
		return "read";
	else if (lock == ZNODE_WRITE_LOCK)
		return "write";
	else {
		static char buf[30];

		sprintf(buf, "unknown: %i", lock);
		return buf;
	}
}

/* Make Linus happy.
   Local variables:
   c-indentation-style: "K&R"
   mode-name: "LC"
   c-basic-offset: 8
   tab-width: 8
   fill-column: 79
   End:
*/