1 /* Copyright 2001, 2002, 2003 by Hans Reiser, licensing governed by
4 /* implementation of carry operations */
10 #include "plugin/item/item.h"
11 #include "plugin/node/node.h"
14 #include "block_alloc.h"
15 #include "tree_walk.h"
19 #include "carry_ops.h"
24 #include <linux/types.h>
25 #include <linux/err.h>
27 static int carry_shift_data(sideof side
, coord_t
*insert_coord
, znode
* node
,
28 carry_level
* doing
, carry_level
* todo
,
29 unsigned int including_insert_coord_p
);
31 extern int lock_carry_node(carry_level
* level
, carry_node
* node
);
32 extern int lock_carry_node_tail(carry_node
* node
);
34 /* find left neighbor of a carry node

   Summary of the visible logic: if the left sibling of the real znode is
   already tracked in @doing, the @doing list is walked backwards (->prev)
   past every carry node that maps to the same real znode. Otherwise a new
   carry node is added before @node with reiser4_add_carry_skip() and
   reiser4_get_left_neighbor() is asked for a write lock on the neighbor.
   On -E_NO_NEIGHBOR or -ENOENT, and when the level cannot be restarted,
   the new carry node is given back to the node pool; on a restartable
   level the result becomes ERR_PTR(-E_REPEAT).

36 Look for left neighbor of @node and add it to the @doing queue. See
40 static carry_node
*find_left_neighbor(carry_op
* op
/* node to find left
42 carry_level
* doing
/* level to scan */)
54 /* first, check whether left neighbor is already in a @doing queue */
55 if (reiser4_carry_real(node
)->left
!= NULL
) {
56 /* NOTE: there is locking subtlety here. Look into
57 * find_right_neighbor() for more info */
58 if (find_carry_node(doing
,
59 reiser4_carry_real(node
)->left
) != NULL
) {
60 read_unlock_tree(tree
);
63 left
= list_entry(left
->header
.level_linkage
.prev
,
64 carry_node
, header
.level_linkage
);
65 assert("nikita-3408", !carry_node_end(doing
,
67 } while (reiser4_carry_real(left
) ==
68 reiser4_carry_real(node
));
72 read_unlock_tree(tree
);
74 left
= reiser4_add_carry_skip(doing
, POOLO_BEFORE
, node
);
78 left
->node
= node
->node
;
82 if (!(op
->u
.insert
.flags
& COPI_LOAD_LEFT
))
85 /* then, feeling lucky, peek left neighbor in the cache. */
86 result
= reiser4_get_left_neighbor(&left
->lock_handle
,
87 reiser4_carry_real(node
),
88 ZNODE_WRITE_LOCK
, flags
);
90 /* ok, node found and locked. */
91 result
= lock_carry_node_tail(left
);
93 left
= ERR_PTR(result
);
94 } else if (result
== -E_NO_NEIGHBOR
|| result
== -ENOENT
) {
95 /* node is leftmost node in a tree, or neighbor wasn't in
96 cache, or there is an extent on the left. */
97 reiser4_pool_free(&doing
->pool
->node_pool
, &left
->header
);
99 } else if (doing
->restartable
) {
100 /* if left neighbor is locked, and level is restartable, add
101 new node to @doing and restart. */
102 assert("nikita-913", node
->parent
!= 0);
103 assert("nikita-914", node
->node
!= NULL
);
106 left
= ERR_PTR(-E_REPEAT
);
108 /* left neighbor is locked, level cannot be restarted. Just
109 ignore left neighbor. */
110 reiser4_pool_free(&doing
->pool
->node_pool
, &left
->header
);
116 /* find right neighbor of a carry node
118 Look for right neighbor of @node and add it to the @doing queue. See
119 comments in the body.

   Summary of the visible logic: this mirrors find_left_neighbor(). When
   the right sibling of the real znode is already tracked in @doing, the
   @doing list is walked forward (->next) past every carry node that maps
   to the same real znode. Otherwise reiser4_get_right_neighbor() is called
   with a local lock handle and ZNODE_WRITE_LOCK; once a node is found and
   locked, a carry node is added after @node with reiser4_add_carry_skip()
   and the lock handle is moved into it with move_lh(). Failures are
   reported as ERR_PTR() values.

122 static carry_node
*find_right_neighbor(carry_op
* op
/* node to find right
124 carry_level
* doing
/* level to scan */)
138 read_lock_tree(tree
);
139 /* first, check whether right neighbor is already in a @doing queue */
140 if (reiser4_carry_real(node
)->right
!= NULL
) {
142 * Tree lock is taken here anyway, because, even if _outcome_
143 * of (find_carry_node() != NULL) doesn't depends on
144 * concurrent updates to ->right, find_carry_node() cannot
145 * work with second argument NULL. Hence, following comment is
146 * of historic importance only.
150 * Q: why don't we need tree lock here, looking for the right
153 * A: even if value of node->real_node->right were changed
154 * during find_carry_node() execution, outcome of execution
155 * wouldn't change, because (in short) other thread cannot add
156 * elements to the @doing, and if node->real_node->right
157 * already was in @doing, value of node->real_node->right
158 * couldn't change, because node cannot be inserted between
161 if (find_carry_node(doing
,
162 reiser4_carry_real(node
)->right
) != NULL
) {
163 read_unlock_tree(tree
);
165 * What we are doing here (this is also applicable to
166 * the find_left_neighbor()).
168 * tree_walk.c code requires that insertion of a
169 * pointer to a child, modification of parent pointer
170 * in the child, and insertion of the child into
171 * sibling list are atomic (see
172 * plugin/item/internal.c:create_hook_internal()).
174 * carry allocates new node long before pointer to it
175 * is inserted into parent and, actually, long before
176 * parent is even known. Such allocated-but-orphaned
177 * nodes are only trackable through carry level lists.
179 * Situation that is handled here is following: @node
180 * has valid ->right pointer, but there is
181 * allocated-but-orphaned node in the carry queue that
182 * is logically between @node and @node->right. Here
183 * we are searching for it. Critical point is that
184 * this is only possible if @node->right is also in
185 * the carry queue (this is checked above), because
186 * this is the only way new orphaned node could be
187 * inserted between them (before inserting new node,
188 * make_space() first tries to shift to the right, so,
189 * right neighbor will be locked and queued).
194 right
= list_entry(right
->header
.level_linkage
.next
,
195 carry_node
, header
.level_linkage
);
196 assert("nikita-3408", !carry_node_end(doing
,
198 } while (reiser4_carry_real(right
) ==
199 reiser4_carry_real(node
));
203 read_unlock_tree(tree
);
205 flags
= GN_CAN_USE_UPPER_LEVELS
;
206 if (!(op
->u
.insert
.flags
& COPI_LOAD_RIGHT
))
209 /* then, try to lock right neighbor */
211 result
= reiser4_get_right_neighbor(&lh
,
212 reiser4_carry_real(node
),
213 ZNODE_WRITE_LOCK
, flags
);
215 /* ok, node found and locked. */
216 right
= reiser4_add_carry_skip(doing
, POOLO_AFTER
, node
);
217 if (!IS_ERR(right
)) {
218 right
->node
= lh
.node
;
219 move_lh(&right
->lock_handle
, &lh
);
221 result
= lock_carry_node_tail(right
);
223 right
= ERR_PTR(result
);
225 } else if ((result
== -E_NO_NEIGHBOR
) || (result
== -ENOENT
)) {
226 /* node is rightmost node in a tree, or neighbor wasn't in
227 cache, or there is an extent on the right. */
230 right
= ERR_PTR(result
);
235 /* how much free space in a @node is needed for @op
237 How much space in @node is required for completion of @op, where @op is
238 insert or paste operation.

   Both visible return paths delegate to space_needed(): one passes a NULL
   coord with the last argument 1 (insertion), the other passes
   op->u.insert.d->coord with the last argument 0 (paste). An unexpected
   opcode is reported through impossible().

240 static unsigned int space_needed_for_op(znode
* node
/* znode data are
243 carry_op
* op
/* carry
246 assert("nikita-919", op
!= NULL
);
250 impossible("nikita-1701", "Wrong opcode");
252 return space_needed(node
, NULL
, op
->u
.insert
.d
->data
, 1);
254 return space_needed(node
, op
->u
.insert
.d
->coord
,
255 op
->u
.insert
.d
->data
, 0);
259 /* how much space in @node is required to insert or paste @data at
   (Visible estimate: the item plugin's ->b.estimate() is used when the
   plugin provides it, and is given a NULL coord for insertions; otherwise
   data->length serves as the default. The node plugin's ->item_overhead(),
   when present, is added as node overhead. @coord must be NULL when
   @insertion is non-zero, which is asserted.
   NOTE(review): confirm whether the ->item_overhead() term applies to
   paste as well as to insert.)
261 unsigned int space_needed(const znode
* node
/* node data are inserted or
263 const coord_t
*coord
/* coord where data are
266 const reiser4_item_data
* data
/* data to insert or
268 int insertion
/* non-0 is inserting, 0---paste */)
273 assert("nikita-917", node
!= NULL
);
274 assert("nikita-918", node_plugin_by_node(node
) != NULL
);
275 assert("vs-230", !insertion
|| (coord
== NULL
));
279 if (iplug
->b
.estimate
!= NULL
) {
280 /* ask item plugin how much space is needed to insert this
282 result
+= iplug
->b
.estimate(insertion
? NULL
: coord
, data
);
284 /* reasonable default */
285 result
+= data
->length
;
291 /* and add node overhead */
292 if (nplug
->item_overhead
!= NULL
)
293 result
+= nplug
->item_overhead(node
, NULL
);
298 /* find &coord in parent where pointer to new child is to be stored.

   Only COP_INSERT operations are accepted (asserted). Both op->node and
   op->u.insert.child are resolved to their real znodes with
   reiser4_carry_real(); find_new_child_ptr() is then called with the
   parent, the child, op->u.insert.brother and op->u.insert.d->coord, and
   build_child_ptr_data() prepares op->u.insert.d->data for the child. */
299 static int find_new_child_coord(carry_op
* op
/* COP_INSERT carry operation to
300 * insert pointer to new
307 assert("nikita-941", op
!= NULL
);
308 assert("nikita-942", op
->op
== COP_INSERT
);
310 node
= reiser4_carry_real(op
->node
);
311 assert("nikita-943", node
!= NULL
);
312 assert("nikita-944", node_plugin_by_node(node
) != NULL
);
314 child
= reiser4_carry_real(op
->u
.insert
.child
);
316 find_new_child_ptr(node
, child
, op
->u
.insert
.brother
,
317 op
->u
.insert
.d
->coord
);
319 build_child_ptr_data(child
, op
->u
.insert
.d
->data
);
323 /* additional amount of free space in @node required to complete @op

   The visible computation is space_needed_for_op(node, op) minus
   znode_free_space(node), so a result that is zero or negative means the
   node already has room; make_space() tests the value this way. A separate
   branch handles extent insertion when the insertion coord is COORD_INSIDE
   the node, and an unexpected opcode is reported through impossible(). */
324 static int free_space_shortage(znode
* node
/* node to check */ ,
325 carry_op
* op
/* operation being performed */)
327 assert("nikita-1061", node
!= NULL
);
328 assert("nikita-1062", op
!= NULL
);
332 impossible("nikita-1702", "Wrong opcode");
335 return space_needed_for_op(node
, op
) - znode_free_space(node
);
337 /* when inserting extent shift data around until insertion
338 point is utmost in the node. */
339 if (coord_wrt(op
->u
.insert
.d
->coord
) == COORD_INSIDE
)
346 /* helper function: update node pointer in operation after insertion
347 point was probably shifted into @target.

   Returns the znode that currently holds the insertion coord
   (op->u.insert.d->coord->node). When that znode is not the real node of
   op->node, @target is asserted to be the carry node standing for it; in
   all cases the real node of op->node is finally asserted to match the
   node of the insertion coord. */
348 static znode
*sync_op(carry_op
* op
, carry_node
* target
)
350 znode
*insertion_node
;
352 /* reget node from coord: shift might move insertion coord to
354 insertion_node
= op
->u
.insert
.d
->coord
->node
;
355 /* if insertion point was actually moved into new node,
356 update carry node pointer in operation. */
357 if (insertion_node
!= reiser4_carry_real(op
->node
)) {
359 assert("nikita-2540",
360 reiser4_carry_real(target
) == insertion_node
);
362 assert("nikita-2541",
363 reiser4_carry_real(op
->node
) == op
->u
.insert
.d
->coord
->node
);
364 return insertion_node
;
368 * complete make_space() call: update tracked lock handle if necessary. See
369 * comments for fs/reiser4/carry.h:carry_track_type
/* Reads the tracking mode from doing->track_type and the current insertion
   node from op->u.insert.d->coord->node. For CARRY_TRACK_NODE, or for
   CARRY_TRACK_CHANGE when the insertion node differs from @orig_node, the
   caller-supplied lock handle doing->tracked (asserted non-NULL) is
   released with done_lh(), re-initialised with init_lh() and taken again on
   the insertion node through longterm_lock_znode(). */
372 make_space_tail(carry_op
* op
, carry_level
* doing
, znode
* orig_node
)
375 carry_track_type tracking
;
378 tracking
= doing
->track_type
;
379 node
= op
->u
.insert
.d
->coord
->node
;
381 if (tracking
== CARRY_TRACK_NODE
||
382 (tracking
== CARRY_TRACK_CHANGE
&& node
!= orig_node
)) {
383 /* inserting or pasting into node different from
384 original. Update lock handle supplied by caller. */
385 assert("nikita-1417", doing
->tracked
!= NULL
);
386 done_lh(doing
->tracked
);
387 init_lh(doing
->tracked
);
388 result
= longterm_lock_znode(doing
->tracked
, node
,
396 /* This is insertion policy function. It shifts data to the left and right
397 neighbors of insertion coord and allocates new nodes until there is enough
398 free space to complete @op.
400 See comments in the body.
402 Assumes that the node format favors insertions at the right end of the node
405 See carry_flow() on detail about flow insertion

   Visible outline:
   1. if free_space_shortage() is already zero or negative, finish through
      make_space_tail();
   2. unless COPI_DONT_SHIFT_LEFT is set, find the left neighbor and shift
      into it; -E_REPEAT from find_left_neighbor() is treated separately
      from other errors, which only produce a warning;
   3. if space is still short and COPI_DONT_SHIFT_RIGHT is clear, shift
      into the right neighbor;
   4. unless COPI_DONT_ALLOCATE is set, allocate at most two new nodes with
      add_new_znode() and shift into them;
   5. if space is still short the result is -E_NODE_FULL, otherwise
      make_space_tail() is called.
   After every shift the insertion node is re-read with sync_op() and the
   shortage is recomputed.
   NOTE(review): the COPI_STEP_BACK test is written as "1 || ...", so that
   branch is always taken.

407 static int make_space(carry_op
* op
/* carry operation, insert or paste */ ,
408 carry_level
* doing
/* current carry queue */ ,
409 carry_level
* todo
/* carry queue on the parent level */)
413 int not_enough_space
;
420 assert("nikita-890", op
!= NULL
);
421 assert("nikita-891", todo
!= NULL
);
423 op
->op
== COP_INSERT
||
424 op
->op
== COP_PASTE
|| op
->op
== COP_EXTENT
);
425 assert("nikita-1607",
426 reiser4_carry_real(op
->node
) == op
->u
.insert
.d
->coord
->node
);
428 flags
= op
->u
.insert
.flags
;
430 /* NOTE check that new node can only be allocated after checking left
431 * and right neighbors. This is necessary for proper work of
432 * find_{left,right}_neighbor(). */
433 assert("nikita-3410", ergo(flags
& COPI_DONT_ALLOCATE
,
434 flags
& COPI_DONT_SHIFT_LEFT
));
435 assert("nikita-3411", ergo(flags
& COPI_DONT_ALLOCATE
,
436 flags
& COPI_DONT_SHIFT_RIGHT
));
438 coord
= op
->u
.insert
.d
->coord
;
439 orig_node
= node
= coord
->node
;
441 assert("nikita-908", node
!= NULL
);
442 assert("nikita-909", node_plugin_by_node(node
) != NULL
);
445 /* If there is not enough space in a node, try to shift something to
446 the left neighbor. This is a bit tricky, as locking to the left is
447 low priority. This is handled by restart logic in carry().
449 not_enough_space
= free_space_shortage(node
, op
);
450 if (not_enough_space
<= 0)
451 /* it is possible that carry was called when there actually
452 was enough space in the node. For example, when inserting
453 leftmost item so that delimiting keys have to be updated.
455 return make_space_tail(op
, doing
, orig_node
);
456 if (!(flags
& COPI_DONT_SHIFT_LEFT
)) {
458 /* make note in statistics of an attempt to move
459 something into the left neighbor */
460 left
= find_left_neighbor(op
, doing
);
461 if (unlikely(IS_ERR(left
))) {
462 if (PTR_ERR(left
) == -E_REPEAT
)
465 /* some error other than restart request
466 occurred. This shouldn't happen. Issue a
467 warning and continue as if left neighbor
470 warning("nikita-924",
471 "Error accessing left neighbor: %li",
474 } else if (left
!= NULL
) {
476 /* shift everything possible on the left of and
477 including insertion coord into the left neighbor */
478 result
= carry_shift_data(LEFT_SIDE
, coord
,
479 reiser4_carry_real(left
),
481 flags
& COPI_GO_LEFT
);
483 /* reget node from coord: shift_left() might move
484 insertion coord to the left neighbor */
485 node
= sync_op(op
, left
);
487 not_enough_space
= free_space_shortage(node
, op
);
488 /* There is not enough free space in @node, but
489 may be, there is enough free space in
490 @left. Various balancing decisions are valid here.
491 The same for the shifting to the right.
495 /* If there still is not enough space, shift to the right */
496 if (not_enough_space
> 0 && !(flags
& COPI_DONT_SHIFT_RIGHT
)) {
499 right
= find_right_neighbor(op
, doing
);
501 warning("nikita-1065",
502 "Error accessing right neighbor: %li",
504 } else if (right
!= NULL
) {
505 /* node containing insertion point, and its right
506 neighbor node are write locked by now.
508 shift everything possible on the right of but
509 excluding insertion coord into the right neighbor
511 result
= carry_shift_data(RIGHT_SIDE
, coord
,
512 reiser4_carry_real(right
),
514 flags
& COPI_GO_RIGHT
);
515 /* reget node from coord: shift_right() might move
516 insertion coord to the right neighbor */
517 node
= sync_op(op
, right
);
518 not_enough_space
= free_space_shortage(node
, op
);
521 /* If there is still not enough space, allocate new node(s).
523 We try to allocate new blocks if COPI_DONT_ALLOCATE is not set in
524 the carry operation flags (currently this is needed during flush
528 not_enough_space
> 0 && result
== 0 && blk_alloc
< 2 &&
529 !(flags
& COPI_DONT_ALLOCATE
); ++blk_alloc
) {
530 carry_node
*fresh
; /* new node we are allocating */
531 coord_t coord_shadow
; /* remembered insertion point before
532 * shifting data into new node */
533 carry_node
*node_shadow
; /* remembered insertion node
535 unsigned int gointo
; /* whether insertion point should move
536 * into newly allocated node */
538 /* allocate new node on the right of @node. Znode and disk
539 fake block number for new node are allocated.
541 add_new_znode() posts carry operation COP_INSERT with
542 COPT_CHILD option to the parent level to add
543 pointer to newly created node to its parent.
545 Subtle point: if several new nodes are required to complete
546 insertion operation at this level, they will be inserted
547 into their parents in the order of creation, which means
548 that @node will be valid "cookie" at the time of insertion.
551 fresh
= add_new_znode(node
, op
->node
, doing
, todo
);
553 return PTR_ERR(fresh
);
555 /* Try to shift into new node. */
556 result
= lock_carry_node(doing
, fresh
);
557 zput(reiser4_carry_real(fresh
));
559 warning("nikita-947",
560 "Cannot lock new node: %i", result
);
564 /* both nodes are write locked by now.
566 shift everything possible on the right of and
567 including insertion coord into the right neighbor.
569 coord_dup(&coord_shadow
, op
->u
.insert
.d
->coord
);
570 node_shadow
= op
->node
;
571 /* move insertion point into newly created node if:
573 . insertion point is rightmost in the source node, or
574 . this is not the first node we are allocating in a row.
578 coord_is_after_rightmost(op
->u
.insert
.d
->coord
);
581 op
->op
== COP_PASTE
&&
582 coord_is_existing_item(op
->u
.insert
.d
->coord
) &&
583 is_solid_item((item_plugin_by_coord(op
->u
.insert
.d
->coord
)))) {
584 /* paste into solid (atomic) item, which can contain
585 only one unit, so we need to shift it right, where
586 insertion point supposed to be */
588 assert("edward-1444", op
->u
.insert
.d
->data
->iplug
==
589 item_plugin_by_id(STATIC_STAT_DATA_ID
));
590 assert("edward-1445",
591 op
->u
.insert
.d
->data
->length
>
592 node_plugin_by_node(coord
->node
)->free_space
595 op
->u
.insert
.d
->coord
->between
= BEFORE_UNIT
;
598 result
= carry_shift_data(RIGHT_SIDE
, coord
,
599 reiser4_carry_real(fresh
),
600 doing
, todo
, gointo
);
601 /* if insertion point was actually moved into new node,
602 update carry node pointer in operation. */
603 node
= sync_op(op
, fresh
);
604 not_enough_space
= free_space_shortage(node
, op
);
605 if ((not_enough_space
> 0) && (node
!= coord_shadow
.node
)) {
606 /* there is not enough free in new node. Shift
607 insertion point back to the @shadow_node so that
608 next new node would be inserted between
609 @shadow_node and @fresh.
611 coord_normalize(&coord_shadow
);
612 coord_dup(coord
, &coord_shadow
);
614 op
->node
= node_shadow
;
615 if (1 || (flags
& COPI_STEP_BACK
)) {
616 /* still not enough space?! Maybe there is
617 enough space in the source node (i.e., node
618 data are moved from) now.
621 free_space_shortage(node
, op
);
625 if (not_enough_space
> 0) {
626 if (!(flags
& COPI_DONT_ALLOCATE
))
627 warning("nikita-948", "Cannot insert new item");
628 result
= -E_NODE_FULL
;
630 assert("nikita-1622", ergo(result
== 0,
631 reiser4_carry_real(op
->node
) == coord
->node
));
632 assert("nikita-2616", coord
== op
->u
.insert
.d
->coord
);
634 result
= make_space_tail(op
, doing
, orig_node
);
638 /* insert_paste_common() - common part of insert and paste operations
640 This function performs common part of COP_INSERT and COP_PASTE.
642 There are two ways in which insertion/paste can be requested:
644 . by directly supplying reiser4_item_data. In this case, op ->
645 u.insert.type is set to COPT_ITEM_DATA.
647 . by supplying child pointer to which is to inserted into parent. In this
648 case op -> u.insert.type == COPT_CHILD.
650 . by supplying key of new item/unit. This is currently only used during
653 This is required, because when new node is allocated we don't know at what
654 position pointer to it is to be stored in the parent. Actually, we don't
655 even know what its parent will be, because parent can be re-balanced
656 concurrently and new node re-parented, and because parent can be full and
657 pointer to the new node will go into some other node.
659 insert_paste_common() resolves pointer to child node into position in the
660 parent by calling find_new_child_coord(), that fills
661 reiser4_item_data. After this, insertion/paste proceeds uniformly.
663 Another complication is with finding free space during pasting. It may
664 happen that while shifting items to the neighbors and newly allocated
665 nodes, insertion coord can no longer be in the item we wanted to paste
666 into. At this point, paste becomes (morphs) into insert. Moreover free
667 space analysis has to be repeated, because amount of space required for
668 insertion is different from that of paste (item header overhead, etc).
670 This function "unifies" different insertion modes (by resolving child
671 pointer or key into insertion coord), and then calls make_space() to free
672 enough space in the node by shifting data to the left and right and by
673 allocating new nodes if necessary. Carry operation knows amount of space
674 required for its completion. After enough free space is obtained, caller of
675 this function (carry_{insert,paste,etc.}) performs actual insertion/paste
676 by calling item plugin method.

   Return value, as far as the visible code shows: the result of
   make_space() once the insertion coord has been resolved. The COPT_CHILD
   path returns early with PTR_ERR() when reiser4_add_carry_skip() fails
   while re-targeting op->node. COPT_PASTE_RESTARTED needs no resolution
   and goes straight to make_space(); COPT_KEY resolves the key through the
   node plugin's ->lookup() with FIND_EXACT and warns about results other
   than NS_FOUND and NS_NOT_FOUND.

679 static int insert_paste_common(carry_op
* op
/* carry operation being
681 carry_level
* doing
/* current carry level */ ,
682 carry_level
* todo
/* next carry level */ ,
683 carry_insert_data
* cdata
/* pointer to
685 coord_t
*coord
/* insertion/paste coord */ ,
686 reiser4_item_data
* data
/* data to be
687 * inserted/pasted */ )
689 assert("nikita-981", op
!= NULL
);
690 assert("nikita-980", todo
!= NULL
);
691 assert("nikita-979", (op
->op
== COP_INSERT
) || (op
->op
== COP_PASTE
)
692 || (op
->op
== COP_EXTENT
));
694 if (op
->u
.insert
.type
== COPT_PASTE_RESTARTED
) {
695 /* nothing to do. Fall through to make_space(). */
697 } else if (op
->u
.insert
.type
== COPT_KEY
) {
698 node_search_result intra_node
;
700 /* Problem with doing batching at the lowest level, is that
701 operations here are given by coords where modification is
702 to be performed, and one modification can invalidate coords
703 of all following operations.
705 So, we are implementing yet another type for operation that
706 will use (the only) "locator" stable across shifting of
707 data between nodes, etc.: key (COPT_KEY).
709 This clause resolves key to the coord in the node.
711 But node can change also. Probably some pieces have to be
712 added to the lock_carry_node(), to lock node by its key.
715 /* NOTE-NIKITA Lookup bias is fixed to FIND_EXACT. Complain
716 if you need something else. */
717 op
->u
.insert
.d
->coord
= coord
;
718 node
= reiser4_carry_real(op
->node
);
719 intra_node
= node_plugin_by_node(node
)->lookup
720 (node
, op
->u
.insert
.d
->key
, FIND_EXACT
,
721 op
->u
.insert
.d
->coord
);
722 if ((intra_node
!= NS_FOUND
) && (intra_node
!= NS_NOT_FOUND
)) {
723 warning("nikita-1715", "Intra node lookup failure: %i",
727 } else if (op
->u
.insert
.type
== COPT_CHILD
) {
728 /* if we are asked to insert pointer to the child into
729 internal node, first convert pointer to the child into
730 coord within parent node.
735 op
->u
.insert
.d
= cdata
;
736 op
->u
.insert
.d
->coord
= coord
;
737 op
->u
.insert
.d
->data
= data
;
738 op
->u
.insert
.d
->coord
->node
= reiser4_carry_real(op
->node
);
739 result
= find_new_child_coord(op
);
740 child
= reiser4_carry_real(op
->u
.insert
.child
);
741 if (result
!= NS_NOT_FOUND
) {
742 warning("nikita-993",
743 "Cannot find a place for child pointer: %i",
747 /* This only happens when we did multiple insertions at
748 the previous level, trying to insert single item and
749 it so happened, that insertion of pointers to all new
750 nodes before this one already caused parent node to
751 split (may be several times).
753 I am going to come up with better solution.
755 You are not expected to understand this.
756 -- v6root/usr/sys/ken/slp.c
758 Basically, what happens here is the following: carry came
759 to the parent level and is about to insert internal item
760 pointing to the child node that it just inserted in the
761 level below. Position where internal item is to be inserted
762 was found by find_new_child_coord() above, but node of the
763 current carry operation (that is, parent node of child
764 inserted on the previous level), was determined earlier in
765 the lock_carry_level/lock_carry_node. It could so happen
766 that other carry operations already performed on the parent
767 level already split parent node, so that insertion point
768 moved into another node. Handle this by creating new carry
769 node for insertion point if necessary.
771 if (reiser4_carry_real(op
->node
) !=
772 op
->u
.insert
.d
->coord
->node
) {
773 pool_ordering direction
;
780 * determine in what direction insertion point
781 * moved. Do this by comparing delimiting keys.
783 z1
= op
->u
.insert
.d
->coord
->node
;
784 z2
= reiser4_carry_real(op
->node
);
785 if (keyle(leftmost_key_in_node(z1
, &k1
),
786 leftmost_key_in_node(z2
, &k2
)))
787 /* insertion point moved to the left */
788 direction
= POOLO_BEFORE
;
790 /* insertion point moved to the right */
791 direction
= POOLO_AFTER
;
793 op
->node
= reiser4_add_carry_skip(doing
,
794 direction
, op
->node
);
795 if (IS_ERR(op
->node
))
796 return PTR_ERR(op
->node
);
797 op
->node
->node
= op
->u
.insert
.d
->coord
->node
;
799 result
= lock_carry_node(doing
, op
->node
);
805 * set up key of an item being inserted: we are inserting
806 * internal item and its key is (by the very definition of
807 * search tree) is leftmost key in the child node.
809 write_lock_dk(znode_get_tree(child
));
810 op
->u
.insert
.d
->key
= leftmost_key_in_node(child
,
811 znode_get_ld_key(child
));
812 write_unlock_dk(znode_get_tree(child
));
813 op
->u
.insert
.d
->data
->arg
= op
->u
.insert
.brother
;
815 assert("vs-243", op
->u
.insert
.d
->coord
!= NULL
);
816 op
->u
.insert
.d
->coord
->node
= reiser4_carry_real(op
->node
);
819 /* find free space. */
820 return make_space(op
, doing
, todo
);
823 /* handle carry COP_INSERT operation.

   Visible flow: a zeroed coord plus local carry_insert_data and
   reiser4_item_data are handed to insert_paste_common(), which resolves
   the insertion coord and frees space. The node plugin's ->create_item()
   is then called for op->u.insert.d->coord, ->key and ->data;
   doing->restartable is cleared and the node is marked dirty with
   znode_make_dirty().

825 Insert new item into node. New item can be given in one of two ways:
827 - by passing &tree_coord and &reiser4_item_data as part of @op. This is
828 only applicable at the leaf/twig level.
830 - by passing a child node pointer to which is to be inserted by this
834 static int carry_insert(carry_op
* op
/* operation to perform */ ,
835 carry_level
* doing
/* queue of operations @op
837 carry_level
* todo
/* queue where new operations
838 * are accumulated */ )
841 carry_insert_data cdata
;
843 reiser4_item_data data
;
844 carry_plugin_info info
;
847 assert("nikita-1036", op
!= NULL
);
848 assert("nikita-1037", todo
!= NULL
);
849 assert("nikita-1038", op
->op
== COP_INSERT
);
851 coord_init_zero(&coord
);
853 /* perform common functionality of insert and paste. */
854 result
= insert_paste_common(op
, doing
, todo
, &cdata
, &coord
, &data
);
858 node
= op
->u
.insert
.d
->coord
->node
;
859 assert("nikita-1039", node
!= NULL
);
860 assert("nikita-1040", node_plugin_by_node(node
) != NULL
);
863 space_needed_for_op(node
, op
) <= znode_free_space(node
));
865 /* ask node layout to create new item. */
868 result
= node_plugin_by_node(node
)->create_item
869 (op
->u
.insert
.d
->coord
, op
->u
.insert
.d
->key
, op
->u
.insert
.d
->data
,
871 doing
->restartable
= 0;
872 znode_make_dirty(node
);
878 * Flow insertion code. COP_INSERT_FLOW is special tree operation that is
879 * supplied with a "flow" (that is, a stream of data) and inserts it into tree
880 * by slicing into multiple items.
/* shorthand accessors for the insert_flow member of a carry operation:
   the insertion coord, the flow being inserted and the item data */
883 #define flow_insert_point(op) ((op)->u.insert_flow.insert_point)
884 #define flow_insert_flow(op) ((op)->u.insert_flow.flow)
885 #define flow_insert_data(op) ((op)->u.insert_flow.data)
/* Extra bytes the item plugin needs on top of the raw data length when
   flow data are inserted as a new item: ->b.estimate(NULL, data) minus
   data->length. An item plugin without ->b.estimate is tested for first
   and handled separately. */
887 static size_t item_data_overhead(carry_op
* op
)
889 if (flow_insert_data(op
)->iplug
->b
.estimate
== NULL
)
891 return (flow_insert_data(op
)->iplug
->b
.
892 estimate(NULL
/* estimate insertion */ , flow_insert_data(op
)) -
893 flow_insert_data(op
)->length
);
896 /* FIXME-VS: this is called several times during one make_flow_for_insertion
897 and it will always return the same result. Some optimization could be made
898 by calculating this value once at the beginning and passing it around. That
899 would reduce some flexibility in future changes

   flow_insertion_overhead(): the overhead starts at zero and stays zero
   unless the node plugin has ->item_overhead and can_paste() rejects the
   current insert point; in that case the visible terms are
   nplug->item_overhead(node, NULL) plus item_data_overhead(op).
   can_paste() is only declared here; its definition is not in this part
   of the file.
901 static int can_paste(coord_t
*, const reiser4_key
*, const reiser4_item_data
*);
902 static size_t flow_insertion_overhead(carry_op
* op
)
905 size_t insertion_overhead
;
907 node
= flow_insert_point(op
)->node
;
908 insertion_overhead
= 0;
909 if (node
->nplug
->item_overhead
&&
910 !can_paste(flow_insert_point(op
), &flow_insert_flow(op
)->key
,
911 flow_insert_data(op
)))
913 node
->nplug
->item_overhead(node
, NULL
) +
914 item_data_overhead(op
);
915 return insertion_overhead
;
918 /* how many bytes of flow does fit to the node

   Compares znode_free_space() of the insert point node with
   flow_insertion_overhead(op), then compares the free space with the
   remaining flow length; the last visible return yields the whole flow
   length cast to int. */
919 static int what_can_fit_into_node(carry_op
* op
)
921 size_t free
, overhead
;
923 overhead
= flow_insertion_overhead(op
);
924 free
= znode_free_space(flow_insert_point(op
)->node
);
925 if (free
<= overhead
)
928 /* FIXME: flow->length is loff_t only to not get overflowed in case of
929 expanding truncate */
930 if (free
< op
->u
.insert_flow
.flow
->length
)
932 return (int)op
->u
.insert_flow
.flow
->length
;
935 /* in make_space_for_flow_insertion we need to check either whether whole flow
936 fits into a node or whether minimal fraction of flow fits into a node */
937 static int enough_space_for_whole_flow(carry_op
* op
)
939 return (unsigned)what_can_fit_into_node(op
) ==
940 op
->u
.insert_flow
.flow
->length
;
943 #define MIN_FLOW_FRACTION 1
944 static int enough_space_for_min_flow_fraction(carry_op
* op
)
946 assert("vs-902", coord_is_after_rightmost(flow_insert_point(op
)));
948 return what_can_fit_into_node(op
) >= MIN_FLOW_FRACTION
;
951 /* this returns 0 if left neighbor was obtained successfully and everything
952 up to insertion point including it were shifted and left neighbor still has
953 some free space to put minimal fraction of flow into it

   The return value of carry_shift_data() is not examined in the visible
   code; whether the shift worked is judged by checking if the insert point
   now lies in the left neighbor. If the insert point moved but the left
   neighbor cannot take even a minimal flow fraction, the insert point is
   put back before the first item of the original node. */
955 make_space_by_shift_left(carry_op
* op
, carry_level
* doing
, carry_level
* todo
)
960 left
= find_left_neighbor(op
, doing
);
961 if (unlikely(IS_ERR(left
))) {
963 "make_space_by_shift_left: "
964 "error accessing left neighbor: %li", PTR_ERR(left
));
968 /* left neighbor either does not exist or is unformatted
972 orig
= flow_insert_point(op
)->node
;
973 /* try to shift content of node @orig from its head up to insert point
974 including insertion point into the left neighbor */
975 carry_shift_data(LEFT_SIDE
, flow_insert_point(op
),
976 reiser4_carry_real(left
), doing
, todo
,
977 1/* including insert point */);
978 if (reiser4_carry_real(left
) != flow_insert_point(op
)->node
) {
979 /* insertion point did not move */
983 /* insertion point is set after last item in the node */
984 assert("vs-900", coord_is_after_rightmost(flow_insert_point(op
)));
986 if (!enough_space_for_min_flow_fraction(op
)) {
987 /* insertion point node does not have enough free space to put
988 even minimal portion of flow into it, therefore, move
989 insertion point back to orig node (before first item) */
990 coord_init_before_first_item(flow_insert_point(op
), orig
);
994 /* part of flow is to be written to the end of node */
999 /* this returns 0 if right neighbor was obtained successfully and everything to
1000 the right of insertion point was shifted to it and node got enough free
1001 space to put minimal fraction of flow into it

   The return value of carry_shift_data() is not examined in the visible
   code; afterwards the insert point is tested with
   coord_is_after_rightmost() and enough_space_for_min_flow_fraction().
   NOTE(review): the warning text still names
   "shift_right_excluding_insert_point" rather than this function, and its
   label "nikita-1065" is also used by make_space() - confirm whether that
   is intended. */
1003 make_space_by_shift_right(carry_op
* op
, carry_level
* doing
,
1008 right
= find_right_neighbor(op
, doing
);
1009 if (unlikely(IS_ERR(right
))) {
1010 warning("nikita-1065", "shift_right_excluding_insert_point: "
1011 "error accessing right neighbor: %li", PTR_ERR(right
));
1015 /* shift everything possible on the right of but excluding
1016 insertion coord into the right neighbor */
1017 carry_shift_data(RIGHT_SIDE
, flow_insert_point(op
),
1018 reiser4_carry_real(right
), doing
, todo
,
1019 0/* not including insert point */);
1021 /* right neighbor either does not exist or is unformatted
1025 if (coord_is_after_rightmost(flow_insert_point(op
))) {
1026 if (enough_space_for_min_flow_fraction(op
)) {
1027 /* part of flow is to be written to the end of node */
1032 /* new node is to be added if insert point node did not get enough
1033 space for whole flow */
1037 /* this returns 0 when insert coord is set at the node end and fraction of flow
1038 fits into that node

   Each new node is created with add_new_znode() after the insert point
   node and locked with lock_carry_node(); a failure of add_new_znode() is
   returned as PTR_ERR(). op->u.insert_flow.new_nodes counts the nodes
   added, and -E_NODE_FULL is returned once it has reached
   CARRY_FLOW_NEW_NODES_LIMIT. When the insert point is not already after
   the rightmost item, everything to its right is shifted into the first
   new node. A second node is added after the
   enough_space_for_min_flow_fraction() test, and the insert point is then
   moved before the first item of that second node. */
1040 make_space_by_new_nodes(carry_op
* op
, carry_level
* doing
, carry_level
* todo
)
1046 node
= flow_insert_point(op
)->node
;
1048 if (op
->u
.insert_flow
.new_nodes
== CARRY_FLOW_NEW_NODES_LIMIT
)
1049 return RETERR(-E_NODE_FULL
);
1050 /* add new node after insert point node */
1051 new = add_new_znode(node
, op
->node
, doing
, todo
);
1052 if (unlikely(IS_ERR(new)))
1053 return PTR_ERR(new);
1054 result
= lock_carry_node(doing
, new);
1055 zput(reiser4_carry_real(new));
1056 if (unlikely(result
))
1058 op
->u
.insert_flow
.new_nodes
++;
1059 if (!coord_is_after_rightmost(flow_insert_point(op
))) {
1060 carry_shift_data(RIGHT_SIDE
, flow_insert_point(op
),
1061 reiser4_carry_real(new), doing
, todo
,
1062 0/* not including insert point */);
1064 coord_is_after_rightmost(flow_insert_point(op
)));
1066 if (enough_space_for_min_flow_fraction(op
))
1068 if (op
->u
.insert_flow
.new_nodes
== CARRY_FLOW_NEW_NODES_LIMIT
)
1069 return RETERR(-E_NODE_FULL
);
1071 /* add one more new node */
1072 new = add_new_znode(node
, op
->node
, doing
, todo
);
1073 if (unlikely(IS_ERR(new)))
1074 return PTR_ERR(new);
1075 result
= lock_carry_node(doing
, new);
1076 zput(reiser4_carry_real(new));
1077 if (unlikely(result
))
1079 op
->u
.insert_flow
.new_nodes
++;
1082 /* move insertion point to new node */
1083 coord_init_before_first_item(flow_insert_point(op
),
1084 reiser4_carry_real(new));
1090 make_space_for_flow_insertion(carry_op
* op
, carry_level
* doing
,
1093 __u32 flags
= op
->u
.insert_flow
.flags
;
1095 if (enough_space_for_whole_flow(op
)) {
1096 /* whole flow fits into insert point node */
1100 if (!(flags
& COPI_DONT_SHIFT_LEFT
)
1101 && (make_space_by_shift_left(op
, doing
, todo
) == 0)) {
1102 /* insert point is shifted to left neighbor of original insert
1103 point node and is set after last unit in that node. It has
1104 enough space to fit at least minimal fraction of flow. */
1108 if (enough_space_for_whole_flow(op
)) {
1109 /* whole flow fits into insert point node */
1113 if (!(flags
& COPI_DONT_SHIFT_RIGHT
)
1114 && (make_space_by_shift_right(op
, doing
, todo
) == 0)) {
1115 /* insert point is still set to the same node, but there is
1116 nothing to the right of insert point. */
1120 if (enough_space_for_whole_flow(op
)) {
1121 /* whole flow fits into insert point node */
1125 return make_space_by_new_nodes(op
, doing
, todo
);
1128 /* implements COP_INSERT_FLOW operation */
1130 carry_insert_flow(carry_op
* op
, carry_level
* doing
, carry_level
* todo
)
1134 coord_t
*insert_point
;
1136 carry_plugin_info info
;
1138 lock_handle
*orig_lh
;
1140 f
= op
->u
.insert_flow
.flow
;
1143 /* carry system needs this to work */
1147 orig_node
= flow_insert_point(op
)->node
;
1148 orig_lh
= doing
->tracked
;
1151 result
= make_space_for_flow_insertion(op
, doing
, todo
);
1155 insert_point
= flow_insert_point(op
);
1156 nplug
= node_plugin_by_node(insert_point
->node
);
1158 /* compose item data for insertion/pasting */
1159 flow_insert_data(op
)->data
= f
->data
;
1160 flow_insert_data(op
)->length
= what_can_fit_into_node(op
);
1162 if (can_paste(insert_point
, &f
->key
, flow_insert_data(op
))) {
1163 /* insert point is set to item of file we are writing to
1164 and we have to append to it */
1165 assert("vs-903", insert_point
->between
== AFTER_UNIT
);
1166 nplug
->change_item_size(insert_point
,
1167 flow_insert_data(op
)->length
);
1168 flow_insert_data(op
)->iplug
->b
.paste(insert_point
,
1172 /* new item must be inserted */
1173 pos_in_node_t new_pos
;
1174 flow_insert_data(op
)->length
+= item_data_overhead(op
);
1176 /* FIXME-VS: this is because node40_create_item changes
1177 insert_point for obscure reasons */
1178 switch (insert_point
->between
) {
1180 new_pos
= insert_point
->item_pos
+ 1;
1186 assert("vs-905", insert_point
->item_pos
== 0);
1190 impossible("vs-906",
1191 "carry_insert_flow: invalid coord");
1196 nplug
->create_item(insert_point
, &f
->key
,
1197 flow_insert_data(op
), &info
);
1198 coord_set_item_pos(insert_point
, new_pos
);
1200 coord_init_after_item_end(insert_point
);
1201 doing
->restartable
= 0;
1202 znode_make_dirty(insert_point
->node
);
1204 move_flow_forward(f
, (unsigned)flow_insert_data(op
)->length
);
1207 if (orig_node
!= flow_insert_point(op
)->node
) {
1208 /* move lock to new insert point */
1212 longterm_lock_znode(orig_lh
, flow_insert_point(op
)->node
,
1213 ZNODE_WRITE_LOCK
, ZNODE_LOCK_HIPRI
);
1219 /* implements COP_DELETE operation
1221 Remove pointer to @op -> u.delete.child from it's parent.
1223 This function also handles killing of a tree root is last pointer from it
1224 was removed. This is complicated by our handling of "twig" level: root on
1225 twig level is never killed.
1228 static int carry_delete(carry_op
* op
/* operation to be performed */ ,
1229 carry_level
* doing UNUSED_ARG
/* current carry
1231 carry_level
* todo
/* next carry level */)
1238 carry_plugin_info info
;
1242 * This operation is called to delete internal item pointing to the
1243 * child node that was removed by carry from the tree on the previous
1247 assert("nikita-893", op
!= NULL
);
1248 assert("nikita-894", todo
!= NULL
);
1249 assert("nikita-895", op
->op
== COP_DELETE
);
1251 coord_init_zero(&coord
);
1252 coord_init_zero(&coord2
);
1254 parent
= reiser4_carry_real(op
->node
);
1255 child
= op
->u
.delete.child
?
1256 reiser4_carry_real(op
->u
.delete.child
) : op
->node
->node
;
1257 tree
= znode_get_tree(child
);
1258 read_lock_tree(tree
);
1261 * @parent was determined when carry entered parent level
1262 * (lock_carry_level/lock_carry_node). Since then, actual parent of
1263 * @child node could change due to other carry operations performed on
1264 * the parent level. Check for this.
1267 if (znode_parent(child
) != parent
) {
1268 /* NOTE-NIKITA add stat counter for this. */
1269 parent
= znode_parent(child
);
1270 assert("nikita-2581", find_carry_node(doing
, parent
));
1272 read_unlock_tree(tree
);
1274 assert("nikita-1213", znode_get_level(parent
) > LEAF_LEVEL
);
1276 /* Twig level horrors: tree should be of height at least 2. So, last
1277 pointer from the root at twig level is preserved even if child is
1278 empty. This is ugly, but so it was architectured.
1281 if (znode_is_root(parent
) &&
1282 znode_get_level(parent
) <= REISER4_MIN_TREE_HEIGHT
&&
1283 node_num_items(parent
) == 1) {
1284 /* Delimiting key manipulations. */
1285 write_lock_dk(tree
);
1286 znode_set_ld_key(child
, znode_set_ld_key(parent
, reiser4_min_key()));
1287 znode_set_rd_key(child
, znode_set_rd_key(parent
, reiser4_max_key()));
1288 ZF_SET(child
, JNODE_DKSET
);
1289 write_unlock_dk(tree
);
1291 /* @child escaped imminent death! */
1292 ZF_CLR(child
, JNODE_HEARD_BANSHEE
);
1296 /* convert child pointer to the coord_t */
1297 result
= find_child_ptr(parent
, child
, &coord
);
1298 if (result
!= NS_FOUND
) {
1299 warning("nikita-994", "Cannot find child pointer: %i", result
);
1300 print_coord_content("coord", &coord
);
1304 coord_dup(&coord2
, &coord
);
1309 * Actually kill internal item: prepare structure with
1310 * arguments for ->cut_and_kill() method...
1313 struct carry_kill_data kdata
;
1314 kdata
.params
.from
= &coord
;
1315 kdata
.params
.to
= &coord2
;
1316 kdata
.params
.from_key
= NULL
;
1317 kdata
.params
.to_key
= NULL
;
1318 kdata
.params
.smallest_removed
= NULL
;
1319 kdata
.params
.truncate
= 1;
1320 kdata
.flags
= op
->u
.delete.flags
;
1325 /* ... and call it. */
1326 result
= node_plugin_by_node(parent
)->cut_and_kill(&kdata
,
1329 doing
->restartable
= 0;
1331 /* check whether root should be killed violently */
1332 if (znode_is_root(parent
) &&
1333 /* don't kill roots at and lower than twig level */
1334 znode_get_level(parent
) > REISER4_MIN_TREE_HEIGHT
&&
1335 node_num_items(parent
) == 1)
1336 result
= reiser4_kill_tree_root(coord
.node
);
1338 return result
< 0 ? : 0;
1341 /* implements COP_CUT opration
1343 Cuts part or whole content of node.
1346 static int carry_cut(carry_op
* op
/* operation to be performed */ ,
1347 carry_level
* doing
/* current carry level */ ,
1348 carry_level
* todo
/* next carry level */)
1351 carry_plugin_info info
;
1354 assert("nikita-896", op
!= NULL
);
1355 assert("nikita-897", todo
!= NULL
);
1356 assert("nikita-898", op
->op
== COP_CUT
);
1361 nplug
= node_plugin_by_node(reiser4_carry_real(op
->node
));
1362 if (op
->u
.cut_or_kill
.is_cut
)
1363 result
= nplug
->cut(op
->u
.cut_or_kill
.u
.cut
, &info
);
1365 result
= nplug
->cut_and_kill(op
->u
.cut_or_kill
.u
.kill
, &info
);
1367 doing
->restartable
= 0;
1368 return result
< 0 ? : 0;
1371 /* helper function for carry_paste(): returns true if @op can be continued as
1374 can_paste(coord_t
*icoord
, const reiser4_key
* key
,
1375 const reiser4_item_data
* data
)
1378 item_plugin
*new_iplug
;
1379 item_plugin
*old_iplug
;
1380 int result
= 0; /* to keep gcc shut */
1382 assert("", icoord
->between
!= AT_UNIT
);
1384 /* obviously, one cannot paste when node is empty---there is nothing
1386 if (node_is_empty(icoord
->node
))
1388 /* if insertion point is at the middle of the item, then paste */
1389 if (!coord_is_between_items(icoord
))
1391 coord_dup(&circa
, icoord
);
1392 circa
.between
= AT_UNIT
;
1394 old_iplug
= item_plugin_by_coord(&circa
);
1395 new_iplug
= data
->iplug
;
1397 /* check whether we can paste to the item @icoord is "at" when we
1398 ignore ->between field */
1399 if (old_iplug
== new_iplug
&& item_can_contain_key(&circa
, key
, data
))
1401 else if (icoord
->between
== BEFORE_UNIT
1402 || icoord
->between
== BEFORE_ITEM
) {
1403 /* otherwise, try to glue to the item at the left, if any */
1404 coord_dup(&circa
, icoord
);
1405 if (coord_set_to_left(&circa
)) {
1407 coord_init_before_item(icoord
);
1409 old_iplug
= item_plugin_by_coord(&circa
);
1410 result
= (old_iplug
== new_iplug
)
1411 && item_can_contain_key(icoord
, key
, data
);
1413 coord_dup(icoord
, &circa
);
1414 icoord
->between
= AFTER_UNIT
;
1417 } else if (icoord
->between
== AFTER_UNIT
1418 || icoord
->between
== AFTER_ITEM
) {
1419 coord_dup(&circa
, icoord
);
1420 /* otherwise, try to glue to the item at the right, if any */
1421 if (coord_set_to_right(&circa
)) {
1423 coord_init_after_item(icoord
);
1425 int (*cck
) (const coord_t
*, const reiser4_key
*,
1426 const reiser4_item_data
*);
1428 old_iplug
= item_plugin_by_coord(&circa
);
1430 cck
= old_iplug
->b
.can_contain_key
;
1432 /* item doesn't define ->can_contain_key
1433 method? So it is not expandable. */
1436 result
= (old_iplug
== new_iplug
)
1437 && cck(&circa
/*icoord */ , key
, data
);
1439 coord_dup(icoord
, &circa
);
1440 icoord
->between
= BEFORE_UNIT
;
1445 impossible("nikita-2513", "Nothing works");
1447 if (icoord
->between
== BEFORE_ITEM
) {
1448 assert("vs-912", icoord
->unit_pos
== 0);
1449 icoord
->between
= BEFORE_UNIT
;
1450 } else if (icoord
->between
== AFTER_ITEM
) {
1451 coord_init_after_item_end(icoord
);
1457 /* implements COP_PASTE operation
1459 Paste data into existing item. This is complicated by the fact that after
1460 we shifted something to the left or right neighbors trying to free some
1461 space, item we were supposed to paste into can be in different node than
1462 insertion coord. If so, we are no longer doing paste, but insert. See
1463 comments in insert_paste_common().
1466 static int carry_paste(carry_op
* op
/* operation to be performed */ ,
1467 carry_level
* doing UNUSED_ARG
/* current carry
1469 carry_level
* todo
/* next carry level */)
1472 carry_insert_data cdata
;
1474 reiser4_item_data data
;
1478 carry_plugin_info info
;
1481 assert("nikita-982", op
!= NULL
);
1482 assert("nikita-983", todo
!= NULL
);
1483 assert("nikita-984", op
->op
== COP_PASTE
);
1485 coord_init_zero(&dcoord
);
1487 result
= insert_paste_common(op
, doing
, todo
, &cdata
, &dcoord
, &data
);
1491 coord
= op
->u
.insert
.d
->coord
;
1493 /* handle case when op -> u.insert.coord doesn't point to the item
1494 of required type. restart as insert. */
1495 if (!can_paste(coord
, op
->u
.insert
.d
->key
, op
->u
.insert
.d
->data
)) {
1496 op
->op
= COP_INSERT
;
1497 op
->u
.insert
.type
= COPT_PASTE_RESTARTED
;
1498 result
= op_dispatch_table
[COP_INSERT
].handler(op
, doing
, todo
);
1504 iplug
= item_plugin_by_coord(coord
);
1505 assert("nikita-992", iplug
!= NULL
);
1507 assert("nikita-985", node
!= NULL
);
1508 assert("nikita-986", node_plugin_by_node(node
) != NULL
);
1510 assert("nikita-987",
1511 space_needed_for_op(node
, op
) <= znode_free_space(node
));
1513 assert("nikita-1286", coord_is_existing_item(coord
));
1516 * if item is expanded as a result of this operation, we should first
1517 * change item size, then call ->b.paste item method. If item is
1518 * shrunk, it should be done other way around: first call ->b.paste
1519 * method, then reduce item size.
1522 real_size
= space_needed_for_op(node
, op
);
1524 node
->nplug
->change_item_size(coord
, real_size
);
1526 doing
->restartable
= 0;
1530 result
= iplug
->b
.paste(coord
, op
->u
.insert
.d
->data
, &info
);
1533 node
->nplug
->change_item_size(coord
, real_size
);
1535 /* if we pasted at the beginning of the item, update item's key. */
1536 if (coord
->unit_pos
== 0 && coord
->between
!= AFTER_UNIT
)
1537 node
->nplug
->update_item_key(coord
, op
->u
.insert
.d
->key
, &info
);
1539 znode_make_dirty(node
);
1543 /* handle carry COP_EXTENT operation. */
1544 static int carry_extent(carry_op
* op
/* operation to perform */ ,
1545 carry_level
* doing
/* queue of operations @op
1547 carry_level
* todo
/* queue where new operations
1548 * are accumulated */ )
1551 carry_insert_data cdata
;
1553 reiser4_item_data data
;
1554 carry_op
*delete_dummy
;
1555 carry_op
*insert_extent
;
1557 carry_plugin_info info
;
1559 assert("nikita-1751", op
!= NULL
);
1560 assert("nikita-1752", todo
!= NULL
);
1561 assert("nikita-1753", op
->op
== COP_EXTENT
);
1563 /* extent insertion overview:
1565 extents live on the TWIG LEVEL, which is level one above the leaf
1566 one. This complicates extent insertion logic somewhat: it may
1567 happen (and going to happen all the time) that in logical key
1568 ordering extent has to be placed between items I1 and I2, located
1569 at the leaf level, but I1 and I2 are in the same formatted leaf
1570 node N1. To insert extent one has to
1572 (1) reach node N1 and shift data between N1, its neighbors and
1573 possibly newly allocated nodes until I1 and I2 fall into different
1574 nodes. Since I1 and I2 are still neighboring items in logical key
1575 order, they will necessarily be outermost items in their respective
1578 (2) After this new extent item is inserted into node on the twig
1581 Fortunately this process can reuse almost all code from standard
1582 insertion procedure (viz. make_space() and insert_paste_common()),
1583 due to the following observation: make_space() only shifts data up
1584 to and excluding or including insertion point. It never
1585 "over-moves" through insertion point. Thus, one can use
1586 make_space() to perform step (1). All required for this is just to
1587 instruct free_space_shortage() to keep make_space() shifting data
1588 until insertion point is at the node border.
1592 /* perform common functionality of insert and paste. */
1593 result
= insert_paste_common(op
, doing
, todo
, &cdata
, &coord
, &data
);
1597 node
= op
->u
.extent
.d
->coord
->node
;
1598 assert("nikita-1754", node
!= NULL
);
1599 assert("nikita-1755", node_plugin_by_node(node
) != NULL
);
1600 assert("nikita-1700", coord_wrt(op
->u
.extent
.d
->coord
) != COORD_INSIDE
);
1602 /* NOTE-NIKITA add some checks here. Not assertions, -EIO. Check that
1603 extent fits between items. */
1608 /* there is another complication due to placement of extents on the
1609 twig level: extents are "rigid" in the sense that key-range
1610 occupied by extent cannot grow indefinitely to the right as it is
1611 for the formatted leaf nodes. Because of this when search finds two
1612 adjacent extents on the twig level, it has to "drill" to the leaf
1613 level, creating new node. Here we are removing this node.
1615 if (node_is_empty(node
)) {
1616 delete_dummy
= node_post_carry(&info
, COP_DELETE
, node
, 1);
1617 if (IS_ERR(delete_dummy
))
1618 return PTR_ERR(delete_dummy
);
1619 delete_dummy
->u
.delete.child
= NULL
;
1620 delete_dummy
->u
.delete.flags
= DELETE_RETAIN_EMPTY
;
1621 ZF_SET(node
, JNODE_HEARD_BANSHEE
);
1624 /* proceed with inserting extent item into parent. We are definitely
1625 inserting rather than pasting if we get that far. */
1626 insert_extent
= node_post_carry(&info
, COP_INSERT
, node
, 1);
1627 if (IS_ERR(insert_extent
))
1628 /* @delete_dummy will be automatically destroyed on the level
1630 return PTR_ERR(insert_extent
);
1631 /* NOTE-NIKITA insertion by key is simplest option here. Another
1632 possibility is to insert on the left or right of already existing
1635 insert_extent
->u
.insert
.type
= COPT_KEY
;
1636 insert_extent
->u
.insert
.d
= op
->u
.extent
.d
;
1637 assert("nikita-1719", op
->u
.extent
.d
->key
!= NULL
);
1638 insert_extent
->u
.insert
.d
->data
->arg
= op
->u
.extent
.d
->coord
;
1639 insert_extent
->u
.insert
.flags
=
1640 znode_get_tree(node
)->carry
.new_extent_flags
;
1643 * if carry was asked to track lock handle we should actually track
1644 * lock handle on the twig node rather than on the leaf where
1645 * operation was started from. Transfer tracked lock handle.
1647 if (doing
->track_type
) {
1648 assert("nikita-3242", doing
->tracked
!= NULL
);
1649 assert("nikita-3244", todo
->tracked
== NULL
);
1650 todo
->tracked
= doing
->tracked
;
1651 todo
->track_type
= CARRY_TRACK_NODE
;
1652 doing
->tracked
= NULL
;
1653 doing
->track_type
= 0;
1659 /* update key in @parent between pointers to @left and @right.
1661 Find coords of @left and @right and update delimiting key between them.
1662 This is helper function called by carry_update(). Finds position of
1663 internal item involved. Updates item key. Updates delimiting keys of child
1666 static int update_delimiting_key(znode
* parent
/* node key is updated
1668 znode
* left
/* child of @parent */ ,
1669 znode
* right
/* child of @parent */ ,
1670 carry_level
* doing
/* current carry
1672 carry_level
* todo
/* parent carry
1674 const char **error_msg
/* place to
1682 carry_plugin_info info
;
1684 assert("nikita-1177", right
!= NULL
);
1685 /* find position of the right child in a parent */
1686 result
= find_child_ptr(parent
, right
, &right_pos
);
1687 if (result
!= NS_FOUND
) {
1688 *error_msg
= "Cannot find position of right child";
1692 if ((left
!= NULL
) && !coord_is_leftmost_unit(&right_pos
)) {
1693 /* find position of the left child in a parent */
1694 result
= find_child_ptr(parent
, left
, &left_pos
);
1695 if (result
!= NS_FOUND
) {
1696 *error_msg
= "Cannot find position of left child";
1699 assert("nikita-1355", left_pos
.node
!= NULL
);
1701 left_pos
.node
= NULL
;
1703 /* check that they are separated by exactly one key and are basically
1705 if (REISER4_DEBUG
) {
1706 if ((left_pos
.node
!= NULL
)
1707 && !coord_is_existing_unit(&left_pos
)) {
1708 *error_msg
= "Left child is bastard";
1709 return RETERR(-EIO
);
1711 if (!coord_is_existing_unit(&right_pos
)) {
1712 *error_msg
= "Right child is bastard";
1713 return RETERR(-EIO
);
1715 if (left_pos
.node
!= NULL
&&
1716 !coord_are_neighbors(&left_pos
, &right_pos
)) {
1717 *error_msg
= "Children are not direct siblings";
1718 return RETERR(-EIO
);
1727 * If child node is not empty, new key of internal item is a key of
1728 * leftmost item in the child node. If the child is empty, take its
1729 * right delimiting key as a new key of the internal item. Precise key
1730 * in the latter case is not important per se, because the child (and
1731 * the internal item) are going to be killed shortly anyway, but we
1732 * have to preserve correct order of keys in the parent node.
1735 if (!ZF_ISSET(right
, JNODE_HEARD_BANSHEE
))
1736 leftmost_key_in_node(right
, &ldkey
);
1738 read_lock_dk(znode_get_tree(parent
));
1739 ldkey
= *znode_get_rd_key(right
);
1740 read_unlock_dk(znode_get_tree(parent
));
1742 node_plugin_by_node(parent
)->update_item_key(&right_pos
, &ldkey
, &info
);
1743 doing
->restartable
= 0;
1744 znode_make_dirty(parent
);
1748 /* implements COP_UPDATE operation
1750 Update delimiting keys.
1753 static int carry_update(carry_op
* op
/* operation to be performed */ ,
1754 carry_level
* doing
/* current carry level */ ,
1755 carry_level
* todo
/* next carry level */)
1758 carry_node
*missing UNUSED_ARG
;
1763 const char *error_msg
;
1767 * This operation is called to update key of internal item. This is
1768 * necessary when carry shifted or cut data on the child
1769 * level. Arguments of this operation are:
1771 * @right --- child node. Operation should update key of internal
1772 * item pointing to @right.
1774 * @left --- left neighbor of @right. This parameter is optional.
1777 assert("nikita-902", op
!= NULL
);
1778 assert("nikita-903", todo
!= NULL
);
1779 assert("nikita-904", op
->op
== COP_UPDATE
);
1781 lchild
= op
->u
.update
.left
;
1784 if (lchild
!= NULL
) {
1785 assert("nikita-1001", lchild
->parent
);
1786 assert("nikita-1003", !lchild
->left
);
1787 left
= reiser4_carry_real(lchild
);
1791 tree
= znode_get_tree(rchild
->node
);
1792 read_lock_tree(tree
);
1793 right
= znode_parent(rchild
->node
);
1794 read_unlock_tree(tree
);
1796 if (right
!= NULL
) {
1797 result
= update_delimiting_key(right
,
1798 lchild
? lchild
->node
: NULL
,
1800 doing
, todo
, &error_msg
);
1802 error_msg
= "Cannot find node to update key in";
1803 result
= RETERR(-EIO
);
1805 /* operation will be reposted to the next level by the
1806 ->update_item_key() method of node plugin, if necessary. */
1809 warning("nikita-999", "Error updating delimiting key: %s (%i)",
1810 error_msg
? : "", result
);
1815 /* move items from @node during carry */
1816 static int carry_shift_data(sideof side
/* in what direction to move data */ ,
1817 coord_t
*insert_coord
/* coord where new item
1818 * is to be inserted */,
1819 znode
* node
/* node which data are moved from */ ,
1820 carry_level
* doing
/* active carry queue */ ,
1821 carry_level
* todo
/* carry queue where new
1822 * operations are to be put
1824 unsigned int including_insert_coord_p
1825 /* true if @insertion_coord can be moved */ )
1829 carry_plugin_info info
;
1832 source
= insert_coord
->node
;
1837 nplug
= node_plugin_by_node(node
);
1838 result
= nplug
->shift(insert_coord
, node
,
1839 (side
== LEFT_SIDE
) ? SHIFT_LEFT
: SHIFT_RIGHT
, 0,
1840 (int)including_insert_coord_p
, &info
);
1841 /* the only error ->shift() method of node plugin can return is
1842 -ENOMEM due to carry node/operation allocation. */
1843 assert("nikita-915", result
>= 0 || result
== -ENOMEM
);
1846 * if some number of bytes was actually shifted, mark nodes
1847 * dirty, and carry level as non-restartable.
1849 doing
->restartable
= 0;
1850 znode_make_dirty(source
);
1851 znode_make_dirty(node
);
1854 assert("nikita-2077", coord_check(insert_coord
));
1858 typedef carry_node
*(*carry_iterator
) (carry_node
* node
);
1859 static carry_node
*find_dir_carry(carry_node
* node
, carry_level
* level
,
1860 carry_iterator iterator
);
1862 static carry_node
*pool_level_list_prev(carry_node
*node
)
1864 return list_entry(node
->header
.level_linkage
.prev
, carry_node
, header
.level_linkage
);
1867 /* look for the left neighbor of given carry node in a carry queue.
1869 This is used by find_left_neighbor(), but I am not sure that this
1870 really gives any advantage. More statistics required.
1873 carry_node
*find_left_carry(carry_node
* node
/* node to find left neighbor
1875 carry_level
* level
/* level to scan */)
1877 return find_dir_carry(node
, level
,
1878 (carry_iterator
) pool_level_list_prev
);
1881 static carry_node
*pool_level_list_next(carry_node
*node
)
1883 return list_entry(node
->header
.level_linkage
.next
, carry_node
, header
.level_linkage
);
1886 /* look for the right neighbor of given carry node in a
1889 This is used by find_right_neighbor(), but I am not sure that this
1890 really gives any advantage. More statistics required.
1893 carry_node
*find_right_carry(carry_node
* node
/* node to find right neighbor
1895 carry_level
* level
/* level to scan */)
1897 return find_dir_carry(node
, level
,
1898 (carry_iterator
) pool_level_list_next
);
1901 /* look for the left or right neighbor of given carry node in a carry
1904 Helper function used by find_{left|right}_carry().
1906 static carry_node
*find_dir_carry(carry_node
* node
/* node to start
1907 * scanning from */ ,
1908 carry_level
* level
/* level to scan */ ,
1909 carry_iterator iterator
/* operation to
1913 carry_node
*neighbor
;
1915 assert("nikita-1059", node
!= NULL
);
1916 assert("nikita-1060", level
!= NULL
);
1918 /* scan list of carry nodes on this list dir-ward, skipping all
1919 carry nodes referencing the same znode. */
1922 neighbor
= iterator(neighbor
);
1923 if (carry_node_end(level
, neighbor
))
1924 /* list head is reached */
1926 if (reiser4_carry_real(neighbor
) != reiser4_carry_real(node
))
1932 * Memory reservation estimation.
1934 * Carry process proceeds through tree levels upwards. Carry assumes that it
1935 * takes tree in consistent state (e.g., that search tree invariants hold),
1936 * and leaves tree consistent after it finishes. This means that when some
1937 * error occurs carry cannot simply return if there are pending carry
1938 * operations. Generic solution for this problem is carry-undo either as
1939 * transaction manager feature (requiring checkpoints and isolation), or
1940 * through some carry specific mechanism.
1942 * Our current approach is to panic if carry hits an error while tree is
1943 * inconsistent. Unfortunately -ENOMEM can easily be triggered. To work around
1944 * this "memory reservation" mechanism was added.
1946 * Memory reservation is implemented by perthread-pages.diff patch from
1947 * core-patches. Its API is defined in <linux/gfp.h>
1949 * int perthread_pages_reserve(int nrpages, gfp_t gfp);
1950 * void perthread_pages_release(int nrpages);
1951 * int perthread_pages_count(void);
1953 * carry estimates its worst case memory requirements at the entry, reserves
1954 * enough memory, and releases unused pages before returning.
1956 * Code below estimates worst case memory requirements for a given carry
1957 * queue. This is done by summing worst case memory requirements for each
1958 * operation in the queue.
1963 * Memory requirements of many operations depend on the tree
1964 * height. For example, item insertion requires new node to be inserted at
1965 * each tree level in the worst case. What tree height should be used for
1966 * estimation? Current tree height is wrong, because tree height can change
1967 * between the time when estimation was done and the time when operation is
1968 * actually performed. Maximal possible tree height (REISER4_MAX_ZTREE_HEIGHT)
1969 * is also not desirable, because it would lead to the huge over-estimation
1970 * all the time. Plausible solution is "capped tree height": if current tree
1971 * height is less than some TREE_HEIGHT_CAP constant, capped tree height is
1972 * TREE_HEIGHT_CAP, otherwise it's current tree height. Idea behind this is
1973 * that if tree height is TREE_HEIGHT_CAP or larger, it's extremely unlikely
1974 * to be increased even more during short interval of time.
1976 #define TREE_HEIGHT_CAP (5)
1978 /* return capped tree height for the @tree. See comment above. */
1979 static int cap_tree_height(reiser4_tree
* tree
)
1981 return max_t(int, tree
->height
, TREE_HEIGHT_CAP
);
1984 /* return capped tree height for the current tree. */
1985 static int capped_height(void)
1987 return cap_tree_height(current_tree
);
1990 /* return number of pages required to store given number of bytes */
1991 static int bytes_to_pages(int bytes
)
1993 return (bytes
+ PAGE_CACHE_SIZE
- 1) >> PAGE_CACHE_SHIFT
;
1996 /* how many pages are required to allocate znodes during item insertion. */
1997 static int carry_estimate_znodes(void)
2000 * Note, that there we have some problem here: there is no way to
2001 * reserve pages specifically for the given slab. This means that
2002 * these pages can be hijacked for some other end.
2005 /* in the worst case we need 3 new znode on each tree level */
2006 return bytes_to_pages(capped_height() * sizeof(znode
) * 3);
2010 * how many pages are required to load bitmaps. One bitmap per level.
2012 static int carry_estimate_bitmaps(void)
2014 if (reiser4_is_set(reiser4_get_current_sb(), REISER4_DONT_LOAD_BITMAP
)) {
2017 bytes
= capped_height() * (0 + /* bnode should be added, but
2018 * it is private to bitmap.c,
2021 /* working and commit jnodes */
2022 return bytes_to_pages(bytes
) + 2; /* and their contents */
2024 /* bitmaps were pre-loaded during mount */
2028 /* worst case item insertion memory requirements */
2029 static int carry_estimate_insert(carry_op
* op
, carry_level
* level
)
2031 return carry_estimate_bitmaps() + carry_estimate_znodes() + 1 +
2033 capped_height() + /* new block on each level */
2034 1 + /* and possibly extra new block at the leaf level */
2035 3; /* loading of leaves into memory */
2038 /* worst case item deletion memory requirements */
2039 static int carry_estimate_delete(carry_op
* op
, carry_level
* level
)
2041 return carry_estimate_bitmaps() + carry_estimate_znodes() + 1 +
2043 3; /* loading of leaves into memory */
2046 /* worst case tree cut memory requirements */
2047 static int carry_estimate_cut(carry_op
* op
, carry_level
* level
)
2049 return carry_estimate_bitmaps() + carry_estimate_znodes() + 1 +
2051 3; /* loading of leaves into memory */
2054 /* worst case memory requirements of pasting into item */
2055 static int carry_estimate_paste(carry_op
* op
, carry_level
* level
)
2057 return carry_estimate_bitmaps() + carry_estimate_znodes() + 1 +
2059 capped_height() + /* new block on each level */
2060 1 + /* and possibly extra new block at the leaf level */
2061 3; /* loading of leaves into memory */
2064 /* worst case memory requirements of extent insertion */
2065 static int carry_estimate_extent(carry_op
* op
, carry_level
* level
)
2067 return carry_estimate_insert(op
, level
) + /* insert extent */
2068 carry_estimate_delete(op
, level
); /* kill leaf */
2071 /* worst case memory requirements of key update */
2072 static int carry_estimate_update(carry_op
* op
, carry_level
* level
)
2077 /* worst case memory requirements of flow insertion */
2078 static int carry_estimate_insert_flow(carry_op
* op
, carry_level
* level
)
2082 newnodes
= min(bytes_to_pages(op
->u
.insert_flow
.flow
->length
),
2083 CARRY_FLOW_NEW_NODES_LIMIT
);
2085 * roughly estimate insert_flow as a sequence of insertions.
2087 return newnodes
* carry_estimate_insert(op
, level
);
2090 /* This is dispatch table for carry operations. It can be trivially
2091 abstracted into useful plugin: tunable balancing policy is a good
2093 carry_op_handler op_dispatch_table
[COP_LAST_OP
] = {
2095 .handler
= carry_insert
,
2096 .estimate
= carry_estimate_insert
}
2099 .handler
= carry_delete
,
2100 .estimate
= carry_estimate_delete
}
2103 .handler
= carry_cut
,
2104 .estimate
= carry_estimate_cut
}
2107 .handler
= carry_paste
,
2108 .estimate
= carry_estimate_paste
}
2111 .handler
= carry_extent
,
2112 .estimate
= carry_estimate_extent
}
2115 .handler
= carry_update
,
2116 .estimate
= carry_estimate_update
}
2118 [COP_INSERT_FLOW
] = {
2119 .handler
= carry_insert_flow
,
2120 .estimate
= carry_estimate_insert_flow
}
2123 /* Make Linus happy.
2125 c-indentation-style: "K&R"