On Tue, Nov 06, 2007 at 02:33:53AM -0800, akpm@linux-foundation.org wrote:
[mmotm.git] / fs / reiser4 / carry.c
blobf2f1422c9934c0171ab6a6c2171fbe35d4af42ef
1 /* Copyright 2001, 2002, 2003 by Hans Reiser, licensing governed by
2 reiser4/README */
3 /* Functions to "carry" tree modification(s) upward. */
4 /* Tree is modified one level at a time. As we modify a level we accumulate a
5 set of changes that need to be propagated to the next level. We manage
6 node locking such that any searches that collide with carrying are
7 restarted, from the root if necessary.
9 Insertion of a new item may result in items being moved among nodes and
10 this requires the delimiting key to be updated at the least common parent
11 of the nodes modified to preserve search tree invariants. Also, insertion
12 may require allocation of a new node. A pointer to the new node has to be
13 inserted into some node on the parent level, etc.
15 Tree carrying is meant to be analogous to arithmetic carrying.
17 A carry operation is always associated with some node (&carry_node).
19 Carry process starts with some initial set of operations to be performed
20 and an initial set of already locked nodes. Operations are performed one
21 by one. Performing each single operation has following possible effects:
23 - content of carry node associated with operation is modified
24 - new carry nodes are locked and involved into carry process on this level
25 - new carry operations are posted to the next level
27 After all carry operations on this level are done, process is repeated for
28 the accumulated sequence of carry operations for the next level. This
29 starts by trying to lock (in left to right order) all carry nodes
30 associated with carry operations on the parent level. After this, we decide
31 whether more nodes are required on the left of already locked set. If so,
32 all locks taken on the parent level are released, new carry nodes are
33 added, and locking process repeats.
35 It may happen that balancing process fails owing to unrecoverable error on
36 some of upper levels of a tree (possible causes are io error, failure to
37 allocate new node, etc.). In this case we should unmount the filesystem,
38 rebooting if it is the root, and possibly advise the use of fsck.
40 USAGE:
42 int some_tree_operation( znode *node, ... )
44 // Allocate on a stack pool of carry objects: operations and nodes.
45 // Most carry processes will only take objects from here, without
46 // dynamic allocation.
48 I feel uneasy about this pool. It adds to code complexity, I understand why it
49 exists, but.... -Hans
51 carry_pool pool;
52 carry_level lowest_level;
53 carry_op *op;
55 init_carry_pool( &pool );
56 init_carry_level( &lowest_level, &pool );
58 // operation may be one of:
59 // COP_INSERT --- insert new item into node
60 // COP_CUT --- remove part of or whole node
61 // COP_PASTE --- increase size of item
62 // COP_DELETE --- delete pointer from parent node
63 // COP_UPDATE --- update delimiting key in least
64 // common ancestor of two
66 op = reiser4_post_carry( &lowest_level, operation, node, 0 );
67 if( IS_ERR( op ) || ( op == NULL ) ) {
68 handle error
69 } else {
70 // fill in remaining fields in @op, according to carry.h:carry_op
71 result = carry(&lowest_level, NULL);
73 done_carry_pool(&pool);
76 When you are implementing node plugin method that participates in carry
77 (shifting, insertion, deletion, etc.), do the following:
79 int foo_node_method(znode * node, ..., carry_level * todo)
81 carry_op *op;
83 ....
85 // note, that last argument to reiser4_post_carry() is non-null
86 // here, because @op is to be applied to the parent of @node, rather
87 // than to the @node itself as in the previous case.
89 op = node_post_carry(todo, operation, node, 1);
90 // fill in remaining fields in @op, according to carry.h:carry_op
92 ....
96 BATCHING:
98 One of the main advantages of level-by-level balancing implemented here is
99 ability to batch updates on a parent level and to perform them more
100 efficiently as a result.
102 Description To Be Done (TBD).
104 DIFFICULTIES AND SUBTLE POINTS:
106 1. complex plumbing is required, because:
108 a. effective allocation through pools is needed
110 b. target of operation is not exactly known when operation is
111 posted. This is worked around through bitfields in &carry_node and
112 logic in lock_carry_node()
114 c. of interaction with locking code: node should be added into sibling
115 list when pointer to it is inserted into its parent, which is some time
116 after node was created. Between these moments, node is somewhat in
117 suspended state and is only registered in the carry lists
119 2. whole balancing logic is implemented here, in particular, insertion
120 logic is coded in make_space().
122 3. special cases like insertion (reiser4_add_tree_root()) or deletion
123 (reiser4_kill_tree_root()) of tree root and morphing of paste into insert
124 (insert_paste()) have to be handled.
126 4. there is non-trivial interdependency between allocation of new nodes
127 and almost everything else. This is mainly due to the (1.c) above. I shall
128 write about this later.
132 #include "forward.h"
133 #include "debug.h"
134 #include "key.h"
135 #include "coord.h"
136 #include "plugin/item/item.h"
137 #include "plugin/item/extent.h"
138 #include "plugin/node/node.h"
139 #include "jnode.h"
140 #include "znode.h"
141 #include "tree_mod.h"
142 #include "tree_walk.h"
143 #include "block_alloc.h"
144 #include "pool.h"
145 #include "tree.h"
146 #include "carry.h"
147 #include "carry_ops.h"
148 #include "super.h"
149 #include "reiser4.h"
151 #include <linux/types.h>
153 /* level locking/unlocking */
154 static int lock_carry_level(carry_level * level);
155 static void unlock_carry_level(carry_level * level, int failure);
156 static void done_carry_level(carry_level * level);
157 static void unlock_carry_node(carry_level * level, carry_node * node, int fail);
159 int lock_carry_node(carry_level * level, carry_node * node);
160 int lock_carry_node_tail(carry_node * node);
162 /* carry processing proper */
163 static int carry_on_level(carry_level * doing, carry_level * todo);
165 static carry_op *add_op(carry_level * level, pool_ordering order,
166 carry_op * reference);
168 /* handlers for carry operations. */
170 static void fatal_carry_error(carry_level * doing, int ecode);
171 static int add_new_root(carry_level * level, carry_node * node, znode * fake);
173 static void print_level(const char *prefix, carry_level * level);
175 #if REISER4_DEBUG
176 typedef enum {
177 CARRY_TODO,
178 CARRY_DOING
179 } carry_queue_state;
180 static int carry_level_invariant(carry_level * level, carry_queue_state state);
181 #endif
183 /* main entry point for tree balancing.
185 Tree carry performs operations from @doing and while doing so accumulates
186 information about operations to be performed on the next level ("carried"
187 to the parent level). Carried operations are performed, causing possibly
188 more operations to be carried upward etc. carry() takes care about
189 locking and pinning znodes while operating on them.
191 For usage, see comment at the top of fs/reiser4/carry.c
194 int reiser4_carry(carry_level * doing /* set of carry operations to be
195 * performed */ ,
196 carry_level * done /* set of nodes, already performed
197 * at the previous level.
198 * NULL in most cases */)
200 int result = 0;
201 /* queue of new requests */
202 carry_level *todo;
203 ON_DEBUG(STORE_COUNTERS);
205 assert("nikita-888", doing != NULL);
206 BUG_ON(done != NULL);
208 todo = doing + 1;
209 init_carry_level(todo, doing->pool);
211 /* queue of requests preformed on the previous level */
212 done = todo + 1;
213 init_carry_level(done, doing->pool);
215 /* iterate until there is nothing more to do */
216 while (result == 0 && doing->ops_num > 0) {
217 carry_level *tmp;
219 /* at this point @done is locked. */
220 /* repeat lock/do/unlock while
222 (1) lock_carry_level() fails due to deadlock avoidance, or
224 (2) carry_on_level() decides that more nodes have to
225 be involved.
227 (3) some unexpected error occurred while balancing on the
228 upper levels. In this case all changes are rolled back.
231 while (1) {
232 result = lock_carry_level(doing);
233 if (result == 0) {
234 /* perform operations from @doing and
235 accumulate new requests in @todo */
236 result = carry_on_level(doing, todo);
237 if (result == 0)
238 break;
239 else if (result != -E_REPEAT ||
240 !doing->restartable) {
241 warning("nikita-1043",
242 "Fatal error during carry: %i",
243 result);
244 print_level("done", done);
245 print_level("doing", doing);
246 print_level("todo", todo);
247 /* do some rough stuff like aborting
248 all pending transcrashes and thus
249 pushing tree back to the consistent
250 state. Alternatvely, just panic.
252 fatal_carry_error(doing, result);
253 return result;
255 } else if (result != -E_REPEAT) {
256 fatal_carry_error(doing, result);
257 return result;
259 unlock_carry_level(doing, 1);
261 /* at this point @done can be safely unlocked */
262 done_carry_level(done);
264 /* cyclically shift queues */
265 tmp = done;
266 done = doing;
267 doing = todo;
268 todo = tmp;
269 init_carry_level(todo, doing->pool);
271 /* give other threads chance to run */
272 reiser4_preempt_point();
274 done_carry_level(done);
276 /* all counters, but x_refs should remain the same. x_refs can change
277 owing to transaction manager */
278 ON_DEBUG(CHECK_COUNTERS);
279 return result;
282 /* perform carry operations on given level.
284 Optimizations proposed by pooh:
286 (1) don't lock all nodes from queue at the same time. Lock nodes lazily as
287 required;
289 (2) unlock node if there are no more operations to be performed upon it and
290 node didn't add any operation to @todo. This can be implemented by
291 attaching to each node two counters: counter of operaions working on this
292 node and counter and operations carried upward from this node.
295 static int carry_on_level(carry_level * doing /* queue of carry operations to
296 * do on this level */ ,
297 carry_level * todo /* queue where new carry
298 * operations to be performed on
299 * the * parent level are
300 * accumulated during @doing
301 * processing. */ )
303 int result;
304 int (*f) (carry_op *, carry_level *, carry_level *);
305 carry_op *op;
306 carry_op *tmp_op;
308 assert("nikita-1034", doing != NULL);
309 assert("nikita-1035", todo != NULL);
311 /* @doing->nodes are locked. */
313 /* This function can be split into two phases: analysis and modification
315 Analysis calculates precisely what items should be moved between
316 nodes. This information is gathered in some structures attached to
317 each carry_node in a @doing queue. Analysis also determines whether
318 new nodes are to be allocated etc.
320 After analysis is completed, actual modification is performed. Here
321 we can take advantage of "batch modification": if there are several
322 operations acting on the same node, modifications can be performed
323 more efficiently when batched together.
325 Above is an optimization left for the future.
327 /* Important, but delayed optimization: it's possible to batch
328 operations together and perform them more efficiently as a
329 result. For example, deletion of several neighboring items from a
330 node can be converted to a single ->cut() operation.
332 Before processing queue, it should be scanned and "mergeable"
333 operations merged.
335 result = 0;
336 for_all_ops(doing, op, tmp_op) {
337 carry_opcode opcode;
339 assert("nikita-1041", op != NULL);
340 opcode = op->op;
341 assert("nikita-1042", op->op < COP_LAST_OP);
342 f = op_dispatch_table[op->op].handler;
343 result = f(op, doing, todo);
344 /* locking can fail with -E_REPEAT. Any different error is fatal
345 and will be handled by fatal_carry_error() sledgehammer.
347 if (result != 0)
348 break;
350 if (result == 0) {
351 carry_plugin_info info;
352 carry_node *scan;
353 carry_node *tmp_scan;
355 info.doing = doing;
356 info.todo = todo;
358 assert("nikita-3002",
359 carry_level_invariant(doing, CARRY_DOING));
360 for_all_nodes(doing, scan, tmp_scan) {
361 znode *node;
363 node = reiser4_carry_real(scan);
364 assert("nikita-2547", node != NULL);
365 if (node_is_empty(node)) {
366 result =
367 node_plugin_by_node(node)->
368 prepare_removal(node, &info);
369 if (result != 0)
370 break;
374 return result;
377 /* post carry operation
379 This is main function used by external carry clients: node layout plugins
380 and tree operations to create new carry operation to be performed on some
381 level.
383 New operation will be included in the @level queue. To actually perform it,
384 call carry( level, ... ). This function takes write lock on @node. Carry
385 manages all its locks by itself, don't worry about this.
387 This function adds operation and node at the end of the queue. It is up to
388 caller to guarantee proper ordering of node queue.
391 carry_op * reiser4_post_carry(carry_level * level /* queue where new operation
392 * is to be posted at */ ,
393 carry_opcode op /* opcode of operation */ ,
394 znode * node /* node on which this operation
395 * will operate */ ,
396 int apply_to_parent_p /* whether operation will
397 * operate directly on @node
398 * or on it parent. */)
400 carry_op *result;
401 carry_node *child;
403 assert("nikita-1046", level != NULL);
404 assert("nikita-1788", znode_is_write_locked(node));
406 result = add_op(level, POOLO_LAST, NULL);
407 if (IS_ERR(result))
408 return result;
409 child = reiser4_add_carry(level, POOLO_LAST, NULL);
410 if (IS_ERR(child)) {
411 reiser4_pool_free(&level->pool->op_pool, &result->header);
412 return (carry_op *) child;
414 result->node = child;
415 result->op = op;
416 child->parent = apply_to_parent_p;
417 if (ZF_ISSET(node, JNODE_ORPHAN))
418 child->left_before = 1;
419 child->node = node;
420 return result;
423 /* initialize carry queue */
424 void init_carry_level(carry_level * level /* level to initialize */ ,
425 carry_pool * pool /* pool @level will allocate objects
426 * from */ )
428 assert("nikita-1045", level != NULL);
429 assert("nikita-967", pool != NULL);
431 memset(level, 0, sizeof *level);
432 level->pool = pool;
434 INIT_LIST_HEAD(&level->nodes);
435 INIT_LIST_HEAD(&level->ops);
438 /* allocate carry pool and initialize pools within queue */
439 carry_pool *init_carry_pool(int size)
441 carry_pool *pool;
443 assert("", size >= sizeof(carry_pool) + 3 * sizeof(carry_level));
444 pool = kmalloc(size, reiser4_ctx_gfp_mask_get());
445 if (pool == NULL)
446 return ERR_PTR(RETERR(-ENOMEM));
448 reiser4_init_pool(&pool->op_pool, sizeof(carry_op), CARRIES_POOL_SIZE,
449 (char *)pool->op);
450 reiser4_init_pool(&pool->node_pool, sizeof(carry_node),
451 NODES_LOCKED_POOL_SIZE, (char *)pool->node);
452 return pool;
455 /* finish with queue pools */
456 void done_carry_pool(carry_pool * pool/* pool to destroy */)
458 reiser4_done_pool(&pool->op_pool);
459 reiser4_done_pool(&pool->node_pool);
460 kfree(pool);
463 /* add new carry node to the @level.
465 Returns pointer to the new carry node allocated from pool. It's up to
466 callers to maintain proper order in the @level. Assumption is that if carry
467 nodes on one level are already sorted and modifications are peroformed from
468 left to right, carry nodes added on the parent level will be ordered
469 automatically. To control ordering use @order and @reference parameters.
472 carry_node *reiser4_add_carry_skip(carry_level * level /* &carry_level to add
473 * node to */ ,
474 pool_ordering order /* where to insert:
475 * at the beginning of
476 * @level,
477 * before @reference,
478 * after @reference,
479 * at the end of @level
480 */ ,
481 carry_node * reference/* reference node for
482 * insertion */)
484 ON_DEBUG(carry_node * orig_ref = reference);
486 if (order == POOLO_BEFORE) {
487 reference = find_left_carry(reference, level);
488 if (reference == NULL)
489 reference = list_entry(level->nodes.next, carry_node,
490 header.level_linkage);
491 else
492 reference = list_entry(reference->header.level_linkage.next,
493 carry_node, header.level_linkage);
494 } else if (order == POOLO_AFTER) {
495 reference = find_right_carry(reference, level);
496 if (reference == NULL)
497 reference = list_entry(level->nodes.prev, carry_node,
498 header.level_linkage);
499 else
500 reference = list_entry(reference->header.level_linkage.prev,
501 carry_node, header.level_linkage);
503 assert("nikita-2209",
504 ergo(orig_ref != NULL,
505 reiser4_carry_real(reference) ==
506 reiser4_carry_real(orig_ref)));
507 return reiser4_add_carry(level, order, reference);
510 carry_node *reiser4_add_carry(carry_level * level, /* carry_level to add
511 node to */
512 pool_ordering order, /* where to insert:
513 * at the beginning of
514 * @level;
515 * before @reference;
516 * after @reference;
517 * at the end of @level
519 carry_node * reference /* reference node for
520 * insertion */)
522 carry_node *result;
524 result =
525 (carry_node *) reiser4_add_obj(&level->pool->node_pool,
526 &level->nodes,
527 order, &reference->header);
528 if (!IS_ERR(result) && (result != NULL))
529 ++level->nodes_num;
530 return result;
534 * add new carry operation to the @level.
536 * Returns pointer to the new carry operations allocated from pool. It's up to
537 * callers to maintain proper order in the @level. To control ordering use
538 * @order and @reference parameters.
540 static carry_op *add_op(carry_level * level, /* &carry_level to add node to */
541 pool_ordering order, /* where to insert:
542 * at the beginning of @level;
543 * before @reference;
544 * after @reference;
545 * at the end of @level */
546 carry_op * reference /* reference node for insertion */)
548 carry_op *result;
550 result =
551 (carry_op *) reiser4_add_obj(&level->pool->op_pool, &level->ops,
552 order, &reference->header);
553 if (!IS_ERR(result) && (result != NULL))
554 ++level->ops_num;
555 return result;
559 * Return node on the right of which @node was created.
561 * Each node is created on the right of some existing node (or it is new root,
562 * which is special case not handled here).
564 * @node is new node created on some level, but not yet inserted into its
565 * parent, it has corresponding bit (JNODE_ORPHAN) set in zstate.
567 static carry_node *find_begetting_brother(carry_node * node,/* node to start
568 search from */
569 carry_level * kin UNUSED_ARG
570 /* level to scan */)
572 carry_node *scan;
574 assert("nikita-1614", node != NULL);
575 assert("nikita-1615", kin != NULL);
576 assert("nikita-1616", LOCK_CNT_GTZ(rw_locked_tree));
577 assert("nikita-1619", ergo(reiser4_carry_real(node) != NULL,
578 ZF_ISSET(reiser4_carry_real(node),
579 JNODE_ORPHAN)));
580 for (scan = node;;
581 scan = list_entry(scan->header.level_linkage.prev, carry_node,
582 header.level_linkage)) {
583 assert("nikita-1617", &kin->nodes != &scan->header.level_linkage);
584 if ((scan->node != node->node) &&
585 !ZF_ISSET(scan->node, JNODE_ORPHAN)) {
586 assert("nikita-1618", reiser4_carry_real(scan) != NULL);
587 break;
590 return scan;
593 static cmp_t
594 carry_node_cmp(carry_level * level, carry_node * n1, carry_node * n2)
596 assert("nikita-2199", n1 != NULL);
597 assert("nikita-2200", n2 != NULL);
599 if (n1 == n2)
600 return EQUAL_TO;
601 while (1) {
602 n1 = carry_node_next(n1);
603 if (carry_node_end(level, n1))
604 return GREATER_THAN;
605 if (n1 == n2)
606 return LESS_THAN;
608 impossible("nikita-2201", "End of level reached");
611 carry_node *find_carry_node(carry_level * level, const znode * node)
613 carry_node *scan;
614 carry_node *tmp_scan;
616 assert("nikita-2202", level != NULL);
617 assert("nikita-2203", node != NULL);
619 for_all_nodes(level, scan, tmp_scan) {
620 if (reiser4_carry_real(scan) == node)
621 return scan;
623 return NULL;
626 znode *reiser4_carry_real(const carry_node * node)
628 assert("nikita-3061", node != NULL);
630 return node->lock_handle.node;
633 carry_node *insert_carry_node(carry_level * doing, carry_level * todo,
634 const znode * node)
636 carry_node *base;
637 carry_node *scan;
638 carry_node *tmp_scan;
639 carry_node *proj;
641 base = find_carry_node(doing, node);
642 assert("nikita-2204", base != NULL);
644 for_all_nodes(todo, scan, tmp_scan) {
645 proj = find_carry_node(doing, scan->node);
646 assert("nikita-2205", proj != NULL);
647 if (carry_node_cmp(doing, proj, base) != LESS_THAN)
648 break;
650 return scan;
653 static carry_node *add_carry_atplace(carry_level * doing, carry_level * todo,
654 znode * node)
656 carry_node *reference;
658 assert("nikita-2994", doing != NULL);
659 assert("nikita-2995", todo != NULL);
660 assert("nikita-2996", node != NULL);
662 reference = insert_carry_node(doing, todo, node);
663 assert("nikita-2997", reference != NULL);
665 return reiser4_add_carry(todo, POOLO_BEFORE, reference);
668 /* like reiser4_post_carry(), but designed to be called from node plugin
669 methods. This function is different from reiser4_post_carry() in that it
670 finds proper place to insert node in the queue. */
671 carry_op *node_post_carry(carry_plugin_info * info /* carry parameters
672 * passed down to node
673 * plugin */ ,
674 carry_opcode op /* opcode of operation */ ,
675 znode * node /* node on which this
676 * operation will operate */ ,
677 int apply_to_parent_p /* whether operation will
678 * operate directly on @node
679 * or on it parent. */ )
681 carry_op *result;
682 carry_node *child;
684 assert("nikita-2207", info != NULL);
685 assert("nikita-2208", info->todo != NULL);
687 if (info->doing == NULL)
688 return reiser4_post_carry(info->todo, op, node,
689 apply_to_parent_p);
691 result = add_op(info->todo, POOLO_LAST, NULL);
692 if (IS_ERR(result))
693 return result;
694 child = add_carry_atplace(info->doing, info->todo, node);
695 if (IS_ERR(child)) {
696 reiser4_pool_free(&info->todo->pool->op_pool, &result->header);
697 return (carry_op *) child;
699 result->node = child;
700 result->op = op;
701 child->parent = apply_to_parent_p;
702 if (ZF_ISSET(node, JNODE_ORPHAN))
703 child->left_before = 1;
704 child->node = node;
705 return result;
708 /* lock all carry nodes in @level */
709 static int lock_carry_level(carry_level * level/* level to lock */)
711 int result;
712 carry_node *node;
713 carry_node *tmp_node;
715 assert("nikita-881", level != NULL);
716 assert("nikita-2229", carry_level_invariant(level, CARRY_TODO));
718 /* lock nodes from left to right */
719 result = 0;
720 for_all_nodes(level, node, tmp_node) {
721 result = lock_carry_node(level, node);
722 if (result != 0)
723 break;
725 return result;
728 /* Synchronize delimiting keys between @node and its left neighbor.
730 To reduce contention on dk key and simplify carry code, we synchronize
731 delimiting keys only when carry ultimately leaves tree level (carrying
732 changes upward) and unlocks nodes at this level.
734 This function first finds left neighbor of @node and then updates left
735 neighbor's right delimiting key to conincide with least key in @node.
739 ON_DEBUG(extern atomic_t delim_key_version;
742 static void sync_dkeys(znode * spot/* node to update */)
744 reiser4_key pivot;
745 reiser4_tree *tree;
747 assert("nikita-1610", spot != NULL);
748 assert("nikita-1612", LOCK_CNT_NIL(rw_locked_dk));
750 tree = znode_get_tree(spot);
751 read_lock_tree(tree);
752 write_lock_dk(tree);
754 assert("nikita-2192", znode_is_loaded(spot));
756 /* sync left delimiting key of @spot with key in its leftmost item */
757 if (node_is_empty(spot))
758 pivot = *znode_get_rd_key(spot);
759 else
760 leftmost_key_in_node(spot, &pivot);
762 znode_set_ld_key(spot, &pivot);
764 /* there can be sequence of empty nodes pending removal on the left of
765 @spot. Scan them and update their left and right delimiting keys to
766 match left delimiting key of @spot. Also, update right delimiting
767 key of first non-empty left neighbor.
769 while (1) {
770 if (!ZF_ISSET(spot, JNODE_LEFT_CONNECTED))
771 break;
773 spot = spot->left;
774 if (spot == NULL)
775 break;
777 znode_set_rd_key(spot, &pivot);
778 /* don't sink into the domain of another balancing */
779 if (!znode_is_write_locked(spot))
780 break;
781 if (ZF_ISSET(spot, JNODE_HEARD_BANSHEE))
782 znode_set_ld_key(spot, &pivot);
783 else
784 break;
787 write_unlock_dk(tree);
788 read_unlock_tree(tree);
791 /* unlock all carry nodes in @level */
792 static void unlock_carry_level(carry_level * level /* level to unlock */ ,
793 int failure /* true if unlocking owing to
794 * failure */ )
796 carry_node *node;
797 carry_node *tmp_node;
799 assert("nikita-889", level != NULL);
801 if (!failure) {
802 znode *spot;
804 spot = NULL;
805 /* update delimiting keys */
806 for_all_nodes(level, node, tmp_node) {
807 if (reiser4_carry_real(node) != spot) {
808 spot = reiser4_carry_real(node);
809 sync_dkeys(spot);
814 /* nodes can be unlocked in arbitrary order. In preemptible
815 environment it's better to unlock in reverse order of locking,
816 though.
818 for_all_nodes_back(level, node, tmp_node) {
819 /* all allocated nodes should be already linked to their
820 parents at this moment. */
821 assert("nikita-1631",
822 ergo(!failure, !ZF_ISSET(reiser4_carry_real(node),
823 JNODE_ORPHAN)));
824 ON_DEBUG(check_dkeys(reiser4_carry_real(node)));
825 unlock_carry_node(level, node, failure);
827 level->new_root = NULL;
830 /* finish with @level
832 Unlock nodes and release all allocated resources */
833 static void done_carry_level(carry_level * level/* level to finish */)
835 carry_node *node;
836 carry_node *tmp_node;
837 carry_op *op;
838 carry_op *tmp_op;
840 assert("nikita-1076", level != NULL);
842 unlock_carry_level(level, 0);
843 for_all_nodes(level, node, tmp_node) {
844 assert("nikita-2113", list_empty_careful(&node->lock_handle.locks_link));
845 assert("nikita-2114", list_empty_careful(&node->lock_handle.owners_link));
846 reiser4_pool_free(&level->pool->node_pool, &node->header);
848 for_all_ops(level, op, tmp_op)
849 reiser4_pool_free(&level->pool->op_pool, &op->header);
852 /* helper function to complete locking of carry node
854 Finish locking of carry node. There are several ways in which new carry
855 node can be added into carry level and locked. Normal is through
856 lock_carry_node(), but also from find_{left|right}_neighbor(). This
857 function factors out common final part of all locking scenarios. It
858 supposes that @node -> lock_handle is lock handle for lock just taken and
859 fills ->real_node from this lock handle.
862 int lock_carry_node_tail(carry_node * node/* node to complete locking of */)
864 assert("nikita-1052", node != NULL);
865 assert("nikita-1187", reiser4_carry_real(node) != NULL);
866 assert("nikita-1188", !node->unlock);
868 node->unlock = 1;
869 /* Load node content into memory and install node plugin by
870 looking at the node header.
872 Most of the time this call is cheap because the node is
873 already in memory.
875 Corresponding zrelse() is in unlock_carry_node()
877 return zload(reiser4_carry_real(node));
880 /* lock carry node
882 "Resolve" node to real znode, lock it and mark as locked.
883 This requires recursive locking of znodes.
885 When operation is posted to the parent level, node it will be applied to is
886 not yet known. For example, when shifting data between two nodes,
887 delimiting has to be updated in parent or parents of nodes involved. But
888 their parents is not yet locked and, moreover said nodes can be reparented
889 by concurrent balancing.
891 To work around this, carry operation is applied to special "carry node"
892 rather than to the znode itself. Carry node consists of some "base" or
893 "reference" znode and flags indicating how to get to the target of carry
894 operation (->real_node field of carry_node) from base.
897 int lock_carry_node(carry_level * level /* level @node is in */ ,
898 carry_node * node/* node to lock */)
900 int result;
901 znode *reference_point;
902 lock_handle lh;
903 lock_handle tmp_lh;
904 reiser4_tree *tree;
906 assert("nikita-887", level != NULL);
907 assert("nikita-882", node != NULL);
909 result = 0;
910 reference_point = node->node;
911 init_lh(&lh);
912 init_lh(&tmp_lh);
913 if (node->left_before) {
914 /* handling of new nodes, allocated on the previous level:
916 some carry ops were propably posted from the new node, but
917 this node neither has parent pointer set, nor is
918 connected. This will be done in ->create_hook() for
919 internal item.
921 No then less, parent of new node has to be locked. To do
922 this, first go to the "left" in the carry order. This
923 depends on the decision to always allocate new node on the
924 right of existing one.
926 Loop handles case when multiple nodes, all orphans, were
927 inserted.
929 Strictly speaking, taking tree lock is not necessary here,
930 because all nodes scanned by loop in
931 find_begetting_brother() are write-locked by this thread,
932 and thus, their sibling linkage cannot change.
935 tree = znode_get_tree(reference_point);
936 read_lock_tree(tree);
937 reference_point = find_begetting_brother(node, level)->node;
938 read_unlock_tree(tree);
939 assert("nikita-1186", reference_point != NULL);
941 if (node->parent && (result == 0)) {
942 result =
943 reiser4_get_parent(&tmp_lh, reference_point,
944 ZNODE_WRITE_LOCK);
945 if (result != 0) {
946 ; /* nothing */
947 } else if (znode_get_level(tmp_lh.node) == 0) {
948 assert("nikita-1347", znode_above_root(tmp_lh.node));
949 result = add_new_root(level, node, tmp_lh.node);
950 if (result == 0) {
951 reference_point = level->new_root;
952 move_lh(&lh, &node->lock_handle);
954 } else if ((level->new_root != NULL)
955 && (level->new_root !=
956 znode_parent_nolock(reference_point))) {
957 /* parent of node exists, but this level aready
958 created different new root, so */
959 warning("nikita-1109",
960 /* it should be "radicis", but tradition is
961 tradition. do banshees read latin? */
962 "hodie natus est radici frater");
963 result = -EIO;
964 } else {
965 move_lh(&lh, &tmp_lh);
966 reference_point = lh.node;
969 if (node->left && (result == 0)) {
970 assert("nikita-1183", node->parent);
971 assert("nikita-883", reference_point != NULL);
972 result =
973 reiser4_get_left_neighbor(&tmp_lh, reference_point,
974 ZNODE_WRITE_LOCK,
975 GN_CAN_USE_UPPER_LEVELS);
976 if (result == 0) {
977 done_lh(&lh);
978 move_lh(&lh, &tmp_lh);
979 reference_point = lh.node;
982 if (!node->parent && !node->left && !node->left_before) {
983 result =
984 longterm_lock_znode(&lh, reference_point, ZNODE_WRITE_LOCK,
985 ZNODE_LOCK_HIPRI);
987 if (result == 0) {
988 move_lh(&node->lock_handle, &lh);
989 result = lock_carry_node_tail(node);
991 done_lh(&tmp_lh);
992 done_lh(&lh);
993 return result;
996 /* release a lock on &carry_node.
998 Release if necessary lock on @node. This opearion is pair of
999 lock_carry_node() and is idempotent: you can call it more than once on the
1000 same node.
1003 static void
1004 unlock_carry_node(carry_level * level,
1005 carry_node * node /* node to be released */ ,
1006 int failure /* 0 if node is unlocked due
1007 * to some error */ )
1009 znode *real_node;
1011 assert("nikita-884", node != NULL);
1013 real_node = reiser4_carry_real(node);
1014 /* pair to zload() in lock_carry_node_tail() */
1015 zrelse(real_node);
1016 if (node->unlock && (real_node != NULL)) {
1017 assert("nikita-899", real_node == node->lock_handle.node);
1018 longterm_unlock_znode(&node->lock_handle);
1020 if (failure) {
1021 if (node->deallocate && (real_node != NULL)) {
1022 /* free node in bitmap
1024 Prepare node for removal. Last zput() will finish
1025 with it.
1027 ZF_SET(real_node, JNODE_HEARD_BANSHEE);
1029 if (node->free) {
1030 assert("nikita-2177",
1031 list_empty_careful(&node->lock_handle.locks_link));
1032 assert("nikita-2112",
1033 list_empty_careful(&node->lock_handle.owners_link));
1034 reiser4_pool_free(&level->pool->node_pool,
1035 &node->header);
/* fatal_carry_error() - all-catching error handling function

   It is possible that carry faces an unrecoverable error, like inability to
   insert a pointer at the internal level. Our simple solution is just to
   panic in this situation. More sophisticated things, like an attempt to
   remount the file-system as read-only, can be implemented without much
   difficulty.

   It is believed, that:

   1. instead of panicking, all current transactions can be aborted rolling
   system back to the consistent state.

   Umm, if you simply panic without doing anything more at all, then all current
   transactions are aborted and the system is rolled back to a consistent state,
   by virtue of the design of the transactional mechanism. Well, wait, let's be
   precise. If an internal node is corrupted on disk due to hardware failure,
   then there may be no consistent state that can be rolled back to, so instead
   we should say that it will rollback the transactions, which barring other
   factors means rolling back to a consistent state.

   # Nikita: there is a subtle difference between panic and aborting
   # transactions: machine doesn't reboot. Processes aren't killed. Processes
   # not using reiser4 (not that we care about such processes), or using other
   # reiser4 mounts (about them we do care) will simply continue to run. With
   # some luck, even an application using the aborted file system can survive:
   # it will get some error, like EBADF, from each file descriptor on the
   # failed file system, but applications that do care about tolerance will
   # cope with this (squid will).

   It would be a nice feature though to support rollback without rebooting
   followed by remount, but this can wait for later versions.

   2. once isolated transactions are implemented it will be possible to
   roll back the offending transaction.

   2. is additional code complexity of inconsistent value (it implies that a
   broken tree should be kept in operation), so we must think about it more
   before deciding if it should be done. -Hans
*/
1080 static void fatal_carry_error(carry_level * doing UNUSED_ARG /* carry level
1081 * where
1082 * unrecoverable
1083 * error
1084 * occurred */ ,
1085 int ecode/* error code */)
1087 assert("nikita-1230", doing != NULL);
1088 assert("nikita-1231", ecode < 0);
1090 reiser4_panic("nikita-1232", "Carry failed: %i", ecode);
1094 * Add new root to the tree
1096 * This function itself only manages changes in carry structures and delegates
1097 * all hard work (allocation of znode for new root, changes of parent and
1098 * sibling pointers to the reiser4_add_tree_root().
1100 * Locking: old tree root is locked by carry at this point. Fake znode is also
1101 * locked.
1103 static int add_new_root(carry_level * level,/* carry level in context of which
1104 * operation is performed */
1105 carry_node * node, /* carry node for existing root */
1106 znode * fake /* "fake" znode already locked by
1107 * us */)
1109 int result;
1111 assert("nikita-1104", level != NULL);
1112 assert("nikita-1105", node != NULL);
1114 assert("nikita-1403", znode_is_write_locked(node->node));
1115 assert("nikita-1404", znode_is_write_locked(fake));
1117 /* trying to create new root. */
1118 /* @node is root and it's already locked by us. This
1119 means that nobody else can be trying to add/remove
1120 tree root right now.
1122 if (level->new_root == NULL)
1123 level->new_root = reiser4_add_tree_root(node->node, fake);
1124 if (!IS_ERR(level->new_root)) {
1125 assert("nikita-1210", znode_is_root(level->new_root));
1126 node->deallocate = 1;
1127 result =
1128 longterm_lock_znode(&node->lock_handle, level->new_root,
1129 ZNODE_WRITE_LOCK, ZNODE_LOCK_LOPRI);
1130 if (result == 0)
1131 zput(level->new_root);
1132 } else {
1133 result = PTR_ERR(level->new_root);
1134 level->new_root = NULL;
1136 return result;
1139 /* allocate new znode and add the operation that inserts the
1140 pointer to it into the parent node into the todo level
1142 Allocate new znode, add it into carry queue and post into @todo queue
1143 request to add pointer to new node into its parent.
1145 This is carry related routing that calls reiser4_new_node() to allocate new
1146 node.
1148 carry_node *add_new_znode(znode * brother /* existing left neighbor of new
1149 * node */ ,
1150 carry_node * ref /* carry node after which new
1151 * carry node is to be inserted
1152 * into queue. This affects
1153 * locking. */ ,
1154 carry_level * doing /* carry queue where new node is
1155 * to be added */ ,
1156 carry_level * todo /* carry queue where COP_INSERT
1157 * operation to add pointer to
1158 * new node will ne added */ )
1160 carry_node *fresh;
1161 znode *new_znode;
1162 carry_op *add_pointer;
1163 carry_plugin_info info;
1165 assert("nikita-1048", brother != NULL);
1166 assert("nikita-1049", todo != NULL);
1168 /* There is a lot of possible variations here: to what parent
1169 new node will be attached and where. For simplicity, always
1170 do the following:
1172 (1) new node and @brother will have the same parent.
1174 (2) new node is added on the right of @brother
1178 fresh = reiser4_add_carry_skip(doing,
1179 ref ? POOLO_AFTER : POOLO_LAST, ref);
1180 if (IS_ERR(fresh))
1181 return fresh;
1183 fresh->deallocate = 1;
1184 fresh->free = 1;
1186 new_znode = reiser4_new_node(brother, znode_get_level(brother));
1187 if (IS_ERR(new_znode))
1188 /* @fresh will be deallocated automatically by error
1189 handling code in the caller. */
1190 return (carry_node *) new_znode;
1192 /* new_znode returned znode with x_count 1. Caller has to decrease
1193 it. make_space() does. */
1195 ZF_SET(new_znode, JNODE_ORPHAN);
1196 fresh->node = new_znode;
1198 while (ZF_ISSET(reiser4_carry_real(ref), JNODE_ORPHAN)) {
1199 ref = carry_node_prev(ref);
1200 assert("nikita-1606", !carry_node_end(doing, ref));
1203 info.todo = todo;
1204 info.doing = doing;
1205 add_pointer = node_post_carry(&info, COP_INSERT,
1206 reiser4_carry_real(ref), 1);
1207 if (IS_ERR(add_pointer)) {
1208 /* no need to deallocate @new_znode here: it will be
1209 deallocated during carry error handling. */
1210 return (carry_node *) add_pointer;
1213 add_pointer->u.insert.type = COPT_CHILD;
1214 add_pointer->u.insert.child = fresh;
1215 add_pointer->u.insert.brother = brother;
1216 /* initially new node spawns empty key range */
1217 write_lock_dk(znode_get_tree(brother));
1218 znode_set_ld_key(new_znode,
1219 znode_set_rd_key(new_znode,
1220 znode_get_rd_key(brother)));
1221 write_unlock_dk(znode_get_tree(brother));
1222 return fresh;
/* DEBUGGING FUNCTIONS.

   Probably we should also leave them on even when
   debugging is turned off, to print dumps at errors.
*/
#if REISER4_DEBUG
/*
 * Consistency check for a carry level.
 *
 * Returns 0 when @level is NULL, when its track_type is none of 0,
 * CARRY_TRACK_NODE and CARRY_TRACK_CHANGE, or when two adjacent carry nodes
 * have leftmost keys in the wrong order; returns 1 otherwise.
 *
 * For @state == CARRY_TODO the ->node fields are compared, otherwise the
 * znodes returned by reiser4_carry_real(). Pairs in which either znode is
 * NULL or empty are skipped.
 */
static int carry_level_invariant(carry_level * level, carry_queue_state state)
{
	carry_node *node;
	carry_node *tmp_node;

	if (level == NULL)
		return 0;

	if (level->track_type != 0 &&
	    level->track_type != CARRY_TRACK_NODE &&
	    level->track_type != CARRY_TRACK_CHANGE)
		return 0;

	/* check that nodes are in ascending order */
	for_all_nodes(level, node, tmp_node) {
		znode *left;
		znode *right;

		reiser4_key lkey;
		reiser4_key rkey;

		/* the first node has no predecessor to compare with */
		if (node == carry_node_front(level))
			continue;

		if (state == CARRY_TODO) {
			right = node->node;
			left = carry_node_prev(node)->node;
		} else {
			right = reiser4_carry_real(node);
			left = reiser4_carry_real(carry_node_prev(node));
		}

		if (right == NULL || left == NULL)
			continue;
		if (node_is_empty(right) || node_is_empty(left))
			continue;

		if (!keyle(leftmost_key_in_node(left, &lkey),
			   leftmost_key_in_node(right, &rkey))) {
			warning("", "wrong key order");
			return 0;
		}
	}
	return 1;
}
#endif
/*
 * Symbolic name for a truth value: "t" for any non-zero @boolean,
 * "f" for zero.
 */
static const char *tf(int boolean)
{
	if (boolean)
		return "t";
	return "f";
}
1281 /* symbolic name for carry operation */
1282 static const char *carry_op_name(carry_opcode op/* carry opcode */)
1284 switch (op) {
1285 case COP_INSERT:
1286 return "COP_INSERT";
1287 case COP_DELETE:
1288 return "COP_DELETE";
1289 case COP_CUT:
1290 return "COP_CUT";
1291 case COP_PASTE:
1292 return "COP_PASTE";
1293 case COP_UPDATE:
1294 return "COP_UPDATE";
1295 case COP_EXTENT:
1296 return "COP_EXTENT";
1297 case COP_INSERT_FLOW:
1298 return "COP_INSERT_FLOW";
1299 default:{
1300 /* not mt safe, but who cares? */
1301 static char buf[20];
1303 sprintf(buf, "unknown op: %x", op);
1304 return buf;
1309 /* dump information about carry node */
1310 static void print_carry(const char *prefix /* prefix to print */ ,
1311 carry_node * node/* node to print */)
1313 if (node == NULL) {
1314 printk("%s: null\n", prefix);
1315 return;
1317 printk
1318 ("%s: %p parent: %s, left: %s, unlock: %s, free: %s, dealloc: %s\n",
1319 prefix, node, tf(node->parent), tf(node->left), tf(node->unlock),
1320 tf(node->free), tf(node->deallocate));
1323 /* dump information about carry operation */
1324 static void print_op(const char *prefix /* prefix to print */ ,
1325 carry_op * op/* operation to print */)
1327 if (op == NULL) {
1328 printk("%s: null\n", prefix);
1329 return;
1331 printk("%s: %p carry_opcode: %s\n", prefix, op, carry_op_name(op->op));
1332 print_carry("\tnode", op->node);
1333 switch (op->op) {
1334 case COP_INSERT:
1335 case COP_PASTE:
1336 print_coord("\tcoord",
1337 op->u.insert.d ? op->u.insert.d->coord : NULL, 0);
1338 reiser4_print_key("\tkey",
1339 op->u.insert.d ? op->u.insert.d->key : NULL);
1340 print_carry("\tchild", op->u.insert.child);
1341 break;
1342 case COP_DELETE:
1343 print_carry("\tchild", op->u.delete.child);
1344 break;
1345 case COP_CUT:
1346 if (op->u.cut_or_kill.is_cut) {
1347 print_coord("\tfrom",
1348 op->u.cut_or_kill.u.kill->params.from, 0);
1349 print_coord("\tto", op->u.cut_or_kill.u.kill->params.to,
1351 } else {
1352 print_coord("\tfrom",
1353 op->u.cut_or_kill.u.cut->params.from, 0);
1354 print_coord("\tto", op->u.cut_or_kill.u.cut->params.to,
1357 break;
1358 case COP_UPDATE:
1359 print_carry("\tleft", op->u.update.left);
1360 break;
1361 default:
1362 /* do nothing */
1363 break;
1367 /* dump information about all nodes and operations in a @level */
1368 static void print_level(const char *prefix /* prefix to print */ ,
1369 carry_level * level/* level to print */)
1371 carry_node *node;
1372 carry_node *tmp_node;
1373 carry_op *op;
1374 carry_op *tmp_op;
1376 if (level == NULL) {
1377 printk("%s: null\n", prefix);
1378 return;
1380 printk("%s: %p, restartable: %s\n",
1381 prefix, level, tf(level->restartable));
1383 for_all_nodes(level, node, tmp_node)
1384 print_carry("\tcarry node", node);
1385 for_all_ops(level, op, tmp_op)
1386 print_op("\tcarry op", op);
1389 /* Make Linus happy.
1390 Local variables:
1391 c-indentation-style: "K&R"
1392 mode-name: "LC"
1393 c-basic-offset: 8
1394 tab-width: 8
1395 fill-column: 120
1396 scroll-step: 1
1397 End: