fed up with those stupid warnings
[mmotm.git] / fs / reiser4 / lock.c
blob4af6fd0f9d2a7269f4a638f748843507004130c5
1 /* Copyright 2001, 2002, 2003 by Hans Reiser, licensing governed by
2 * reiser4/README */
4 /* Traditional deadlock avoidance is achieved by acquiring all locks in a single
5 order. V4 balances the tree from the bottom up, and searches the tree from
6 the top down, and that is really the way we want it, so tradition won't work
7 for us.
9 Instead we have two lock orderings, a high priority lock ordering, and a low
10 priority lock ordering. Each node in the tree has a lock in its znode.
12 Suppose we have a set of processes which lock (R/W) tree nodes. Each process
13 has a set (maybe empty) of already locked nodes ("process locked set"). Each
14 process may have a pending lock request to a node locked by another process.
15 Note: we lock and unlock, but do not transfer locks: it is possible
16 transferring locks instead would save some bus locking....
18 Deadlock occurs when we have a loop constructed from process locked sets and
19 lock request vectors.
21 NOTE: The reiser4 "tree" is a tree on disk, but its cached representation in
22 memory is extended with "znodes" with which we connect nodes with their left
23 and right neighbors using sibling pointers stored in the znodes. When we
24 perform balancing operations we often go from left to right and from right to
25 left.
27 +-P1-+ +-P3-+
28 |+--+| V1 |+--+|
29 ||N1|| -------> ||N3||
30 |+--+| |+--+|
31 +----+ +----+
32 ^ |
33 |V2 |V3
34 | v
35 +---------P2---------+
36 |+--+ +--+|
37 ||N2| -------- |N4||
38 |+--+ +--+|
39 +--------------------+
41 We solve this by ensuring that only low priority processes lock in top to
42 bottom order and from right to left, and high priority processes lock from
43 bottom to top and left to right.
45 ZAM-FIXME-HANS: order not just node locks in this way, order atom locks, and
46 kill those damn busy loops.
47 ANSWER(ZAM): atom locks (which are introduced by ASTAGE_CAPTURE_WAIT atom
48 stage) cannot be ordered that way. There are no rules what nodes can belong
49 to the atom and what nodes cannot. We cannot define what is right or left
50 direction, what is top or bottom. We can take immediate parent or side
51 neighbor of one node, but nobody guarantees that, say, left neighbor node is
52 not a far right neighbor for other nodes from the same atom. It breaks
53 deadlock avoidance rules and hi-low priority locking cannot be applied for
54 atom locks.
56 How does it help to avoid deadlocks ?
58 Suppose we have a deadlock with n processes. Processes from one priority
59 class never deadlock because they take locks in one consistent
60 order.
62 So, any possible deadlock loop must have low priority as well as high
63 priority processes. There are no other lock priority levels except low and
64 high. We know that any deadlock loop contains at least one node locked by a
65 low priority process and requested by a high priority process. If this
66 situation is caught and resolved it is sufficient to avoid deadlocks.
68 V4 DEADLOCK PREVENTION ALGORITHM IMPLEMENTATION.
70 The deadlock prevention algorithm is based on comparing
71 priorities of node owners (processes which keep znode locked) and
72 requesters (processes which want to acquire a lock on znode). We
73 implement a scheme where low-priority owners yield locks to
74 high-priority requesters. We created a signal passing system that
75 is used to ask low-priority processes to yield one or more locked
76 znodes.
78 The condition when a znode needs to change its owners is described by the
79 following formula:
81 #############################################
82 # #
83 # (number of high-priority requesters) > 0 #
84 # AND #
85 # (numbers of high-priority owners) == 0 #
86 # #
87 #############################################
89 Note that a low-priority process delays node releasing if another
90 high-priority process owns this node. So, slightly more strictly speaking,
91 to have a deadlock capable cycle you must have a loop in which a high
92 priority process is waiting on a low priority process to yield a node, which
93 is slightly different from saying a high priority process is waiting on a
94 node owned by a low priority process.
96 It is enough to avoid deadlocks if we prevent any low-priority process from
97 falling asleep if its locked set contains a node which satisfies the
98 deadlock condition.
100 That condition is implicitly or explicitly checked in all places where new
101 high-priority requests may be added or removed from node request queue or
102 high-priority process takes or releases a lock on node. The main
103 goal of these checks is to never lose the moment when node becomes "has
104 wrong owners" and send "must-yield-this-lock" signals to its low-pri owners
105 at that time.
107 The information about received signals is stored in the per-process
108 structure (lock stack) and analyzed before a low-priority process goes to
109 sleep but after a "fast" attempt to lock a node fails. Any signal wakes
110 sleeping process up and forces him to re-check lock status and received
111 signal info. If "must-yield-this-lock" signals were received the locking
112 primitive (longterm_lock_znode()) fails with -E_DEADLOCK error code.
114 V4 LOCKING DRAWBACKS
116 If we have already balanced on one level, and we are propagating our changes
117 upward to a higher level, it could be very messy to surrender all locks on
118 the lower level because we put so much computational work into it, and
119 reverting them to their state before they were locked might be very complex.
120 We also don't want to acquire all locks before performing balancing because
121 that would either be almost as much work as the balancing, or it would be
122 too conservative and lock too much. We want balancing to be done only at
123 high priority. Yet, we might want to go to the left one node and use some
124 of its empty space... So we make one attempt at getting the node to the left
125 using try_lock, and if it fails we do without it, because we didn't really
126 need it, it was only a nice to have.
128 LOCK STRUCTURES DESCRIPTION
130 The following data structures are used in the reiser4 locking
131 implementation:
133 All fields related to long-term locking are stored in znode->lock.
135 The lock stack is a per thread object. It owns all znodes locked by the
136 thread. One znode may be locked by several threads in case of read lock or
137 one znode may be write locked by one thread several times. The special link
138 objects (lock handles) support n<->m relation between znodes and lock
139 owners.
141 <Thread 1> <Thread 2>
143 +---------+ +---------+
144 | LS1 | | LS2 |
145 +---------+ +---------+
147 |---------------+ +----------+
148 v v v v
149 +---------+ +---------+ +---------+ +---------+
150 | LH1 | | LH2 | | LH3 | | LH4 |
151 +---------+ +---------+ +---------+ +---------+
152 ^ ^ ^ ^
153 | +------------+ |
154 v v v
155 +---------+ +---------+ +---------+
156 | Z1 | | Z2 | | Z3 |
157 +---------+ +---------+ +---------+
159 Thread 1 locked znodes Z1 and Z2, thread 2 locked znodes Z2 and Z3. The
160 picture above shows that lock stack LS1 has a list of 2 lock handles LH1 and
161 LH2, lock stack LS2 has a list with lock handles LH3 and LH4 on it. Znode
162 Z1 is locked by only one thread, znode has only one lock handle LH1 on its
163 list, similar situation is for Z3 which is locked by the thread 2 only. Z2
164 is locked (for read) twice by different threads and two lock handles are on
165 its list. Each lock handle represents a single relation of a locking of a
166 znode by a thread. Locking of a znode is an establishing of a locking
167 relation between the lock stack and the znode by adding of a new lock handle
168 to a list of lock handles, the lock stack. The lock stack links all lock
169 handles for all znodes locked by the lock stack. The znode list groups all
170 lock handles for all locks stacks which locked the znode.
172 Yet another relation may exist between znode and lock owners. If lock
173 procedure cannot immediately take lock on an object it adds the lock owner
174 on special `requestors' list belongs to znode. That list represents a
175 queue of pending lock requests. Because one lock owner may request only
176 only one lock object at a time, it is a 1->n relation between lock objects
177 and a lock owner implemented as it is described above. Full information
178 (priority, pointers to lock and link objects) about each lock request is
179 stored in lock owner structure in `request' field.
181 SHORT_TERM LOCKING
183 This is a list of primitive operations over lock stacks / lock handles /
184 znodes and locking descriptions for them.
186 1. locking / unlocking which is done by two list insertion/deletion, one
187 to/from znode's list of lock handles, another one is to/from lock stack's
188 list of lock handles. The first insertion is protected by
189 znode->lock.guard spinlock. The list owned by the lock stack can be
190 modified only by thread who owns the lock stack and nobody else can
191 modify/read it. There is nothing to be protected by a spinlock or
192 something else.
194 2. adding/removing a lock request to/from znode requesters list. The rule is
195 that znode->lock.guard spinlock should be taken for this.
197 3. we can traverse list of lock handles and use references to lock stacks who
198 locked given znode if znode->lock.guard spinlock is taken.
200 4. If a lock stack is associated with a znode as a lock requestor or lock
201 owner its existence is guaranteed by znode->lock.guard spinlock. Some its
202 (lock stack's) fields should be protected from being accessed in parallel
203 by two or more threads. Please look at lock_stack structure definition
204 for the info how those fields are protected. */
206 /* Znode lock and capturing intertwining. */
207 /* In current implementation we capture formatted nodes before locking
208 them. Take a look on longterm lock znode, reiser4_try_capture() request
209 precedes locking requests. The longterm_lock_znode function unconditionally
210 captures znode before even checking of locking conditions.
212 Another variant is to capture znode after locking it. It was not tested, but
213 at least one deadlock condition is supposed to be there. One thread has
214 locked a znode (Node-1) and calls reiser4_try_capture() for it.
215 reiser4_try_capture() sleeps because znode's atom has CAPTURE_WAIT state.
216 Second thread is a flushing thread, its current atom is the atom Node-1
217 belongs to. Second thread wants to lock Node-1 and sleeps because Node-1
218 is locked by the first thread. The described situation is a deadlock. */
220 #include "debug.h"
221 #include "txnmgr.h"
222 #include "znode.h"
223 #include "jnode.h"
224 #include "tree.h"
225 #include "plugin/node/node.h"
226 #include "super.h"
228 #include <linux/spinlock.h>
230 #if REISER4_DEBUG
231 static int request_is_deadlock_safe(znode * , znode_lock_mode,
232 znode_lock_request);
233 #endif
235 /* Returns a lock owner associated with current thread */
236 lock_stack *get_current_lock_stack(void)
238 return &get_current_context()->stack;
241 /* Wakes up all low priority owners informing them about possible deadlock */
242 static void wake_up_all_lopri_owners(znode * node)
244 lock_handle *handle;
246 assert_spin_locked(&(node->lock.guard));
247 list_for_each_entry(handle, &node->lock.owners, owners_link) {
248 assert("nikita-1832", handle->node == node);
249 /* count this signal in owner->nr_signaled */
250 if (!handle->signaled) {
251 handle->signaled = 1;
252 atomic_inc(&handle->owner->nr_signaled);
253 /* Wake up a single process */
254 reiser4_wake_up(handle->owner);
259 /* Adds a lock to a lock owner, which means creating a link to the lock and
260 putting the link into the two lists all links are on (the doubly linked list
261 that forms the lock_stack, and the doubly linked list of links attached
262 to a lock.
264 static inline void
265 link_object(lock_handle * handle, lock_stack * owner, znode * node)
267 assert("jmacd-810", handle->owner == NULL);
268 assert_spin_locked(&(node->lock.guard));
270 handle->owner = owner;
271 handle->node = node;
273 assert("reiser4-4",
274 ergo(list_empty_careful(&owner->locks), owner->nr_locks == 0));
276 /* add lock handle to the end of lock_stack's list of locks */
277 list_add_tail(&handle->locks_link, &owner->locks);
278 ON_DEBUG(owner->nr_locks++);
279 reiser4_ctx_gfp_mask_set();
281 /* add lock handle to the head of znode's list of owners */
282 list_add(&handle->owners_link, &node->lock.owners);
283 handle->signaled = 0;
286 /* Breaks a relation between a lock and its owner */
287 static inline void unlink_object(lock_handle * handle)
289 assert("zam-354", handle->owner != NULL);
290 assert("nikita-1608", handle->node != NULL);
291 assert_spin_locked(&(handle->node->lock.guard));
292 assert("nikita-1829", handle->owner == get_current_lock_stack());
293 assert("reiser4-5", handle->owner->nr_locks > 0);
295 /* remove lock handle from lock_stack's list of locks */
296 list_del(&handle->locks_link);
297 ON_DEBUG(handle->owner->nr_locks--);
298 reiser4_ctx_gfp_mask_set();
299 assert("reiser4-6",
300 ergo(list_empty_careful(&handle->owner->locks),
301 handle->owner->nr_locks == 0));
302 /* remove lock handle from znode's list of owners */
303 list_del(&handle->owners_link);
304 /* indicates that lock handle is free now */
305 handle->node = NULL;
306 #if REISER4_DEBUG
307 INIT_LIST_HEAD(&handle->locks_link);
308 INIT_LIST_HEAD(&handle->owners_link);
309 handle->owner = NULL;
310 #endif
313 /* Actually locks an object knowing that we are able to do this */
314 static void lock_object(lock_stack * owner)
316 struct lock_request *request;
317 znode *node;
319 request = &owner->request;
320 node = request->node;
321 assert_spin_locked(&(node->lock.guard));
322 if (request->mode == ZNODE_READ_LOCK) {
323 node->lock.nr_readers++;
324 } else {
325 /* check that we don't switched from read to write lock */
326 assert("nikita-1840", node->lock.nr_readers <= 0);
327 /* We allow recursive locking; a node can be locked several
328 times for write by same process */
329 node->lock.nr_readers--;
332 link_object(request->handle, owner, node);
334 if (owner->curpri)
335 node->lock.nr_hipri_owners++;
338 /* Check for recursive write locking */
339 static int recursive(lock_stack * owner)
341 int ret;
342 znode *node;
343 lock_handle *lh;
345 node = owner->request.node;
347 /* Owners list is not empty for a locked node */
348 assert("zam-314", !list_empty_careful(&node->lock.owners));
349 assert("nikita-1841", owner == get_current_lock_stack());
350 assert_spin_locked(&(node->lock.guard));
352 lh = list_entry(node->lock.owners.next, lock_handle, owners_link);
353 ret = (lh->owner == owner);
355 /* Recursive read locking should be done usual way */
356 assert("zam-315", !ret || owner->request.mode == ZNODE_WRITE_LOCK);
357 /* mixing of read/write locks is not allowed */
358 assert("zam-341", !ret || znode_is_wlocked(node));
360 return ret;
363 #if REISER4_DEBUG
364 /* Returns true if the lock is held by the calling thread. */
365 int znode_is_any_locked(const znode * node)
367 lock_handle *handle;
368 lock_stack *stack;
369 int ret;
371 if (!znode_is_locked(node))
372 return 0;
374 stack = get_current_lock_stack();
376 spin_lock_stack(stack);
378 ret = 0;
380 list_for_each_entry(handle, &stack->locks, locks_link) {
381 if (handle->node == node) {
382 ret = 1;
383 break;
387 spin_unlock_stack(stack);
389 return ret;
392 #endif
394 /* Returns true if a write lock is held by the calling thread. */
395 int znode_is_write_locked(const znode * node)
397 lock_stack *stack;
398 lock_handle *handle;
400 assert("jmacd-8765", node != NULL);
402 if (!znode_is_wlocked(node))
403 return 0;
405 stack = get_current_lock_stack();
408 * When znode is write locked, all owner handles point to the same lock
409 * stack. Get pointer to lock stack from the first lock handle from
410 * znode's owner list
412 handle = list_entry(node->lock.owners.next, lock_handle, owners_link);
414 return (handle->owner == stack);
417 /* This "deadlock" condition is the essential part of reiser4 locking
418 implementation. This condition is checked explicitly by calling
419 check_deadlock_condition() or implicitly in all places where znode lock
420 state (set of owners and request queue) is changed. Locking code is
421 designed to use this condition to trigger procedure of passing object from
422 low priority owner(s) to high priority one(s).
424 The procedure results in passing an event (setting lock_handle->signaled
425 flag) and counting this event in nr_signaled field of owner's lock stack
426 object and wakeup owner's process.
428 static inline int check_deadlock_condition(znode * node)
430 assert_spin_locked(&(node->lock.guard));
431 return node->lock.nr_hipri_requests > 0
432 && node->lock.nr_hipri_owners == 0;
435 static int check_livelock_condition(znode * node, znode_lock_mode mode)
437 zlock * lock = &node->lock;
439 return mode == ZNODE_READ_LOCK &&
440 lock->nr_readers >= 0 && lock->nr_hipri_write_requests > 0;
443 /* checks lock/request compatibility */
444 static int can_lock_object(lock_stack * owner)
446 znode *node = owner->request.node;
448 assert_spin_locked(&(node->lock.guard));
450 /* See if the node is disconnected. */
451 if (unlikely(ZF_ISSET(node, JNODE_IS_DYING)))
452 return RETERR(-EINVAL);
454 /* Do not ever try to take a lock if we are going in low priority
455 direction and a node have a high priority request without high
456 priority owners. */
457 if (unlikely(!owner->curpri && check_deadlock_condition(node)))
458 return RETERR(-E_REPEAT);
459 if (unlikely(owner->curpri &&
460 check_livelock_condition(node, owner->request.mode)))
461 return RETERR(-E_REPEAT);
462 if (unlikely(!is_lock_compatible(node, owner->request.mode)))
463 return RETERR(-E_REPEAT);
464 return 0;
467 /* Setting of a high priority to the process. It clears "signaled" flags
468 because znode locked by high-priority process can't satisfy our "deadlock
469 condition". */
470 static void set_high_priority(lock_stack * owner)
472 assert("nikita-1846", owner == get_current_lock_stack());
473 /* Do nothing if current priority is already high */
474 if (!owner->curpri) {
475 /* We don't need locking for owner->locks list, because, this
476 * function is only called with the lock stack of the current
477 * thread, and no other thread can play with owner->locks list
478 * and/or change ->node pointers of lock handles in this list.
480 * (Interrupts also are not involved.)
482 lock_handle *item = list_entry(owner->locks.next, lock_handle,
483 locks_link);
484 while (&owner->locks != &item->locks_link) {
485 znode *node = item->node;
487 spin_lock_zlock(&node->lock);
489 node->lock.nr_hipri_owners++;
491 /* we can safely set signaled to zero, because
492 previous statement (nr_hipri_owners ++) guarantees
493 that signaled will be never set again. */
494 item->signaled = 0;
495 spin_unlock_zlock(&node->lock);
497 item = list_entry(item->locks_link.next, lock_handle,
498 locks_link);
500 owner->curpri = 1;
501 atomic_set(&owner->nr_signaled, 0);
505 /* Sets a low priority to the process. */
506 static void set_low_priority(lock_stack * owner)
508 assert("nikita-3075", owner == get_current_lock_stack());
509 /* Do nothing if current priority is already low */
510 if (owner->curpri) {
511 /* scan all locks (lock handles) held by @owner, which is
512 actually current thread, and check whether we are reaching
513 deadlock possibility anywhere.
515 lock_handle *handle = list_entry(owner->locks.next, lock_handle,
516 locks_link);
517 while (&owner->locks != &handle->locks_link) {
518 znode *node = handle->node;
519 spin_lock_zlock(&node->lock);
520 /* this thread just was hipri owner of @node, so
521 nr_hipri_owners has to be greater than zero. */
522 assert("nikita-1835", node->lock.nr_hipri_owners > 0);
523 node->lock.nr_hipri_owners--;
524 /* If we have deadlock condition, adjust a nr_signaled
525 field. It is enough to set "signaled" flag only for
526 current process, other low-pri owners will be
527 signaled and waken up after current process unlocks
528 this object and any high-priority requestor takes
529 control. */
530 if (check_deadlock_condition(node)
531 && !handle->signaled) {
532 handle->signaled = 1;
533 atomic_inc(&owner->nr_signaled);
535 spin_unlock_zlock(&node->lock);
536 handle = list_entry(handle->locks_link.next,
537 lock_handle, locks_link);
539 owner->curpri = 0;
543 static void remove_lock_request(lock_stack * requestor)
545 zlock * lock = &requestor->request.node->lock;
547 if (requestor->curpri) {
548 assert("nikita-1838", lock->nr_hipri_requests > 0);
549 lock->nr_hipri_requests--;
550 if (requestor->request.mode == ZNODE_WRITE_LOCK)
551 lock->nr_hipri_write_requests--;
553 list_del(&requestor->requestors_link);
556 static void invalidate_all_lock_requests(znode * node)
558 lock_stack *requestor, *tmp;
560 assert_spin_locked(&(node->lock.guard));
562 list_for_each_entry_safe(requestor, tmp, &node->lock.requestors,
563 requestors_link) {
564 remove_lock_request(requestor);
565 requestor->request.ret_code = -EINVAL;
566 reiser4_wake_up(requestor);
567 requestor->request.mode = ZNODE_NO_LOCK;
571 static void dispatch_lock_requests(znode * node)
573 lock_stack *requestor, *tmp;
575 assert_spin_locked(&(node->lock.guard));
577 list_for_each_entry_safe(requestor, tmp, &node->lock.requestors,
578 requestors_link) {
579 if (znode_is_write_locked(node))
580 break;
581 if (!can_lock_object(requestor)) {
582 lock_object(requestor);
583 remove_lock_request(requestor);
584 requestor->request.ret_code = 0;
585 reiser4_wake_up(requestor);
586 requestor->request.mode = ZNODE_NO_LOCK;
591 /* release long-term lock, acquired by longterm_lock_znode() */
592 void longterm_unlock_znode(lock_handle * handle)
594 znode *node = handle->node;
595 lock_stack *oldowner = handle->owner;
596 int hipri;
597 int readers;
598 int rdelta;
599 int youdie;
602 * this is time-critical and highly optimized code. Modify carefully.
605 assert("jmacd-1021", handle != NULL);
606 assert("jmacd-1022", handle->owner != NULL);
607 assert("nikita-1392", LOCK_CNT_GTZ(long_term_locked_znode));
609 assert("zam-130", oldowner == get_current_lock_stack());
611 LOCK_CNT_DEC(long_term_locked_znode);
614 * to minimize amount of operations performed under lock, pre-compute
615 * all variables used within critical section. This makes code
616 * obscure.
619 /* was this lock of hi or lo priority */
620 hipri = oldowner->curpri ? 1 : 0;
621 /* number of readers */
622 readers = node->lock.nr_readers;
623 /* +1 if write lock, -1 if read lock */
624 rdelta = (readers > 0) ? -1 : +1;
625 /* true if node is to die and write lock is released */
626 youdie = ZF_ISSET(node, JNODE_HEARD_BANSHEE) && (readers < 0);
628 spin_lock_zlock(&node->lock);
630 assert("zam-101", znode_is_locked(node));
632 /* Adjust a number of high priority owners of this lock */
633 assert("nikita-1836", node->lock.nr_hipri_owners >= hipri);
634 node->lock.nr_hipri_owners -= hipri;
636 /* Handle znode deallocation on last write-lock release. */
637 if (znode_is_wlocked_once(node)) {
638 if (youdie) {
639 forget_znode(handle);
640 assert("nikita-2191", znode_invariant(node));
641 zput(node);
642 return;
646 if (handle->signaled)
647 atomic_dec(&oldowner->nr_signaled);
649 /* Unlocking means owner<->object link deletion */
650 unlink_object(handle);
652 /* This is enough to be sure whether an object is completely
653 unlocked. */
654 node->lock.nr_readers += rdelta;
656 /* If the node is locked it must have an owners list. Likewise, if
657 the node is unlocked it must have an empty owners list. */
658 assert("zam-319", equi(znode_is_locked(node),
659 !list_empty_careful(&node->lock.owners)));
661 #if REISER4_DEBUG
662 if (!znode_is_locked(node))
663 ++node->times_locked;
664 #endif
666 /* If there are pending lock requests we wake up a requestor */
667 if (!znode_is_wlocked(node))
668 dispatch_lock_requests(node);
669 if (check_deadlock_condition(node))
670 wake_up_all_lopri_owners(node);
671 spin_unlock_zlock(&node->lock);
673 /* minus one reference from handle->node */
674 assert("nikita-2190", znode_invariant(node));
675 ON_DEBUG(check_lock_data());
676 ON_DEBUG(check_lock_node_data(node));
677 zput(node);
680 /* final portion of longterm-lock */
681 static int
682 lock_tail(lock_stack * owner, int ok, znode_lock_mode mode)
684 znode *node = owner->request.node;
686 assert_spin_locked(&(node->lock.guard));
688 /* If we broke with (ok == 0) it means we can_lock, now do it. */
689 if (ok == 0) {
690 lock_object(owner);
691 owner->request.mode = 0;
692 /* count a reference from lockhandle->node
694 znode was already referenced at the entry to this function,
695 hence taking spin-lock here is not necessary (see comment
696 in the zref()).
698 zref(node);
700 LOCK_CNT_INC(long_term_locked_znode);
702 spin_unlock_zlock(&node->lock);
703 ON_DEBUG(check_lock_data());
704 ON_DEBUG(check_lock_node_data(node));
705 return ok;
709 * version of longterm_znode_lock() optimized for the most common case: read
710 * lock without any special flags. This is the kind of lock that any tree
711 * traversal takes on the root node of the tree, which is very frequent.
713 static int longterm_lock_tryfast(lock_stack * owner)
715 int result;
716 znode *node;
717 zlock *lock;
719 node = owner->request.node;
720 lock = &node->lock;
722 assert("nikita-3340", reiser4_schedulable());
723 assert("nikita-3341", request_is_deadlock_safe(node,
724 ZNODE_READ_LOCK,
725 ZNODE_LOCK_LOPRI));
726 spin_lock_zlock(lock);
727 result = can_lock_object(owner);
728 spin_unlock_zlock(lock);
730 if (likely(result != -EINVAL)) {
731 spin_lock_znode(node);
732 result = reiser4_try_capture(ZJNODE(node), ZNODE_READ_LOCK, 0);
733 spin_unlock_znode(node);
734 spin_lock_zlock(lock);
735 if (unlikely(result != 0)) {
736 owner->request.mode = 0;
737 } else {
738 result = can_lock_object(owner);
739 if (unlikely(result == -E_REPEAT)) {
740 /* fall back to longterm_lock_znode() */
741 spin_unlock_zlock(lock);
742 return 1;
745 return lock_tail(owner, result, ZNODE_READ_LOCK);
746 } else
747 return 1;
750 /* locks given lock object */
751 int longterm_lock_znode(
752 /* local link object (allocated by lock owner
753 * thread, usually on its own stack) */
754 lock_handle * handle,
755 /* znode we want to lock. */
756 znode * node,
757 /* {ZNODE_READ_LOCK, ZNODE_WRITE_LOCK}; */
758 znode_lock_mode mode,
759 /* {0, -EINVAL, -E_DEADLOCK}, see return codes
760 description. */
761 znode_lock_request request) {
762 int ret;
763 int hipri = (request & ZNODE_LOCK_HIPRI) != 0;
764 int non_blocking = 0;
765 int has_atom;
766 txn_capture cap_flags;
767 zlock *lock;
768 txn_handle *txnh;
769 tree_level level;
771 /* Get current process context */
772 lock_stack *owner = get_current_lock_stack();
774 /* Check that the lock handle is initialized and isn't already being
775 * used. */
776 assert("jmacd-808", handle->owner == NULL);
777 assert("nikita-3026", reiser4_schedulable());
778 assert("nikita-3219", request_is_deadlock_safe(node, mode, request));
779 assert("zam-1056", atomic_read(&ZJNODE(node)->x_count) > 0);
780 /* long term locks are not allowed in the VM contexts (->writepage(),
781 * prune_{d,i}cache()).
783 * FIXME this doesn't work due to unused-dentry-with-unlinked-inode
784 * bug caused by d_splice_alias() only working for directories.
786 assert("nikita-3547", 1 || ((current->flags & PF_MEMALLOC) == 0));
787 assert("zam-1055", mode != ZNODE_NO_LOCK);
789 cap_flags = 0;
790 if (request & ZNODE_LOCK_NONBLOCK) {
791 cap_flags |= TXN_CAPTURE_NONBLOCKING;
792 non_blocking = 1;
795 if (request & ZNODE_LOCK_DONT_FUSE)
796 cap_flags |= TXN_CAPTURE_DONT_FUSE;
798 /* If we are changing our process priority we must adjust a number
799 of high priority owners for each znode that we already lock */
800 if (hipri) {
801 set_high_priority(owner);
802 } else {
803 set_low_priority(owner);
806 level = znode_get_level(node);
808 /* Fill request structure with our values. */
809 owner->request.mode = mode;
810 owner->request.handle = handle;
811 owner->request.node = node;
813 txnh = get_current_context()->trans;
814 lock = &node->lock;
816 if (mode == ZNODE_READ_LOCK && request == 0) {
817 ret = longterm_lock_tryfast(owner);
818 if (ret <= 0)
819 return ret;
822 has_atom = (txnh->atom != NULL);
824 /* Synchronize on node's zlock guard lock. */
825 spin_lock_zlock(lock);
827 if (znode_is_locked(node) &&
828 mode == ZNODE_WRITE_LOCK && recursive(owner))
829 return lock_tail(owner, 0, mode);
831 for (;;) {
832 /* Check the lock's availability: if it is unavaiable we get
833 E_REPEAT, 0 indicates "can_lock", otherwise the node is
834 invalid. */
835 ret = can_lock_object(owner);
837 if (unlikely(ret == -EINVAL)) {
838 /* @node is dying. Leave it alone. */
839 break;
842 if (unlikely(ret == -E_REPEAT && non_blocking)) {
843 /* either locking of @node by the current thread will
844 * lead to the deadlock, or lock modes are
845 * incompatible. */
846 break;
849 assert("nikita-1844", (ret == 0)
850 || ((ret == -E_REPEAT) && !non_blocking));
851 /* If we can get the lock... Try to capture first before
852 taking the lock. */
854 /* first handle commonest case where node and txnh are already
855 * in the same atom. */
856 /* safe to do without taking locks, because:
858 * 1. read of aligned word is atomic with respect to writes to
859 * this word
861 * 2. false negatives are handled in reiser4_try_capture().
863 * 3. false positives are impossible.
865 * PROOF: left as an exercise to the curious reader.
867 * Just kidding. Here is one:
869 * At the time T0 txnh->atom is stored in txnh_atom.
871 * At the time T1 node->atom is stored in node_atom.
873 * At the time T2 we observe that
875 * txnh_atom != NULL && node_atom == txnh_atom.
877 * Imagine that at this moment we acquire node and txnh spin
878 * lock in this order. Suppose that under spin lock we have
880 * node->atom != txnh->atom, (S1)
882 * at the time T3.
884 * txnh->atom != NULL still, because txnh is open by the
885 * current thread.
887 * Suppose node->atom == NULL, that is, node was un-captured
888 * between T1, and T3. But un-capturing of formatted node is
889 * always preceded by the call to reiser4_invalidate_lock(),
890 * which marks znode as JNODE_IS_DYING under zlock spin
891 * lock. Contradiction, because can_lock_object() above checks
892 * for JNODE_IS_DYING. Hence, node->atom != NULL at T3.
894 * Suppose that node->atom != node_atom, that is, atom, node
895 * belongs to was fused into another atom: node_atom was fused
896 * into node->atom. Atom of txnh was equal to node_atom at T2,
897 * which means that under spin lock, txnh->atom == node->atom,
898 * because txnh->atom can only follow fusion
899 * chain. Contradicts S1.
901 * The same for hypothesis txnh->atom != txnh_atom. Hence,
902 * node->atom == node_atom == txnh_atom == txnh->atom. Again
903 * contradicts S1. Hence S1 is false. QED.
907 if (likely(has_atom && ZJNODE(node)->atom == txnh->atom)) {
909 } else {
911 * unlock zlock spin lock here. It is possible for
912 * longterm_unlock_znode() to sneak in here, but there
913 * is no harm: reiser4_invalidate_lock() will mark znode
914 * as JNODE_IS_DYING and this will be noted by
915 * can_lock_object() below.
917 spin_unlock_zlock(lock);
918 spin_lock_znode(node);
919 ret = reiser4_try_capture(ZJNODE(node), mode,
920 cap_flags);
921 spin_unlock_znode(node);
922 spin_lock_zlock(lock);
923 if (unlikely(ret != 0)) {
924 /* In the failure case, the txnmgr releases
925 the znode's lock (or in some cases, it was
926 released a while ago). There's no need to
927 reacquire it so we should return here,
928 avoid releasing the lock. */
929 owner->request.mode = 0;
930 break;
933 /* Check the lock's availability again -- this is
934 because under some circumstances the capture code
935 has to release and reacquire the znode spinlock. */
936 ret = can_lock_object(owner);
939 /* This time, a return of (ret == 0) means we can lock, so we
940 should break out of the loop. */
941 if (likely(ret != -E_REPEAT || non_blocking))
942 break;
944 /* Lock is unavailable, we have to wait. */
945 ret = reiser4_prepare_to_sleep(owner);
946 if (unlikely(ret != 0))
947 break;
949 assert_spin_locked(&(node->lock.guard));
950 if (hipri) {
951 /* If we are going in high priority direction then
952 increase high priority requests counter for the
953 node */
954 lock->nr_hipri_requests++;
955 if (mode == ZNODE_WRITE_LOCK)
956 lock->nr_hipri_write_requests++;
957 /* If there are no high priority owners for a node,
958 then immediately wake up low priority owners, so
959 they can detect possible deadlock */
960 if (lock->nr_hipri_owners == 0)
961 wake_up_all_lopri_owners(node);
963 list_add_tail(&owner->requestors_link, &lock->requestors);
965 /* Ok, here we have prepared a lock request, so unlock
966 a znode ... */
967 spin_unlock_zlock(lock);
968 /* ... and sleep */
969 reiser4_go_to_sleep(owner);
970 if (owner->request.mode == ZNODE_NO_LOCK)
971 goto request_is_done;
972 spin_lock_zlock(lock);
973 if (owner->request.mode == ZNODE_NO_LOCK) {
974 spin_unlock_zlock(lock);
975 request_is_done:
976 if (owner->request.ret_code == 0) {
977 LOCK_CNT_INC(long_term_locked_znode);
978 zref(node);
980 return owner->request.ret_code;
982 remove_lock_request(owner);
985 return lock_tail(owner, ret, mode);
988 /* lock object invalidation means changing of lock object state to `INVALID'
989 and waiting for all other processes to cancel theirs lock requests. */
990 void reiser4_invalidate_lock(lock_handle * handle /* path to lock
991 * owner and lock
992 * object is being
993 * invalidated. */ )
995 znode *node = handle->node;
996 lock_stack *owner = handle->owner;
998 assert("zam-325", owner == get_current_lock_stack());
999 assert("zam-103", znode_is_write_locked(node));
1000 assert("nikita-1393", !ZF_ISSET(node, JNODE_LEFT_CONNECTED));
1001 assert("nikita-1793", !ZF_ISSET(node, JNODE_RIGHT_CONNECTED));
1002 assert("nikita-1394", ZF_ISSET(node, JNODE_HEARD_BANSHEE));
1003 assert("nikita-3097", znode_is_wlocked_once(node));
1004 assert_spin_locked(&(node->lock.guard));
1006 if (handle->signaled)
1007 atomic_dec(&owner->nr_signaled);
1009 ZF_SET(node, JNODE_IS_DYING);
1010 unlink_object(handle);
1011 node->lock.nr_readers = 0;
1013 invalidate_all_lock_requests(node);
1014 spin_unlock_zlock(&node->lock);
1017 /* Initializes lock_stack. */
1018 void init_lock_stack(lock_stack * owner /* pointer to
1019 * allocated
1020 * structure. */ )
1022 INIT_LIST_HEAD(&owner->locks);
1023 INIT_LIST_HEAD(&owner->requestors_link);
1024 spin_lock_init(&owner->sguard);
1025 owner->curpri = 1;
1026 init_waitqueue_head(&owner->wait);
1029 /* Initializes lock object. */
1030 void reiser4_init_lock(zlock * lock /* pointer on allocated
1031 * uninitialized lock object
1032 * structure. */ )
1034 memset(lock, 0, sizeof(zlock));
1035 spin_lock_init(&lock->guard);
1036 INIT_LIST_HEAD(&lock->requestors);
1037 INIT_LIST_HEAD(&lock->owners);
1040 /* Transfer a lock handle (presumably so that variables can be moved between
1041 stack and heap locations). */
1042 static void
1043 move_lh_internal(lock_handle * new, lock_handle * old, int unlink_old)
1045 znode *node = old->node;
1046 lock_stack *owner = old->owner;
1047 int signaled;
1049 /* locks_list, modified by link_object() is not protected by
1050 anything. This is valid because only current thread ever modifies
1051 locks_list of its lock_stack.
1053 assert("nikita-1827", owner == get_current_lock_stack());
1054 assert("nikita-1831", new->owner == NULL);
1056 spin_lock_zlock(&node->lock);
1058 signaled = old->signaled;
1059 if (unlink_old) {
1060 unlink_object(old);
1061 } else {
1062 if (node->lock.nr_readers > 0) {
1063 node->lock.nr_readers += 1;
1064 } else {
1065 node->lock.nr_readers -= 1;
1067 if (signaled)
1068 atomic_inc(&owner->nr_signaled);
1069 if (owner->curpri)
1070 node->lock.nr_hipri_owners += 1;
1071 LOCK_CNT_INC(long_term_locked_znode);
1073 zref(node);
1075 link_object(new, owner, node);
1076 new->signaled = signaled;
1078 spin_unlock_zlock(&node->lock);
1081 void move_lh(lock_handle * new, lock_handle * old)
1083 move_lh_internal(new, old, /*unlink_old */ 1);
1086 void copy_lh(lock_handle * new, lock_handle * old)
1088 move_lh_internal(new, old, /*unlink_old */ 0);
1091 /* after getting -E_DEADLOCK we unlock znodes until this function returns false
1093 int reiser4_check_deadlock(void)
1095 lock_stack *owner = get_current_lock_stack();
1096 return atomic_read(&owner->nr_signaled) != 0;
1099 /* Before going to sleep we re-check "release lock" requests which might come
1100 from threads with hi-pri lock priorities. */
1101 int reiser4_prepare_to_sleep(lock_stack * owner)
1103 assert("nikita-1847", owner == get_current_lock_stack());
1105 /* We return -E_DEADLOCK if one or more "give me the lock" messages are
1106 * counted in nr_signaled */
1107 if (unlikely(atomic_read(&owner->nr_signaled) != 0)) {
1108 assert("zam-959", !owner->curpri);
1109 return RETERR(-E_DEADLOCK);
1111 return 0;
1114 /* Wakes up a single thread */
1115 void __reiser4_wake_up(lock_stack * owner)
1117 atomic_set(&owner->wakeup, 1);
1118 wake_up(&owner->wait);
1121 /* Puts a thread to sleep */
1122 void reiser4_go_to_sleep(lock_stack * owner)
1124 /* Well, we might sleep here, so holding of any spinlocks is no-no */
1125 assert("nikita-3027", reiser4_schedulable());
1127 wait_event(owner->wait, atomic_read(&owner->wakeup));
1128 atomic_set(&owner->wakeup, 0);
1131 int lock_stack_isclean(lock_stack * owner)
1133 if (list_empty_careful(&owner->locks)) {
1134 assert("zam-353", atomic_read(&owner->nr_signaled) == 0);
1135 return 1;
1138 return 0;
1141 #if REISER4_DEBUG
1144 * debugging functions
1147 static void list_check(struct list_head *head)
1149 struct list_head *pos;
1151 list_for_each(pos, head)
1152 assert("", (pos->prev != NULL && pos->next != NULL &&
1153 pos->prev->next == pos && pos->next->prev == pos));
1156 /* check consistency of locking data-structures hanging of the @stack */
1157 static void check_lock_stack(lock_stack * stack)
1159 spin_lock_stack(stack);
1160 /* check that stack->locks is not corrupted */
1161 list_check(&stack->locks);
1162 spin_unlock_stack(stack);
1165 /* check consistency of locking data structures */
1166 void check_lock_data(void)
1168 check_lock_stack(&get_current_context()->stack);
1171 /* check consistency of locking data structures for @node */
1172 void check_lock_node_data(znode * node)
1174 spin_lock_zlock(&node->lock);
1175 list_check(&node->lock.owners);
1176 list_check(&node->lock.requestors);
1177 spin_unlock_zlock(&node->lock);
1180 /* check that given lock request is dead lock safe. This check is, of course,
1181 * not exhaustive. */
1182 static int
1183 request_is_deadlock_safe(znode * node, znode_lock_mode mode,
1184 znode_lock_request request)
1186 lock_stack *owner;
1188 owner = get_current_lock_stack();
1190 * check that hipri lock request is not issued when there are locked
1191 * nodes at the higher levels.
1193 if (request & ZNODE_LOCK_HIPRI && !(request & ZNODE_LOCK_NONBLOCK) &&
1194 znode_get_level(node) != 0) {
1195 lock_handle *item;
1197 list_for_each_entry(item, &owner->locks, locks_link) {
1198 znode *other;
1200 other = item->node;
1202 if (znode_get_level(other) == 0)
1203 continue;
1204 if (znode_get_level(other) > znode_get_level(node))
1205 return 0;
1208 return 1;
1211 #endif
1213 /* return pointer to static storage with name of lock_mode. For
1214 debugging */
1215 const char *lock_mode_name(znode_lock_mode lock/* lock mode to get name of */)
1217 if (lock == ZNODE_READ_LOCK)
1218 return "read";
1219 else if (lock == ZNODE_WRITE_LOCK)
1220 return "write";
1221 else {
1222 static char buf[30];
1224 sprintf(buf, "unknown: %i", lock);
1225 return buf;
1229 /* Make Linus happy.
1230 Local variables:
1231 c-indentation-style: "K&R"
1232 mode-name: "LC"
1233 c-basic-offset: 8
1234 tab-width: 8
1235 fill-column: 79
1236 End: