1 /* Copyright 2001, 2002, 2003 by Hans Reiser, licensing governed by reiser4/README */
3 /* Functions to "carry" tree modification(s) upward. */
4 /* Tree is modified one level at a time. As we modify a level we accumulate a
5 set of changes that need to be propagated to the next level. We manage
6 node locking such that any searches that collide with carrying are
7 restarted, from the root if necessary.
9 Insertion of a new item may result in items being moved among nodes and
10 this requires the delimiting key to be updated at the least common parent
11 of the nodes modified to preserve search tree invariants. Also, insertion
12 may require allocation of a new node. A pointer to the new node has to be
13 inserted into some node on the parent level, etc.
15 Tree carrying is meant to be analogous to arithmetic carrying.
17 A carry operation is always associated with some node (&carry_node).
19 Carry process starts with some initial set of operations to be performed
20 and an initial set of already locked nodes. Operations are performed one
21 by one. Performing each single operation has following possible effects:
23 - content of carry node associated with operation is modified
24 - new carry nodes are locked and involved into carry process on this level
25 - new carry operations are posted to the next level
27 After all carry operations on this level are done, process is repeated for
28 the accumulated sequence of carry operations for the next level. This
29 starts by trying to lock (in left to right order) all carry nodes
30 associated with carry operations on the parent level. After this, we decide
31 whether more nodes are required on the left of already locked set. If so,
32 all locks taken on the parent level are released, new carry nodes are
33 added, and locking process repeats.
35 It may happen that balancing process fails owing to unrecoverable error on
36 some of upper levels of a tree (possible causes are io error, failure to
37 allocate new node, etc.). In this case we should unmount the filesystem,
38 rebooting if it is the root, and possibly advise the use of fsck.
42 int some_tree_operation( znode *node, ... )
44 // Allocate on a stack pool of carry objects: operations and nodes.
45 // Most carry processes will only take objects from here, without
46 // dynamic allocation.
48 I feel uneasy about this pool. It adds to code complexity, I understand why it
52 carry_level lowest_level;
55 init_carry_pool( &pool );
56 init_carry_level( &lowest_level, &pool );
58 // operation may be one of:
59 // COP_INSERT --- insert new item into node
60 // COP_CUT --- remove part of or whole node
61 // COP_PASTE --- increase size of item
62 // COP_DELETE --- delete pointer from parent node
63 // COP_UPDATE --- update delimiting key in least
64 // common ancestor of two
66 op = reiser4_post_carry( &lowest_level, operation, node, 0 );
67 if( IS_ERR( op ) || ( op == NULL ) ) {
70 // fill in remaining fields in @op, according to carry.h:carry_op
71 result = carry(&lowest_level, NULL);
73 done_carry_pool(&pool);
76 When you are implementing node plugin method that participates in carry
77 (shifting, insertion, deletion, etc.), do the following:
79 int foo_node_method(znode * node, ..., carry_level * todo)
85 // note, that last argument to reiser4_post_carry() is non-null
86 // here, because @op is to be applied to the parent of @node, rather
87 // than to the @node itself as in the previous case.
89 op = node_post_carry(todo, operation, node, 1);
90 // fill in remaining fields in @op, according to carry.h:carry_op
98 One of the main advantages of level-by-level balancing implemented here is
99 ability to batch updates on a parent level and to perform them more
100 efficiently as a result.
102 Description To Be Done (TBD).
104 DIFFICULTIES AND SUBTLE POINTS:
106 1. complex plumbing is required, because:
108 a. effective allocation through pools is needed
110 b. target of operation is not exactly known when operation is
111 posted. This is worked around through bitfields in &carry_node and
112 logic in lock_carry_node()
114 c. of interaction with locking code: node should be added into sibling
115 list when pointer to it is inserted into its parent, which is some time
116 after node was created. Between these moments, node is somewhat in
117 suspended state and is only registered in the carry lists
119 2. whole balancing logic is implemented here, in particular, insertion
120 logic is coded in make_space().
122 3. special cases like insertion (reiser4_add_tree_root()) or deletion
123 (reiser4_kill_tree_root()) of tree root and morphing of paste into insert
124 (insert_paste()) have to be handled.
126 4. there is non-trivial interdependency between allocation of new nodes
127 and almost everything else. This is mainly due to the (1.c) above. I shall
128 write about this later.
136 #include "plugin/item/item.h"
137 #include "plugin/item/extent.h"
138 #include "plugin/node/node.h"
141 #include "tree_mod.h"
142 #include "tree_walk.h"
143 #include "block_alloc.h"
147 #include "carry_ops.h"
151 #include <linux/types.h>
153 /* level locking/unlocking */
154 static int lock_carry_level(carry_level
* level
);
155 static void unlock_carry_level(carry_level
* level
, int failure
);
156 static void done_carry_level(carry_level
* level
);
157 static void unlock_carry_node(carry_level
* level
, carry_node
* node
, int fail
);
159 int lock_carry_node(carry_level
* level
, carry_node
* node
);
160 int lock_carry_node_tail(carry_node
* node
);
162 /* carry processing proper */
163 static int carry_on_level(carry_level
* doing
, carry_level
* todo
);
165 static carry_op
*add_op(carry_level
* level
, pool_ordering order
,
166 carry_op
* reference
);
168 /* handlers for carry operations. */
170 static void fatal_carry_error(carry_level
* doing
, int ecode
);
171 static int add_new_root(carry_level
* level
, carry_node
* node
, znode
* fake
);
173 static void print_level(const char *prefix
, carry_level
* level
);
180 static int carry_level_invariant(carry_level
* level
, carry_queue_state state
);
183 /* main entry point for tree balancing.
185 Tree carry performs operations from @doing and while doing so accumulates
186 information about operations to be performed on the next level ("carried"
187 to the parent level). Carried operations are performed, causing possibly
188 more operations to be carried upward etc. carry() takes care about
189 locking and pinning znodes while operating on them.
191 For usage, see comment at the top of fs/reiser4/carry.c
194 int reiser4_carry(carry_level
* doing
/* set of carry operations to be
196 carry_level
* done
/* set of nodes, already performed
197 * at the previous level.
198 * NULL in most cases */)
201 /* queue of new requests */
203 ON_DEBUG(STORE_COUNTERS
);
205 assert("nikita-888", doing
!= NULL
);
206 BUG_ON(done
!= NULL
);
209 init_carry_level(todo
, doing
->pool
);
211 /* queue of requests preformed on the previous level */
213 init_carry_level(done
, doing
->pool
);
215 /* iterate until there is nothing more to do */
216 while (result
== 0 && doing
->ops_num
> 0) {
219 /* at this point @done is locked. */
220 /* repeat lock/do/unlock while
222 (1) lock_carry_level() fails due to deadlock avoidance, or
224 (2) carry_on_level() decides that more nodes have to
227 (3) some unexpected error occurred while balancing on the
228 upper levels. In this case all changes are rolled back.
232 result
= lock_carry_level(doing
);
234 /* perform operations from @doing and
235 accumulate new requests in @todo */
236 result
= carry_on_level(doing
, todo
);
239 else if (result
!= -E_REPEAT
||
240 !doing
->restartable
) {
241 warning("nikita-1043",
242 "Fatal error during carry: %i",
244 print_level("done", done
);
245 print_level("doing", doing
);
246 print_level("todo", todo
);
247 /* do some rough stuff like aborting
248 all pending transcrashes and thus
249 pushing tree back to the consistent
250 state. Alternatvely, just panic.
252 fatal_carry_error(doing
, result
);
255 } else if (result
!= -E_REPEAT
) {
256 fatal_carry_error(doing
, result
);
259 unlock_carry_level(doing
, 1);
261 /* at this point @done can be safely unlocked */
262 done_carry_level(done
);
264 /* cyclically shift queues */
269 init_carry_level(todo
, doing
->pool
);
271 /* give other threads chance to run */
272 reiser4_preempt_point();
274 done_carry_level(done
);
276 /* all counters, but x_refs should remain the same. x_refs can change
277 owing to transaction manager */
278 ON_DEBUG(CHECK_COUNTERS
);
282 /* perform carry operations on given level.
284 Optimizations proposed by pooh:
286 (1) don't lock all nodes from queue at the same time. Lock nodes lazily as
289 (2) unlock node if there are no more operations to be performed upon it and
290 node didn't add any operation to @todo. This can be implemented by
291 attaching to each node two counters: counter of operaions working on this
292 node and counter and operations carried upward from this node.
295 static int carry_on_level(carry_level
* doing
/* queue of carry operations to
296 * do on this level */ ,
297 carry_level
* todo
/* queue where new carry
298 * operations to be performed on
299 * the * parent level are
300 * accumulated during @doing
304 int (*f
) (carry_op
*, carry_level
*, carry_level
*);
308 assert("nikita-1034", doing
!= NULL
);
309 assert("nikita-1035", todo
!= NULL
);
311 /* @doing->nodes are locked. */
313 /* This function can be split into two phases: analysis and modification
315 Analysis calculates precisely what items should be moved between
316 nodes. This information is gathered in some structures attached to
317 each carry_node in a @doing queue. Analysis also determines whether
318 new nodes are to be allocated etc.
320 After analysis is completed, actual modification is performed. Here
321 we can take advantage of "batch modification": if there are several
322 operations acting on the same node, modifications can be performed
323 more efficiently when batched together.
325 Above is an optimization left for the future.
327 /* Important, but delayed optimization: it's possible to batch
328 operations together and perform them more efficiently as a
329 result. For example, deletion of several neighboring items from a
330 node can be converted to a single ->cut() operation.
332 Before processing queue, it should be scanned and "mergeable"
336 for_all_ops(doing
, op
, tmp_op
) {
339 assert("nikita-1041", op
!= NULL
);
341 assert("nikita-1042", op
->op
< COP_LAST_OP
);
342 f
= op_dispatch_table
[op
->op
].handler
;
343 result
= f(op
, doing
, todo
);
344 /* locking can fail with -E_REPEAT. Any different error is fatal
345 and will be handled by fatal_carry_error() sledgehammer.
351 carry_plugin_info info
;
353 carry_node
*tmp_scan
;
358 assert("nikita-3002",
359 carry_level_invariant(doing
, CARRY_DOING
));
360 for_all_nodes(doing
, scan
, tmp_scan
) {
363 node
= reiser4_carry_real(scan
);
364 assert("nikita-2547", node
!= NULL
);
365 if (node_is_empty(node
)) {
367 node_plugin_by_node(node
)->
368 prepare_removal(node
, &info
);
377 /* post carry operation
379 This is main function used by external carry clients: node layout plugins
380 and tree operations to create new carry operation to be performed on some
383 New operation will be included in the @level queue. To actually perform it,
384 call carry( level, ... ). This function takes write lock on @node. Carry
385 manages all its locks by itself, don't worry about this.
387 This function adds operation and node at the end of the queue. It is up to
388 caller to guarantee proper ordering of node queue.
391 carry_op
* reiser4_post_carry(carry_level
* level
/* queue where new operation
392 * is to be posted at */ ,
393 carry_opcode op
/* opcode of operation */ ,
394 znode
* node
/* node on which this operation
396 int apply_to_parent_p
/* whether operation will
397 * operate directly on @node
398 * or on it parent. */)
403 assert("nikita-1046", level
!= NULL
);
404 assert("nikita-1788", znode_is_write_locked(node
));
406 result
= add_op(level
, POOLO_LAST
, NULL
);
409 child
= reiser4_add_carry(level
, POOLO_LAST
, NULL
);
411 reiser4_pool_free(&level
->pool
->op_pool
, &result
->header
);
412 return (carry_op
*) child
;
414 result
->node
= child
;
416 child
->parent
= apply_to_parent_p
;
417 if (ZF_ISSET(node
, JNODE_ORPHAN
))
418 child
->left_before
= 1;
423 /* initialize carry queue */
424 void init_carry_level(carry_level
* level
/* level to initialize */ ,
425 carry_pool
* pool
/* pool @level will allocate objects
428 assert("nikita-1045", level
!= NULL
);
429 assert("nikita-967", pool
!= NULL
);
431 memset(level
, 0, sizeof *level
);
434 INIT_LIST_HEAD(&level
->nodes
);
435 INIT_LIST_HEAD(&level
->ops
);
438 /* allocate carry pool and initialize pools within queue */
439 carry_pool
*init_carry_pool(int size
)
443 assert("", size
>= sizeof(carry_pool
) + 3 * sizeof(carry_level
));
444 pool
= kmalloc(size
, reiser4_ctx_gfp_mask_get());
446 return ERR_PTR(RETERR(-ENOMEM
));
448 reiser4_init_pool(&pool
->op_pool
, sizeof(carry_op
), CARRIES_POOL_SIZE
,
450 reiser4_init_pool(&pool
->node_pool
, sizeof(carry_node
),
451 NODES_LOCKED_POOL_SIZE
, (char *)pool
->node
);
455 /* finish with queue pools */
456 void done_carry_pool(carry_pool
* pool
/* pool to destroy */)
458 reiser4_done_pool(&pool
->op_pool
);
459 reiser4_done_pool(&pool
->node_pool
);
463 /* add new carry node to the @level.
465 Returns pointer to the new carry node allocated from pool. It's up to
466 callers to maintain proper order in the @level. Assumption is that if carry
467 nodes on one level are already sorted and modifications are peroformed from
468 left to right, carry nodes added on the parent level will be ordered
469 automatically. To control ordering use @order and @reference parameters.
472 carry_node
*reiser4_add_carry_skip(carry_level
* level
/* &carry_level to add
474 pool_ordering order
/* where to insert:
475 * at the beginning of
479 * at the end of @level
481 carry_node
* reference
/* reference node for
484 ON_DEBUG(carry_node
* orig_ref
= reference
);
486 if (order
== POOLO_BEFORE
) {
487 reference
= find_left_carry(reference
, level
);
488 if (reference
== NULL
)
489 reference
= list_entry(level
->nodes
.next
, carry_node
,
490 header
.level_linkage
);
492 reference
= list_entry(reference
->header
.level_linkage
.next
,
493 carry_node
, header
.level_linkage
);
494 } else if (order
== POOLO_AFTER
) {
495 reference
= find_right_carry(reference
, level
);
496 if (reference
== NULL
)
497 reference
= list_entry(level
->nodes
.prev
, carry_node
,
498 header
.level_linkage
);
500 reference
= list_entry(reference
->header
.level_linkage
.prev
,
501 carry_node
, header
.level_linkage
);
503 assert("nikita-2209",
504 ergo(orig_ref
!= NULL
,
505 reiser4_carry_real(reference
) ==
506 reiser4_carry_real(orig_ref
)));
507 return reiser4_add_carry(level
, order
, reference
);
510 carry_node
*reiser4_add_carry(carry_level
* level
, /* carry_level to add
512 pool_ordering order
, /* where to insert:
513 * at the beginning of
517 * at the end of @level
519 carry_node
* reference
/* reference node for
525 (carry_node
*) reiser4_add_obj(&level
->pool
->node_pool
,
527 order
, &reference
->header
);
528 if (!IS_ERR(result
) && (result
!= NULL
))
534 * add new carry operation to the @level.
536 * Returns pointer to the new carry operations allocated from pool. It's up to
537 * callers to maintain proper order in the @level. To control ordering use
538 * @order and @reference parameters.
540 static carry_op
*add_op(carry_level
* level
, /* &carry_level to add node to */
541 pool_ordering order
, /* where to insert:
542 * at the beginning of @level;
545 * at the end of @level */
546 carry_op
* reference
/* reference node for insertion */)
551 (carry_op
*) reiser4_add_obj(&level
->pool
->op_pool
, &level
->ops
,
552 order
, &reference
->header
);
553 if (!IS_ERR(result
) && (result
!= NULL
))
559 * Return node on the right of which @node was created.
561 * Each node is created on the right of some existing node (or it is new root,
562 * which is special case not handled here).
564 * @node is new node created on some level, but not yet inserted into its
565 * parent, it has corresponding bit (JNODE_ORPHAN) set in zstate.
567 static carry_node
*find_begetting_brother(carry_node
* node
,/* node to start
569 carry_level
* kin UNUSED_ARG
574 assert("nikita-1614", node
!= NULL
);
575 assert("nikita-1615", kin
!= NULL
);
576 assert("nikita-1616", LOCK_CNT_GTZ(rw_locked_tree
));
577 assert("nikita-1619", ergo(reiser4_carry_real(node
) != NULL
,
578 ZF_ISSET(reiser4_carry_real(node
),
581 scan
= list_entry(scan
->header
.level_linkage
.prev
, carry_node
,
582 header
.level_linkage
)) {
583 assert("nikita-1617", &kin
->nodes
!= &scan
->header
.level_linkage
);
584 if ((scan
->node
!= node
->node
) &&
585 !ZF_ISSET(scan
->node
, JNODE_ORPHAN
)) {
586 assert("nikita-1618", reiser4_carry_real(scan
) != NULL
);
594 carry_node_cmp(carry_level
* level
, carry_node
* n1
, carry_node
* n2
)
596 assert("nikita-2199", n1
!= NULL
);
597 assert("nikita-2200", n2
!= NULL
);
602 n1
= carry_node_next(n1
);
603 if (carry_node_end(level
, n1
))
608 impossible("nikita-2201", "End of level reached");
611 carry_node
*find_carry_node(carry_level
* level
, const znode
* node
)
614 carry_node
*tmp_scan
;
616 assert("nikita-2202", level
!= NULL
);
617 assert("nikita-2203", node
!= NULL
);
619 for_all_nodes(level
, scan
, tmp_scan
) {
620 if (reiser4_carry_real(scan
) == node
)
626 znode
*reiser4_carry_real(const carry_node
* node
)
628 assert("nikita-3061", node
!= NULL
);
630 return node
->lock_handle
.node
;
633 carry_node
*insert_carry_node(carry_level
* doing
, carry_level
* todo
,
638 carry_node
*tmp_scan
;
641 base
= find_carry_node(doing
, node
);
642 assert("nikita-2204", base
!= NULL
);
644 for_all_nodes(todo
, scan
, tmp_scan
) {
645 proj
= find_carry_node(doing
, scan
->node
);
646 assert("nikita-2205", proj
!= NULL
);
647 if (carry_node_cmp(doing
, proj
, base
) != LESS_THAN
)
653 static carry_node
*add_carry_atplace(carry_level
* doing
, carry_level
* todo
,
656 carry_node
*reference
;
658 assert("nikita-2994", doing
!= NULL
);
659 assert("nikita-2995", todo
!= NULL
);
660 assert("nikita-2996", node
!= NULL
);
662 reference
= insert_carry_node(doing
, todo
, node
);
663 assert("nikita-2997", reference
!= NULL
);
665 return reiser4_add_carry(todo
, POOLO_BEFORE
, reference
);
668 /* like reiser4_post_carry(), but designed to be called from node plugin
669 methods. This function is different from reiser4_post_carry() in that it
670 finds proper place to insert node in the queue. */
671 carry_op
*node_post_carry(carry_plugin_info
* info
/* carry parameters
672 * passed down to node
674 carry_opcode op
/* opcode of operation */ ,
675 znode
* node
/* node on which this
676 * operation will operate */ ,
677 int apply_to_parent_p
/* whether operation will
678 * operate directly on @node
679 * or on it parent. */ )
684 assert("nikita-2207", info
!= NULL
);
685 assert("nikita-2208", info
->todo
!= NULL
);
687 if (info
->doing
== NULL
)
688 return reiser4_post_carry(info
->todo
, op
, node
,
691 result
= add_op(info
->todo
, POOLO_LAST
, NULL
);
694 child
= add_carry_atplace(info
->doing
, info
->todo
, node
);
696 reiser4_pool_free(&info
->todo
->pool
->op_pool
, &result
->header
);
697 return (carry_op
*) child
;
699 result
->node
= child
;
701 child
->parent
= apply_to_parent_p
;
702 if (ZF_ISSET(node
, JNODE_ORPHAN
))
703 child
->left_before
= 1;
708 /* lock all carry nodes in @level */
709 static int lock_carry_level(carry_level
* level
/* level to lock */)
713 carry_node
*tmp_node
;
715 assert("nikita-881", level
!= NULL
);
716 assert("nikita-2229", carry_level_invariant(level
, CARRY_TODO
));
718 /* lock nodes from left to right */
720 for_all_nodes(level
, node
, tmp_node
) {
721 result
= lock_carry_node(level
, node
);
728 /* Synchronize delimiting keys between @node and its left neighbor.
730 To reduce contention on dk key and simplify carry code, we synchronize
731 delimiting keys only when carry ultimately leaves tree level (carrying
732 changes upward) and unlocks nodes at this level.
734 This function first finds left neighbor of @node and then updates left
735 neighbor's right delimiting key to conincide with least key in @node.
739 ON_DEBUG(extern atomic_t delim_key_version
;
742 static void sync_dkeys(znode
* spot
/* node to update */)
747 assert("nikita-1610", spot
!= NULL
);
748 assert("nikita-1612", LOCK_CNT_NIL(rw_locked_dk
));
750 tree
= znode_get_tree(spot
);
751 read_lock_tree(tree
);
754 assert("nikita-2192", znode_is_loaded(spot
));
756 /* sync left delimiting key of @spot with key in its leftmost item */
757 if (node_is_empty(spot
))
758 pivot
= *znode_get_rd_key(spot
);
760 leftmost_key_in_node(spot
, &pivot
);
762 znode_set_ld_key(spot
, &pivot
);
764 /* there can be sequence of empty nodes pending removal on the left of
765 @spot. Scan them and update their left and right delimiting keys to
766 match left delimiting key of @spot. Also, update right delimiting
767 key of first non-empty left neighbor.
770 if (!ZF_ISSET(spot
, JNODE_LEFT_CONNECTED
))
777 znode_set_rd_key(spot
, &pivot
);
778 /* don't sink into the domain of another balancing */
779 if (!znode_is_write_locked(spot
))
781 if (ZF_ISSET(spot
, JNODE_HEARD_BANSHEE
))
782 znode_set_ld_key(spot
, &pivot
);
787 write_unlock_dk(tree
);
788 read_unlock_tree(tree
);
791 /* unlock all carry nodes in @level */
792 static void unlock_carry_level(carry_level
* level
/* level to unlock */ ,
793 int failure
/* true if unlocking owing to
797 carry_node
*tmp_node
;
799 assert("nikita-889", level
!= NULL
);
805 /* update delimiting keys */
806 for_all_nodes(level
, node
, tmp_node
) {
807 if (reiser4_carry_real(node
) != spot
) {
808 spot
= reiser4_carry_real(node
);
814 /* nodes can be unlocked in arbitrary order. In preemptible
815 environment it's better to unlock in reverse order of locking,
818 for_all_nodes_back(level
, node
, tmp_node
) {
819 /* all allocated nodes should be already linked to their
820 parents at this moment. */
821 assert("nikita-1631",
822 ergo(!failure
, !ZF_ISSET(reiser4_carry_real(node
),
824 ON_DEBUG(check_dkeys(reiser4_carry_real(node
)));
825 unlock_carry_node(level
, node
, failure
);
827 level
->new_root
= NULL
;
830 /* finish with @level
832 Unlock nodes and release all allocated resources */
833 static void done_carry_level(carry_level
* level
/* level to finish */)
836 carry_node
*tmp_node
;
840 assert("nikita-1076", level
!= NULL
);
842 unlock_carry_level(level
, 0);
843 for_all_nodes(level
, node
, tmp_node
) {
844 assert("nikita-2113", list_empty_careful(&node
->lock_handle
.locks_link
));
845 assert("nikita-2114", list_empty_careful(&node
->lock_handle
.owners_link
));
846 reiser4_pool_free(&level
->pool
->node_pool
, &node
->header
);
848 for_all_ops(level
, op
, tmp_op
)
849 reiser4_pool_free(&level
->pool
->op_pool
, &op
->header
);
852 /* helper function to complete locking of carry node
854 Finish locking of carry node. There are several ways in which new carry
855 node can be added into carry level and locked. Normal is through
856 lock_carry_node(), but also from find_{left|right}_neighbor(). This
857 function factors out common final part of all locking scenarios. It
858 supposes that @node -> lock_handle is lock handle for lock just taken and
859 fills ->real_node from this lock handle.
862 int lock_carry_node_tail(carry_node
* node
/* node to complete locking of */)
864 assert("nikita-1052", node
!= NULL
);
865 assert("nikita-1187", reiser4_carry_real(node
) != NULL
);
866 assert("nikita-1188", !node
->unlock
);
869 /* Load node content into memory and install node plugin by
870 looking at the node header.
872 Most of the time this call is cheap because the node is
875 Corresponding zrelse() is in unlock_carry_node()
877 return zload(reiser4_carry_real(node
));
882 "Resolve" node to real znode, lock it and mark as locked.
883 This requires recursive locking of znodes.
885 When operation is posted to the parent level, node it will be applied to is
886 not yet known. For example, when shifting data between two nodes,
887 delimiting has to be updated in parent or parents of nodes involved. But
888 their parents is not yet locked and, moreover said nodes can be reparented
889 by concurrent balancing.
891 To work around this, carry operation is applied to special "carry node"
892 rather than to the znode itself. Carry node consists of some "base" or
893 "reference" znode and flags indicating how to get to the target of carry
894 operation (->real_node field of carry_node) from base.
897 int lock_carry_node(carry_level
* level
/* level @node is in */ ,
898 carry_node
* node
/* node to lock */)
901 znode
*reference_point
;
906 assert("nikita-887", level
!= NULL
);
907 assert("nikita-882", node
!= NULL
);
910 reference_point
= node
->node
;
913 if (node
->left_before
) {
914 /* handling of new nodes, allocated on the previous level:
916 some carry ops were propably posted from the new node, but
917 this node neither has parent pointer set, nor is
918 connected. This will be done in ->create_hook() for
921 No then less, parent of new node has to be locked. To do
922 this, first go to the "left" in the carry order. This
923 depends on the decision to always allocate new node on the
924 right of existing one.
926 Loop handles case when multiple nodes, all orphans, were
929 Strictly speaking, taking tree lock is not necessary here,
930 because all nodes scanned by loop in
931 find_begetting_brother() are write-locked by this thread,
932 and thus, their sibling linkage cannot change.
935 tree
= znode_get_tree(reference_point
);
936 read_lock_tree(tree
);
937 reference_point
= find_begetting_brother(node
, level
)->node
;
938 read_unlock_tree(tree
);
939 assert("nikita-1186", reference_point
!= NULL
);
941 if (node
->parent
&& (result
== 0)) {
943 reiser4_get_parent(&tmp_lh
, reference_point
,
947 } else if (znode_get_level(tmp_lh
.node
) == 0) {
948 assert("nikita-1347", znode_above_root(tmp_lh
.node
));
949 result
= add_new_root(level
, node
, tmp_lh
.node
);
951 reference_point
= level
->new_root
;
952 move_lh(&lh
, &node
->lock_handle
);
954 } else if ((level
->new_root
!= NULL
)
955 && (level
->new_root
!=
956 znode_parent_nolock(reference_point
))) {
957 /* parent of node exists, but this level aready
958 created different new root, so */
959 warning("nikita-1109",
960 /* it should be "radicis", but tradition is
961 tradition. do banshees read latin? */
962 "hodie natus est radici frater");
965 move_lh(&lh
, &tmp_lh
);
966 reference_point
= lh
.node
;
969 if (node
->left
&& (result
== 0)) {
970 assert("nikita-1183", node
->parent
);
971 assert("nikita-883", reference_point
!= NULL
);
973 reiser4_get_left_neighbor(&tmp_lh
, reference_point
,
975 GN_CAN_USE_UPPER_LEVELS
);
978 move_lh(&lh
, &tmp_lh
);
979 reference_point
= lh
.node
;
982 if (!node
->parent
&& !node
->left
&& !node
->left_before
) {
984 longterm_lock_znode(&lh
, reference_point
, ZNODE_WRITE_LOCK
,
988 move_lh(&node
->lock_handle
, &lh
);
989 result
= lock_carry_node_tail(node
);
996 /* release a lock on &carry_node.
998 Release if necessary lock on @node. This opearion is pair of
999 lock_carry_node() and is idempotent: you can call it more than once on the
1004 unlock_carry_node(carry_level
* level
,
1005 carry_node
* node
/* node to be released */ ,
1006 int failure
/* 0 if node is unlocked due
1007 * to some error */ )
1011 assert("nikita-884", node
!= NULL
);
1013 real_node
= reiser4_carry_real(node
);
1014 /* pair to zload() in lock_carry_node_tail() */
1016 if (node
->unlock
&& (real_node
!= NULL
)) {
1017 assert("nikita-899", real_node
== node
->lock_handle
.node
);
1018 longterm_unlock_znode(&node
->lock_handle
);
1021 if (node
->deallocate
&& (real_node
!= NULL
)) {
1022 /* free node in bitmap
1024 Prepare node for removal. Last zput() will finish
1027 ZF_SET(real_node
, JNODE_HEARD_BANSHEE
);
1030 assert("nikita-2177",
1031 list_empty_careful(&node
->lock_handle
.locks_link
));
1032 assert("nikita-2112",
1033 list_empty_careful(&node
->lock_handle
.owners_link
));
1034 reiser4_pool_free(&level
->pool
->node_pool
,
1040 /* fatal_carry_error() - all-catching error handling function
1042 It is possible that carry faces unrecoverable error, like unability to
1043 insert pointer at the internal level. Our simple solution is just panic in
1044 this situation. More sophisticated things like attempt to remount
1045 file-system as read-only can be implemented without much difficlties.
1047 It is believed, that:
1049 1. in stead of panicking, all current transactions can be aborted rolling
1050 system back to the consistent state.
1052 Umm, if you simply panic without doing anything more at all, then all current
1053 transactions are aborted and the system is rolled back to a consistent state,
1054 by virtue of the design of the transactional mechanism. Well, wait, let's be
1055 precise. If an internal node is corrupted on disk due to hardware failure,
1056 then there may be no consistent state that can be rolled back to, so instead
1057 we should say that it will rollback the transactions, which barring other
1058 factors means rolling back to a consistent state.
1060 # Nikita: there is a subtle difference between panic and aborting
1061 # transactions: machine doesn't reboot. Processes aren't killed. Processes
1062 # don't using reiser4 (not that we care about such processes), or using other
1063 # reiser4 mounts (about them we do care) will simply continue to run. With
1064 # some luck, even application using aborted file system can survive: it will
1065 # get some error, like EBADF, from each file descriptor on failed file system,
1066 # but applications that do care about tolerance will cope with this (squid
1069 It would be a nice feature though to support rollback without rebooting
1070 followed by remount, but this can wait for later versions.
1072 2. once isolated transactions will be implemented it will be possible to
1073 roll back offending transaction.
1075 2. is additional code complexity of inconsistent value (it implies that a
1076 broken tree should be kept in operation), so we must think about it more
1077 before deciding if it should be done. -Hans
1080 static void fatal_carry_error(carry_level
* doing UNUSED_ARG
/* carry level
1085 int ecode
/* error code */)
1087 assert("nikita-1230", doing
!= NULL
);
1088 assert("nikita-1231", ecode
< 0);
1090 reiser4_panic("nikita-1232", "Carry failed: %i", ecode
);
1094 * Add new root to the tree
1096 * This function itself only manages changes in carry structures and delegates
1097 * all hard work (allocation of znode for new root, changes of parent and
1098 * sibling pointers to the reiser4_add_tree_root().
1100 * Locking: old tree root is locked by carry at this point. Fake znode is also
1103 static int add_new_root(carry_level
* level
,/* carry level in context of which
1104 * operation is performed */
1105 carry_node
* node
, /* carry node for existing root */
1106 znode
* fake
/* "fake" znode already locked by
1111 assert("nikita-1104", level
!= NULL
);
1112 assert("nikita-1105", node
!= NULL
);
1114 assert("nikita-1403", znode_is_write_locked(node
->node
));
1115 assert("nikita-1404", znode_is_write_locked(fake
));
1117 /* trying to create new root. */
1118 /* @node is root and it's already locked by us. This
1119 means that nobody else can be trying to add/remove
1120 tree root right now.
1122 if (level
->new_root
== NULL
)
1123 level
->new_root
= reiser4_add_tree_root(node
->node
, fake
);
1124 if (!IS_ERR(level
->new_root
)) {
1125 assert("nikita-1210", znode_is_root(level
->new_root
));
1126 node
->deallocate
= 1;
1128 longterm_lock_znode(&node
->lock_handle
, level
->new_root
,
1129 ZNODE_WRITE_LOCK
, ZNODE_LOCK_LOPRI
);
1131 zput(level
->new_root
);
1133 result
= PTR_ERR(level
->new_root
);
1134 level
->new_root
= NULL
;
1139 /* allocate new znode and add the operation that inserts the
1140 pointer to it into the parent node into the todo level
1142 Allocate new znode, add it into carry queue and post into @todo queue
1143 request to add pointer to new node into its parent.
This is a carry related routine that calls reiser4_new_node() to allocate new
1148 carry_node
*add_new_znode(znode
* brother
/* existing left neighbor of new
1150 carry_node
* ref
/* carry node after which new
1151 * carry node is to be inserted
1152 * into queue. This affects
1154 carry_level
* doing
/* carry queue where new node is
1156 carry_level
* todo
/* carry queue where COP_INSERT
1157 * operation to add pointer to
1158 * new node will ne added */ )
1162 carry_op
*add_pointer
;
1163 carry_plugin_info info
;
1165 assert("nikita-1048", brother
!= NULL
);
1166 assert("nikita-1049", todo
!= NULL
);
1168 /* There is a lot of possible variations here: to what parent
1169 new node will be attached and where. For simplicity, always
1172 (1) new node and @brother will have the same parent.
1174 (2) new node is added on the right of @brother
1178 fresh
= reiser4_add_carry_skip(doing
,
1179 ref
? POOLO_AFTER
: POOLO_LAST
, ref
);
1183 fresh
->deallocate
= 1;
1186 new_znode
= reiser4_new_node(brother
, znode_get_level(brother
));
1187 if (IS_ERR(new_znode
))
1188 /* @fresh will be deallocated automatically by error
1189 handling code in the caller. */
1190 return (carry_node
*) new_znode
;
1192 /* new_znode returned znode with x_count 1. Caller has to decrease
1193 it. make_space() does. */
1195 ZF_SET(new_znode
, JNODE_ORPHAN
);
1196 fresh
->node
= new_znode
;
1198 while (ZF_ISSET(reiser4_carry_real(ref
), JNODE_ORPHAN
)) {
1199 ref
= carry_node_prev(ref
);
1200 assert("nikita-1606", !carry_node_end(doing
, ref
));
1205 add_pointer
= node_post_carry(&info
, COP_INSERT
,
1206 reiser4_carry_real(ref
), 1);
1207 if (IS_ERR(add_pointer
)) {
1208 /* no need to deallocate @new_znode here: it will be
1209 deallocated during carry error handling. */
1210 return (carry_node
*) add_pointer
;
1213 add_pointer
->u
.insert
.type
= COPT_CHILD
;
1214 add_pointer
->u
.insert
.child
= fresh
;
1215 add_pointer
->u
.insert
.brother
= brother
;
1216 /* initially new node spawns empty key range */
1217 write_lock_dk(znode_get_tree(brother
));
1218 znode_set_ld_key(new_znode
,
1219 znode_set_rd_key(new_znode
,
1220 znode_get_rd_key(brother
)));
1221 write_unlock_dk(znode_get_tree(brother
));
1225 /* DEBUGGING FUNCTIONS.
1227 Probably we also should leave them on even when
1228 debugging is turned off to print dumps at errors.
1231 static int carry_level_invariant(carry_level
* level
, carry_queue_state state
)
1234 carry_node
*tmp_node
;
1239 if (level
->track_type
!= 0 &&
1240 level
->track_type
!= CARRY_TRACK_NODE
&&
1241 level
->track_type
!= CARRY_TRACK_CHANGE
)
1244 /* check that nodes are in ascending order */
1245 for_all_nodes(level
, node
, tmp_node
) {
1252 if (node
!= carry_node_front(level
)) {
1253 if (state
== CARRY_TODO
) {
1255 left
= carry_node_prev(node
)->node
;
1257 right
= reiser4_carry_real(node
);
1258 left
= reiser4_carry_real(carry_node_prev(node
));
1260 if (right
== NULL
|| left
== NULL
)
1262 if (node_is_empty(right
) || node_is_empty(left
))
1264 if (!keyle(leftmost_key_in_node(left
, &lkey
),
1265 leftmost_key_in_node(right
, &rkey
))) {
1266 warning("", "wrong key order");
/* Map a truth value to its one-letter symbolic name ("t"/"f") for the
   debug dumps below. */
static const char *tf(int boolean/* truth value */)
{
	if (boolean)
		return "t";
	return "f";
}
1281 /* symbolic name for carry operation */
1282 static const char *carry_op_name(carry_opcode op
/* carry opcode */)
1286 return "COP_INSERT";
1288 return "COP_DELETE";
1294 return "COP_UPDATE";
1296 return "COP_EXTENT";
1297 case COP_INSERT_FLOW
:
1298 return "COP_INSERT_FLOW";
1300 /* not mt safe, but who cares? */
1301 static char buf
[20];
1303 sprintf(buf
, "unknown op: %x", op
);
1309 /* dump information about carry node */
1310 static void print_carry(const char *prefix
/* prefix to print */ ,
1311 carry_node
* node
/* node to print */)
1314 printk("%s: null\n", prefix
);
1318 ("%s: %p parent: %s, left: %s, unlock: %s, free: %s, dealloc: %s\n",
1319 prefix
, node
, tf(node
->parent
), tf(node
->left
), tf(node
->unlock
),
1320 tf(node
->free
), tf(node
->deallocate
));
1323 /* dump information about carry operation */
1324 static void print_op(const char *prefix
/* prefix to print */ ,
1325 carry_op
* op
/* operation to print */)
1328 printk("%s: null\n", prefix
);
1331 printk("%s: %p carry_opcode: %s\n", prefix
, op
, carry_op_name(op
->op
));
1332 print_carry("\tnode", op
->node
);
1336 print_coord("\tcoord",
1337 op
->u
.insert
.d
? op
->u
.insert
.d
->coord
: NULL
, 0);
1338 reiser4_print_key("\tkey",
1339 op
->u
.insert
.d
? op
->u
.insert
.d
->key
: NULL
);
1340 print_carry("\tchild", op
->u
.insert
.child
);
1343 print_carry("\tchild", op
->u
.delete.child
);
1346 if (op
->u
.cut_or_kill
.is_cut
) {
1347 print_coord("\tfrom",
1348 op
->u
.cut_or_kill
.u
.kill
->params
.from
, 0);
1349 print_coord("\tto", op
->u
.cut_or_kill
.u
.kill
->params
.to
,
1352 print_coord("\tfrom",
1353 op
->u
.cut_or_kill
.u
.cut
->params
.from
, 0);
1354 print_coord("\tto", op
->u
.cut_or_kill
.u
.cut
->params
.to
,
1359 print_carry("\tleft", op
->u
.update
.left
);
1367 /* dump information about all nodes and operations in a @level */
1368 static void print_level(const char *prefix
/* prefix to print */ ,
1369 carry_level
* level
/* level to print */)
1372 carry_node
*tmp_node
;
1376 if (level
== NULL
) {
1377 printk("%s: null\n", prefix
);
1380 printk("%s: %p, restartable: %s\n",
1381 prefix
, level
, tf(level
->restartable
));
1383 for_all_nodes(level
, node
, tmp_node
)
1384 print_carry("\tcarry node", node
);
1385 for_all_ops(level
, op
, tmp_op
)
1386 print_op("\tcarry op", op
);
1389 /* Make Linus happy.
1391 c-indentation-style: "K&R"