1 /* Copyright 2001, 2002, 2003 by Hans Reiser, licensing governed by reiser4/README */
3 /* implementation of carry operations */
9 #include "plugin/item/item.h"
10 #include "plugin/node/node.h"
13 #include "block_alloc.h"
14 #include "tree_walk.h"
18 #include "carry_ops.h"
23 #include <linux/types.h>
24 #include <linux/err.h>
26 static int carry_shift_data(sideof side
, coord_t
* insert_coord
, znode
* node
,
27 carry_level
* doing
, carry_level
* todo
,
28 unsigned int including_insert_coord_p
);
30 extern int lock_carry_node(carry_level
* level
, carry_node
* node
);
31 extern int lock_carry_node_tail(carry_node
* node
);
33 /* find left neighbor of a carry node
35 Look for left neighbor of @node and add it to the @doing queue. See
39 static carry_node
*find_left_neighbor(carry_op
* op
/* node to find left
41 carry_level
* doing
/* level to scan */ )
53 /* first, check whether left neighbor is already in a @doing queue */
54 if (reiser4_carry_real(node
)->left
!= NULL
) {
55 /* NOTE: there is locking subtlety here. Look into
56 * find_right_neighbor() for more info */
57 if (find_carry_node(doing
,
58 reiser4_carry_real(node
)->left
) != NULL
) {
59 read_unlock_tree(tree
);
62 left
= list_entry(left
->header
.level_linkage
.prev
,
63 carry_node
, header
.level_linkage
);
64 assert("nikita-3408", !carry_node_end(doing
,
66 } while (reiser4_carry_real(left
) ==
67 reiser4_carry_real(node
));
71 read_unlock_tree(tree
);
73 left
= reiser4_add_carry_skip(doing
, POOLO_BEFORE
, node
);
77 left
->node
= node
->node
;
81 if (!op
->u
.insert
.flags
& COPI_LOAD_LEFT
)
84 /* then, feeling lucky, peek left neighbor in the cache. */
85 result
= reiser4_get_left_neighbor(&left
->lock_handle
,
86 reiser4_carry_real(node
),
87 ZNODE_WRITE_LOCK
, flags
);
89 /* ok, node found and locked. */
90 result
= lock_carry_node_tail(left
);
92 left
= ERR_PTR(result
);
93 } else if (result
== -E_NO_NEIGHBOR
|| result
== -ENOENT
) {
94 /* node is leftmost node in a tree, or neighbor wasn't in
95 cache, or there is an extent on the left. */
96 reiser4_pool_free(&doing
->pool
->node_pool
, &left
->header
);
98 } else if (doing
->restartable
) {
99 /* if left neighbor is locked, and level is restartable, add
100 new node to @doing and restart. */
101 assert("nikita-913", node
->parent
!= 0);
102 assert("nikita-914", node
->node
!= NULL
);
105 left
= ERR_PTR(-E_REPEAT
);
107 /* left neighbor is locked, level cannot be restarted. Just
108 ignore left neighbor. */
109 reiser4_pool_free(&doing
->pool
->node_pool
, &left
->header
);
115 /* find right neighbor of a carry node
117 Look for right neighbor of @node and add it to the @doing queue. See
118 comments in the body.
121 static carry_node
*find_right_neighbor(carry_op
* op
/* node to find right
123 carry_level
* doing
/* level to scan */ )
137 read_lock_tree(tree
);
138 /* first, check whether right neighbor is already in a @doing queue */
139 if (reiser4_carry_real(node
)->right
!= NULL
) {
141 * Tree lock is taken here anyway, because, even if _outcome_
142 * of (find_carry_node() != NULL) doesn't depends on
143 * concurrent updates to ->right, find_carry_node() cannot
144 * work with second argument NULL. Hence, following comment is
145 * of historic importance only.
149 * Q: why don't we need tree lock here, looking for the right
152 * A: even if value of node->real_node->right were changed
153 * during find_carry_node() execution, outcome of execution
154 * wouldn't change, because (in short) other thread cannot add
155 * elements to the @doing, and if node->real_node->right
156 * already was in @doing, value of node->real_node->right
157 * couldn't change, because node cannot be inserted between
160 if (find_carry_node(doing
,
161 reiser4_carry_real(node
)->right
) != NULL
) {
162 read_unlock_tree(tree
);
164 * What we are doing here (this is also applicable to
165 * the find_left_neighbor()).
167 * tree_walk.c code requires that insertion of a
168 * pointer to a child, modification of parent pointer
169 * in the child, and insertion of the child into
170 * sibling list are atomic (see
171 * plugin/item/internal.c:create_hook_internal()).
173 * carry allocates new node long before pointer to it
174 * is inserted into parent and, actually, long before
175 * parent is even known. Such allocated-but-orphaned
176 * nodes are only trackable through carry level lists.
178 * Situation that is handled here is following: @node
179 * has valid ->right pointer, but there is
180 * allocated-but-orphaned node in the carry queue that
181 * is logically between @node and @node->right. Here
182 * we are searching for it. Critical point is that
183 * this is only possible if @node->right is also in
184 * the carry queue (this is checked above), because
185 * this is the only way new orphaned node could be
186 * inserted between them (before inserting new node,
187 * make_space() first tries to shift to the right, so,
188 * right neighbor will be locked and queued).
193 right
= list_entry(right
->header
.level_linkage
.next
,
194 carry_node
, header
.level_linkage
);
195 assert("nikita-3408", !carry_node_end(doing
,
197 } while (reiser4_carry_real(right
) ==
198 reiser4_carry_real(node
));
202 read_unlock_tree(tree
);
204 flags
= GN_CAN_USE_UPPER_LEVELS
;
205 if (!op
->u
.insert
.flags
& COPI_LOAD_RIGHT
)
208 /* then, try to lock right neighbor */
210 result
= reiser4_get_right_neighbor(&lh
,
211 reiser4_carry_real(node
),
212 ZNODE_WRITE_LOCK
, flags
);
214 /* ok, node found and locked. */
215 right
= reiser4_add_carry_skip(doing
, POOLO_AFTER
, node
);
216 if (!IS_ERR(right
)) {
217 right
->node
= lh
.node
;
218 move_lh(&right
->lock_handle
, &lh
);
220 result
= lock_carry_node_tail(right
);
222 right
= ERR_PTR(result
);
224 } else if ((result
== -E_NO_NEIGHBOR
) || (result
== -ENOENT
)) {
225 /* node is rightmost node in a tree, or neighbor wasn't in
226 cache, or there is an extent on the right. */
229 right
= ERR_PTR(result
);
234 /* how much free space in a @node is needed for @op
236 How much space in @node is required for completion of @op, where @op is
237 insert or paste operation.
239 static unsigned int space_needed_for_op(znode
* node
/* znode data are
242 carry_op
* op
/* carry
245 assert("nikita-919", op
!= NULL
);
249 impossible("nikita-1701", "Wrong opcode");
251 return space_needed(node
, NULL
, op
->u
.insert
.d
->data
, 1);
253 return space_needed(node
, op
->u
.insert
.d
->coord
,
254 op
->u
.insert
.d
->data
, 0);
258 /* how much space in @node is required to insert or paste @data at
260 unsigned int space_needed(const znode
* node
/* node data are inserted or
262 const coord_t
* coord
/* coord where data are
265 const reiser4_item_data
* data
/* data to insert or
267 int insertion
/* non-0 is inserting, 0---paste */ )
272 assert("nikita-917", node
!= NULL
);
273 assert("nikita-918", node_plugin_by_node(node
) != NULL
);
274 assert("vs-230", !insertion
|| (coord
== NULL
));
278 if (iplug
->b
.estimate
!= NULL
) {
279 /* ask item plugin how much space is needed to insert this
281 result
+= iplug
->b
.estimate(insertion
? NULL
: coord
, data
);
283 /* reasonable default */
284 result
+= data
->length
;
290 /* and add node overhead */
291 if (nplug
->item_overhead
!= NULL
) {
292 result
+= nplug
->item_overhead(node
, NULL
);
298 /* find &coord in parent where pointer to new child is to be stored. */
299 static int find_new_child_coord(carry_op
* op
/* COP_INSERT carry operation to
300 * insert pointer to new
307 assert("nikita-941", op
!= NULL
);
308 assert("nikita-942", op
->op
== COP_INSERT
);
310 node
= reiser4_carry_real(op
->node
);
311 assert("nikita-943", node
!= NULL
);
312 assert("nikita-944", node_plugin_by_node(node
) != NULL
);
314 child
= reiser4_carry_real(op
->u
.insert
.child
);
316 find_new_child_ptr(node
, child
, op
->u
.insert
.brother
,
317 op
->u
.insert
.d
->coord
);
319 build_child_ptr_data(child
, op
->u
.insert
.d
->data
);
323 /* additional amount of free space in @node required to complete @op */
324 static int free_space_shortage(znode
* node
/* node to check */ ,
325 carry_op
* op
/* operation being performed */ )
327 assert("nikita-1061", node
!= NULL
);
328 assert("nikita-1062", op
!= NULL
);
332 impossible("nikita-1702", "Wrong opcode");
335 return space_needed_for_op(node
, op
) - znode_free_space(node
);
337 /* when inserting extent shift data around until insertion
338 point is utmost in the node. */
339 if (coord_wrt(op
->u
.insert
.d
->coord
) == COORD_INSIDE
)
346 /* helper function: update node pointer in operation after insertion
347 point was probably shifted into @target. */
348 static znode
*sync_op(carry_op
* op
, carry_node
* target
)
350 znode
*insertion_node
;
352 /* reget node from coord: shift might move insertion coord to
354 insertion_node
= op
->u
.insert
.d
->coord
->node
;
355 /* if insertion point was actually moved into new node,
356 update carry node pointer in operation. */
357 if (insertion_node
!= reiser4_carry_real(op
->node
)) {
359 assert("nikita-2540",
360 reiser4_carry_real(target
) == insertion_node
);
362 assert("nikita-2541",
363 reiser4_carry_real(op
->node
) == op
->u
.insert
.d
->coord
->node
);
364 return insertion_node
;
368 * complete make_space() call: update tracked lock handle if necessary. See
369 * comments for fs/reiser4/carry.h:carry_track_type
372 make_space_tail(carry_op
* op
, carry_level
* doing
, znode
* orig_node
)
375 carry_track_type tracking
;
378 tracking
= doing
->track_type
;
379 node
= op
->u
.insert
.d
->coord
->node
;
381 if (tracking
== CARRY_TRACK_NODE
||
382 (tracking
== CARRY_TRACK_CHANGE
&& node
!= orig_node
)) {
383 /* inserting or pasting into node different from
384 original. Update lock handle supplied by caller. */
385 assert("nikita-1417", doing
->tracked
!= NULL
);
386 done_lh(doing
->tracked
);
387 init_lh(doing
->tracked
);
388 result
= longterm_lock_znode(doing
->tracked
, node
,
396 /* This is insertion policy function. It shifts data to the left and right
397 neighbors of insertion coord and allocates new nodes until there is enough
398 free space to complete @op.
400 See comments in the body.
402 Assumes that the node format favors insertions at the right end of the node
405 See carry_flow() on detail about flow insertion
407 static int make_space(carry_op
* op
/* carry operation, insert or paste */ ,
408 carry_level
* doing
/* current carry queue */ ,
409 carry_level
* todo
/* carry queue on the parent level */ )
413 int not_enough_space
;
420 assert("nikita-890", op
!= NULL
);
421 assert("nikita-891", todo
!= NULL
);
423 op
->op
== COP_INSERT
||
424 op
->op
== COP_PASTE
|| op
->op
== COP_EXTENT
);
425 assert("nikita-1607",
426 reiser4_carry_real(op
->node
) == op
->u
.insert
.d
->coord
->node
);
428 flags
= op
->u
.insert
.flags
;
430 /* NOTE check that new node can only be allocated after checking left
431 * and right neighbors. This is necessary for proper work of
432 * find_{left,right}_neighbor(). */
433 assert("nikita-3410", ergo(flags
& COPI_DONT_ALLOCATE
,
434 flags
& COPI_DONT_SHIFT_LEFT
));
435 assert("nikita-3411", ergo(flags
& COPI_DONT_ALLOCATE
,
436 flags
& COPI_DONT_SHIFT_RIGHT
));
438 coord
= op
->u
.insert
.d
->coord
;
439 orig_node
= node
= coord
->node
;
441 assert("nikita-908", node
!= NULL
);
442 assert("nikita-909", node_plugin_by_node(node
) != NULL
);
445 /* If there is not enough space in a node, try to shift something to
446 the left neighbor. This is a bit tricky, as locking to the left is
447 low priority. This is handled by restart logic in carry().
449 not_enough_space
= free_space_shortage(node
, op
);
450 if (not_enough_space
<= 0)
451 /* it is possible that carry was called when there actually
452 was enough space in the node. For example, when inserting
453 leftmost item so that delimiting keys have to be updated.
455 return make_space_tail(op
, doing
, orig_node
);
456 if (!(flags
& COPI_DONT_SHIFT_LEFT
)) {
458 /* make note in statistics of an attempt to move
459 something into the left neighbor */
460 left
= find_left_neighbor(op
, doing
);
461 if (unlikely(IS_ERR(left
))) {
462 if (PTR_ERR(left
) == -E_REPEAT
)
465 /* some error other than restart request
466 occurred. This shouldn't happen. Issue a
467 warning and continue as if left neighbor
470 warning("nikita-924",
471 "Error accessing left neighbor: %li",
474 } else if (left
!= NULL
) {
476 /* shift everything possible on the left of and
477 including insertion coord into the left neighbor */
478 result
= carry_shift_data(LEFT_SIDE
, coord
,
479 reiser4_carry_real(left
),
481 flags
& COPI_GO_LEFT
);
483 /* reget node from coord: shift_left() might move
484 insertion coord to the left neighbor */
485 node
= sync_op(op
, left
);
487 not_enough_space
= free_space_shortage(node
, op
);
488 /* There is not enough free space in @node, but
489 may be, there is enough free space in
490 @left. Various balancing decisions are valid here.
491 The same for the shifiting to the right.
495 /* If there still is not enough space, shift to the right */
496 if (not_enough_space
> 0 && !(flags
& COPI_DONT_SHIFT_RIGHT
)) {
499 right
= find_right_neighbor(op
, doing
);
501 warning("nikita-1065",
502 "Error accessing right neighbor: %li",
504 } else if (right
!= NULL
) {
505 /* node containing insertion point, and its right
506 neighbor node are write locked by now.
508 shift everything possible on the right of but
509 excluding insertion coord into the right neighbor
511 result
= carry_shift_data(RIGHT_SIDE
, coord
,
512 reiser4_carry_real(right
),
514 flags
& COPI_GO_RIGHT
);
515 /* reget node from coord: shift_right() might move
516 insertion coord to the right neighbor */
517 node
= sync_op(op
, right
);
518 not_enough_space
= free_space_shortage(node
, op
);
521 /* If there is still not enough space, allocate new node(s).
523 We try to allocate new blocks if COPI_DONT_ALLOCATE is not set in
524 the carry operation flags (currently this is needed during flush
528 not_enough_space
> 0 && result
== 0 && blk_alloc
< 2 &&
529 !(flags
& COPI_DONT_ALLOCATE
); ++blk_alloc
) {
530 carry_node
*fresh
; /* new node we are allocating */
531 coord_t coord_shadow
; /* remembered insertion point before
532 * shifting data into new node */
533 carry_node
*node_shadow
; /* remembered insertion node before
535 unsigned int gointo
; /* whether insertion point should move
536 * into newly allocated node */
538 /* allocate new node on the right of @node. Znode and disk
539 fake block number for new node are allocated.
541 add_new_znode() posts carry operation COP_INSERT with
542 COPT_CHILD option to the parent level to add
543 pointer to newly created node to its parent.
545 Subtle point: if several new nodes are required to complete
546 insertion operation at this level, they will be inserted
547 into their parents in the order of creation, which means
548 that @node will be valid "cookie" at the time of insertion.
551 fresh
= add_new_znode(node
, op
->node
, doing
, todo
);
553 return PTR_ERR(fresh
);
555 /* Try to shift into new node. */
556 result
= lock_carry_node(doing
, fresh
);
557 zput(reiser4_carry_real(fresh
));
559 warning("nikita-947",
560 "Cannot lock new node: %i", result
);
564 /* both nodes are write locked by now.
566 shift everything possible on the right of and
567 including insertion coord into the right neighbor.
569 coord_dup(&coord_shadow
, op
->u
.insert
.d
->coord
);
570 node_shadow
= op
->node
;
571 /* move insertion point into newly created node if:
573 . insertion point is rightmost in the source node, or
574 . this is not the first node we are allocating in a row.
578 coord_is_after_rightmost(op
->u
.insert
.d
->coord
);
581 op
->op
== COP_PASTE
&&
582 coord_is_existing_item(op
->u
.insert
.d
->coord
) &&
583 is_solid_item((item_plugin_by_coord(op
->u
.insert
.d
->coord
)))) {
584 /* paste into solid (atomic) item, which can contain
585 only one unit, so we need to shift it right, where
586 insertion point supposed to be */
588 assert("edward-1444", op
->u
.insert
.d
->data
->iplug
==
589 item_plugin_by_id(STATIC_STAT_DATA_ID
));
590 assert("edward-1445",
591 op
->u
.insert
.d
->data
->length
>
592 node_plugin_by_node(coord
->node
)->free_space
595 op
->u
.insert
.d
->coord
->between
= BEFORE_UNIT
;
598 result
= carry_shift_data(RIGHT_SIDE
, coord
,
599 reiser4_carry_real(fresh
),
600 doing
, todo
, gointo
);
601 /* if insertion point was actually moved into new node,
602 update carry node pointer in operation. */
603 node
= sync_op(op
, fresh
);
604 not_enough_space
= free_space_shortage(node
, op
);
605 if ((not_enough_space
> 0) && (node
!= coord_shadow
.node
)) {
606 /* there is not enough free in new node. Shift
607 insertion point back to the @shadow_node so that
608 next new node would be inserted between
609 @shadow_node and @fresh.
611 coord_normalize(&coord_shadow
);
612 coord_dup(coord
, &coord_shadow
);
614 op
->node
= node_shadow
;
615 if (1 || (flags
& COPI_STEP_BACK
)) {
616 /* still not enough space?! Maybe there is
617 enough space in the source node (i.e., node
618 data are moved from) now.
621 free_space_shortage(node
, op
);
625 if (not_enough_space
> 0) {
626 if (!(flags
& COPI_DONT_ALLOCATE
))
627 warning("nikita-948", "Cannot insert new item");
628 result
= -E_NODE_FULL
;
630 assert("nikita-1622", ergo(result
== 0,
631 reiser4_carry_real(op
->node
) == coord
->node
));
632 assert("nikita-2616", coord
== op
->u
.insert
.d
->coord
);
634 result
= make_space_tail(op
, doing
, orig_node
);
638 /* insert_paste_common() - common part of insert and paste operations
640 This function performs common part of COP_INSERT and COP_PASTE.
642 There are two ways in which insertion/paste can be requested:
644 . by directly supplying reiser4_item_data. In this case, op ->
645 u.insert.type is set to COPT_ITEM_DATA.
647 . by supplying child pointer to which is to inserted into parent. In this
648 case op -> u.insert.type == COPT_CHILD.
650 . by supplying key of new item/unit. This is currently only used during
653 This is required, because when new node is allocated we don't know at what
654 position pointer to it is to be stored in the parent. Actually, we don't
655 even know what its parent will be, because parent can be re-balanced
656 concurrently and new node re-parented, and because parent can be full and
657 pointer to the new node will go into some other node.
659 insert_paste_common() resolves pointer to child node into position in the
660 parent by calling find_new_child_coord(), that fills
661 reiser4_item_data. After this, insertion/paste proceeds uniformly.
663 Another complication is with finding free space during pasting. It may
664 happen that while shifting items to the neighbors and newly allocated
665 nodes, insertion coord can no longer be in the item we wanted to paste
666 into. At this point, paste becomes (morphs) into insert. Moreover free
667 space analysis has to be repeated, because amount of space required for
668 insertion is different from that of paste (item header overhead, etc).
670 This function "unifies" different insertion modes (by resolving child
671 pointer or key into insertion coord), and then calls make_space() to free
672 enough space in the node by shifting data to the left and right and by
673 allocating new nodes if necessary. Carry operation knows amount of space
674 required for its completion. After enough free space is obtained, caller of
675 this function (carry_{insert,paste,etc.}) performs actual insertion/paste
676 by calling item plugin method.
679 static int insert_paste_common(carry_op
* op
/* carry operation being
681 carry_level
* doing
/* current carry level */ ,
682 carry_level
* todo
/* next carry level */ ,
683 carry_insert_data
* cdata
/* pointer to
685 coord_t
* coord
/* insertion/paste coord */ ,
686 reiser4_item_data
* data
/* data to be
687 * inserted/pasted */ )
689 assert("nikita-981", op
!= NULL
);
690 assert("nikita-980", todo
!= NULL
);
691 assert("nikita-979", (op
->op
== COP_INSERT
) || (op
->op
== COP_PASTE
)
692 || (op
->op
== COP_EXTENT
));
694 if (op
->u
.insert
.type
== COPT_PASTE_RESTARTED
) {
695 /* nothing to do. Fall through to make_space(). */
697 } else if (op
->u
.insert
.type
== COPT_KEY
) {
698 node_search_result intra_node
;
700 /* Problem with doing batching at the lowest level, is that
701 operations here are given by coords where modification is
702 to be performed, and one modification can invalidate coords
703 of all following operations.
705 So, we are implementing yet another type for operation that
706 will use (the only) "locator" stable across shifting of
707 data between nodes, etc.: key (COPT_KEY).
709 This clause resolves key to the coord in the node.
711 But node can change also. Probably some pieces have to be
712 added to the lock_carry_node(), to lock node by its key.
715 /* NOTE-NIKITA Lookup bias is fixed to FIND_EXACT. Complain
716 if you need something else. */
717 op
->u
.insert
.d
->coord
= coord
;
718 node
= reiser4_carry_real(op
->node
);
719 intra_node
= node_plugin_by_node(node
)->lookup
720 (node
, op
->u
.insert
.d
->key
, FIND_EXACT
,
721 op
->u
.insert
.d
->coord
);
722 if ((intra_node
!= NS_FOUND
) && (intra_node
!= NS_NOT_FOUND
)) {
723 warning("nikita-1715", "Intra node lookup failure: %i",
727 } else if (op
->u
.insert
.type
== COPT_CHILD
) {
728 /* if we are asked to insert pointer to the child into
729 internal node, first convert pointer to the child into
730 coord within parent node.
735 op
->u
.insert
.d
= cdata
;
736 op
->u
.insert
.d
->coord
= coord
;
737 op
->u
.insert
.d
->data
= data
;
738 op
->u
.insert
.d
->coord
->node
= reiser4_carry_real(op
->node
);
739 result
= find_new_child_coord(op
);
740 child
= reiser4_carry_real(op
->u
.insert
.child
);
741 if (result
!= NS_NOT_FOUND
) {
742 warning("nikita-993",
743 "Cannot find a place for child pointer: %i",
747 /* This only happens when we did multiple insertions at
748 the previous level, trying to insert single item and
749 it so happened, that insertion of pointers to all new
750 nodes before this one already caused parent node to
751 split (may be several times).
753 I am going to come up with better solution.
755 You are not expected to understand this.
756 -- v6root/usr/sys/ken/slp.c
758 Basically, what happens here is the following: carry came
759 to the parent level and is about to insert internal item
760 pointing to the child node that it just inserted in the
761 level below. Position where internal item is to be inserted
762 was found by find_new_child_coord() above, but node of the
763 current carry operation (that is, parent node of child
764 inserted on the previous level), was determined earlier in
765 the lock_carry_level/lock_carry_node. It could so happen
766 that other carry operations already performed on the parent
767 level already split parent node, so that insertion point
768 moved into another node. Handle this by creating new carry
769 node for insertion point if necessary.
771 if (reiser4_carry_real(op
->node
) !=
772 op
->u
.insert
.d
->coord
->node
) {
773 pool_ordering direction
;
780 * determine in what direction insertion point
781 * moved. Do this by comparing delimiting keys.
783 z1
= op
->u
.insert
.d
->coord
->node
;
784 z2
= reiser4_carry_real(op
->node
);
785 if (keyle(leftmost_key_in_node(z1
, &k1
),
786 leftmost_key_in_node(z2
, &k2
)))
787 /* insertion point moved to the left */
788 direction
= POOLO_BEFORE
;
790 /* insertion point moved to the right */
791 direction
= POOLO_AFTER
;
793 op
->node
= reiser4_add_carry_skip(doing
,
794 direction
, op
->node
);
795 if (IS_ERR(op
->node
))
796 return PTR_ERR(op
->node
);
797 op
->node
->node
= op
->u
.insert
.d
->coord
->node
;
799 result
= lock_carry_node(doing
, op
->node
);
805 * set up key of an item being inserted: we are inserting
806 * internal item and its key is (by the very definition of
807 * search tree) is leftmost key in the child node.
809 write_lock_dk(znode_get_tree(child
));
810 op
->u
.insert
.d
->key
= leftmost_key_in_node(child
,
811 znode_get_ld_key(child
));
812 write_unlock_dk(znode_get_tree(child
));
813 op
->u
.insert
.d
->data
->arg
= op
->u
.insert
.brother
;
815 assert("vs-243", op
->u
.insert
.d
->coord
!= NULL
);
816 op
->u
.insert
.d
->coord
->node
= reiser4_carry_real(op
->node
);
819 /* find free space. */
820 return make_space(op
, doing
, todo
);
823 /* handle carry COP_INSERT operation.
825 Insert new item into node. New item can be given in one of two ways:
827 - by passing &tree_coord and &reiser4_item_data as part of @op. This is
828 only applicable at the leaf/twig level.
830 - by passing a child node pointer to which is to be inserted by this
834 static int carry_insert(carry_op
* op
/* operation to perform */ ,
835 carry_level
* doing
/* queue of operations @op
837 carry_level
* todo
/* queue where new operations
838 * are accumulated */ )
841 carry_insert_data cdata
;
843 reiser4_item_data data
;
844 carry_plugin_info info
;
847 assert("nikita-1036", op
!= NULL
);
848 assert("nikita-1037", todo
!= NULL
);
849 assert("nikita-1038", op
->op
== COP_INSERT
);
851 coord_init_zero(&coord
);
853 /* perform common functionality of insert and paste. */
854 result
= insert_paste_common(op
, doing
, todo
, &cdata
, &coord
, &data
);
858 node
= op
->u
.insert
.d
->coord
->node
;
859 assert("nikita-1039", node
!= NULL
);
860 assert("nikita-1040", node_plugin_by_node(node
) != NULL
);
863 space_needed_for_op(node
, op
) <= znode_free_space(node
));
865 /* ask node layout to create new item. */
868 result
= node_plugin_by_node(node
)->create_item
869 (op
->u
.insert
.d
->coord
, op
->u
.insert
.d
->key
, op
->u
.insert
.d
->data
,
871 doing
->restartable
= 0;
872 znode_make_dirty(node
);
878 * Flow insertion code. COP_INSERT_FLOW is special tree operation that is
879 * supplied with a "flow" (that is, a stream of data) and inserts it into tree
880 * by slicing into multiple items.
883 #define flow_insert_point(op) ( ( op ) -> u.insert_flow.insert_point )
884 #define flow_insert_flow(op) ( ( op ) -> u.insert_flow.flow )
885 #define flow_insert_data(op) ( ( op ) -> u.insert_flow.data )
887 static size_t item_data_overhead(carry_op
* op
)
889 if (flow_insert_data(op
)->iplug
->b
.estimate
== NULL
)
891 return (flow_insert_data(op
)->iplug
->b
.
892 estimate(NULL
/* estimate insertion */ , flow_insert_data(op
)) -
893 flow_insert_data(op
)->length
);
896 /* FIXME-VS: this is called several times during one make_flow_for_insertion
897 and it will always return the same result. Some optimization could be made
898 by calculating this value once at the beginning and passing it around. That
899 would reduce some flexibility in future changes
901 static int can_paste(coord_t
*, const reiser4_key
*, const reiser4_item_data
*);
902 static size_t flow_insertion_overhead(carry_op
* op
)
905 size_t insertion_overhead
;
907 node
= flow_insert_point(op
)->node
;
908 insertion_overhead
= 0;
909 if (node
->nplug
->item_overhead
&&
910 !can_paste(flow_insert_point(op
), &flow_insert_flow(op
)->key
,
911 flow_insert_data(op
)))
913 node
->nplug
->item_overhead(node
, NULL
) +
914 item_data_overhead(op
);
915 return insertion_overhead
;
918 /* how many bytes of flow does fit to the node */
919 static int what_can_fit_into_node(carry_op
* op
)
921 size_t free
, overhead
;
923 overhead
= flow_insertion_overhead(op
);
924 free
= znode_free_space(flow_insert_point(op
)->node
);
925 if (free
<= overhead
)
928 /* FIXME: flow->length is loff_t only to not get overflowed in case of expandign truncate */
929 if (free
< op
->u
.insert_flow
.flow
->length
)
931 return (int)op
->u
.insert_flow
.flow
->length
;
934 /* in make_space_for_flow_insertion we need to check either whether whole flow
935 fits into a node or whether minimal fraction of flow fits into a node */
936 static int enough_space_for_whole_flow(carry_op
* op
)
938 return (unsigned)what_can_fit_into_node(op
) ==
939 op
->u
.insert_flow
.flow
->length
;
942 #define MIN_FLOW_FRACTION 1
943 static int enough_space_for_min_flow_fraction(carry_op
* op
)
945 assert("vs-902", coord_is_after_rightmost(flow_insert_point(op
)));
947 return what_can_fit_into_node(op
) >= MIN_FLOW_FRACTION
;
950 /* this returns 0 if left neighbor was obtained successfully and everything
951 upto insertion point including it were shifted and left neighbor still has
952 some free space to put minimal fraction of flow into it */
954 make_space_by_shift_left(carry_op
* op
, carry_level
* doing
, carry_level
* todo
)
959 left
= find_left_neighbor(op
, doing
);
960 if (unlikely(IS_ERR(left
))) {
962 "make_space_by_shift_left: "
963 "error accessing left neighbor: %li", PTR_ERR(left
));
967 /* left neighbor either does not exist or is unformatted
971 orig
= flow_insert_point(op
)->node
;
972 /* try to shift content of node @orig from its head upto insert point
973 including insertion point into the left neighbor */
974 carry_shift_data(LEFT_SIDE
, flow_insert_point(op
),
975 reiser4_carry_real(left
), doing
, todo
,
976 1 /* including insert point */);
977 if (reiser4_carry_real(left
) != flow_insert_point(op
)->node
) {
978 /* insertion point did not move */
982 /* insertion point is set after last item in the node */
983 assert("vs-900", coord_is_after_rightmost(flow_insert_point(op
)));
985 if (!enough_space_for_min_flow_fraction(op
)) {
986 /* insertion point node does not have enough free space to put
987 even minimal portion of flow into it, therefore, move
988 insertion point back to orig node (before first item) */
989 coord_init_before_first_item(flow_insert_point(op
), orig
);
993 /* part of flow is to be written to the end of node */
998 /* this returns 0 if right neighbor was obtained successfully and everything to
999 the right of insertion point was shifted to it and node got enough free
1000 space to put minimal fraction of flow into it */
1002 make_space_by_shift_right(carry_op
* op
, carry_level
* doing
,
1007 right
= find_right_neighbor(op
, doing
);
1008 if (unlikely(IS_ERR(right
))) {
1009 warning("nikita-1065", "shift_right_excluding_insert_point: "
1010 "error accessing right neighbor: %li", PTR_ERR(right
));
1014 /* shift everything possible on the right of but excluding
1015 insertion coord into the right neighbor */
1016 carry_shift_data(RIGHT_SIDE
, flow_insert_point(op
),
1017 reiser4_carry_real(right
), doing
, todo
,
1018 0 /* not including insert point */);
1020 /* right neighbor either does not exist or is unformatted
1024 if (coord_is_after_rightmost(flow_insert_point(op
))) {
1025 if (enough_space_for_min_flow_fraction(op
)) {
1026 /* part of flow is to be written to the end of node */
1031 /* new node is to be added if insert point node did not get enough
1032 space for whole flow */
1036 /* this returns 0 when insert coord is set at the node end and fraction of flow
1037 fits into that node */
1039 make_space_by_new_nodes(carry_op
* op
, carry_level
* doing
, carry_level
* todo
)
1045 node
= flow_insert_point(op
)->node
;
1047 if (op
->u
.insert_flow
.new_nodes
== CARRY_FLOW_NEW_NODES_LIMIT
)
1048 return RETERR(-E_NODE_FULL
);
1049 /* add new node after insert point node */
1050 new = add_new_znode(node
, op
->node
, doing
, todo
);
1051 if (unlikely(IS_ERR(new))) {
1052 return PTR_ERR(new);
1054 result
= lock_carry_node(doing
, new);
1055 zput(reiser4_carry_real(new));
1056 if (unlikely(result
)) {
1059 op
->u
.insert_flow
.new_nodes
++;
1060 if (!coord_is_after_rightmost(flow_insert_point(op
))) {
1061 carry_shift_data(RIGHT_SIDE
, flow_insert_point(op
),
1062 reiser4_carry_real(new), doing
, todo
,
1063 0 /* not including insert point */);
1065 coord_is_after_rightmost(flow_insert_point(op
)));
1067 if (enough_space_for_min_flow_fraction(op
)) {
1070 if (op
->u
.insert_flow
.new_nodes
== CARRY_FLOW_NEW_NODES_LIMIT
)
1071 return RETERR(-E_NODE_FULL
);
1073 /* add one more new node */
1074 new = add_new_znode(node
, op
->node
, doing
, todo
);
1075 if (unlikely(IS_ERR(new))) {
1076 return PTR_ERR(new);
1078 result
= lock_carry_node(doing
, new);
1079 zput(reiser4_carry_real(new));
1080 if (unlikely(result
)) {
1083 op
->u
.insert_flow
.new_nodes
++;
1086 /* move insertion point to new node */
1087 coord_init_before_first_item(flow_insert_point(op
),
1088 reiser4_carry_real(new));
1094 make_space_for_flow_insertion(carry_op
* op
, carry_level
* doing
,
1097 __u32 flags
= op
->u
.insert_flow
.flags
;
1099 if (enough_space_for_whole_flow(op
)) {
1100 /* whole flow fits into insert point node */
1104 if (!(flags
& COPI_DONT_SHIFT_LEFT
)
1105 && (make_space_by_shift_left(op
, doing
, todo
) == 0)) {
1106 /* insert point is shifted to left neighbor of original insert
1107 point node and is set after last unit in that node. It has
1108 enough space to fit at least minimal fraction of flow. */
1112 if (enough_space_for_whole_flow(op
)) {
1113 /* whole flow fits into insert point node */
1117 if (!(flags
& COPI_DONT_SHIFT_RIGHT
)
1118 && (make_space_by_shift_right(op
, doing
, todo
) == 0)) {
1119 /* insert point is still set to the same node, but there is
1120 nothing to the right of insert point. */
1124 if (enough_space_for_whole_flow(op
)) {
1125 /* whole flow fits into insert point node */
1129 return make_space_by_new_nodes(op
, doing
, todo
);
1132 /* implements COP_INSERT_FLOW operation */
1134 carry_insert_flow(carry_op
* op
, carry_level
* doing
, carry_level
* todo
)
1138 coord_t
*insert_point
;
1140 carry_plugin_info info
;
1142 lock_handle
*orig_lh
;
1144 f
= op
->u
.insert_flow
.flow
;
1147 /* carry system needs this to work */
1151 orig_node
= flow_insert_point(op
)->node
;
1152 orig_lh
= doing
->tracked
;
1155 result
= make_space_for_flow_insertion(op
, doing
, todo
);
1159 insert_point
= flow_insert_point(op
);
1160 nplug
= node_plugin_by_node(insert_point
->node
);
1162 /* compose item data for insertion/pasting */
1163 flow_insert_data(op
)->data
= f
->data
;
1164 flow_insert_data(op
)->length
= what_can_fit_into_node(op
);
1166 if (can_paste(insert_point
, &f
->key
, flow_insert_data(op
))) {
1167 /* insert point is set to item of file we are writing to and we have to append to it */
1168 assert("vs-903", insert_point
->between
== AFTER_UNIT
);
1169 nplug
->change_item_size(insert_point
,
1170 flow_insert_data(op
)->length
);
1171 flow_insert_data(op
)->iplug
->b
.paste(insert_point
,
1175 /* new item must be inserted */
1176 pos_in_node_t new_pos
;
1177 flow_insert_data(op
)->length
+= item_data_overhead(op
);
1179 /* FIXME-VS: this is because node40_create_item changes
1180 insert_point for obscure reasons */
1181 switch (insert_point
->between
) {
1183 new_pos
= insert_point
->item_pos
+ 1;
1189 assert("vs-905", insert_point
->item_pos
== 0);
1193 impossible("vs-906",
1194 "carry_insert_flow: invalid coord");
1199 nplug
->create_item(insert_point
, &f
->key
,
1200 flow_insert_data(op
), &info
);
1201 coord_set_item_pos(insert_point
, new_pos
);
1203 coord_init_after_item_end(insert_point
);
1204 doing
->restartable
= 0;
1205 znode_make_dirty(insert_point
->node
);
1207 move_flow_forward(f
, (unsigned)flow_insert_data(op
)->length
);
1210 if (orig_node
!= flow_insert_point(op
)->node
) {
1211 /* move lock to new insert point */
1215 longterm_lock_znode(orig_lh
, flow_insert_point(op
)->node
,
1216 ZNODE_WRITE_LOCK
, ZNODE_LOCK_HIPRI
);
1222 /* implements COP_DELETE operation
1224 Remove pointer to @op -> u.delete.child from it's parent.
1226 This function also handles killing of a tree root is last pointer from it
1227 was removed. This is complicated by our handling of "twig" level: root on
1228 twig level is never killed.
1231 static int carry_delete(carry_op
* op
/* operation to be performed */ ,
1232 carry_level
* doing UNUSED_ARG
/* current carry
1234 carry_level
* todo
/* next carry level */ )
1241 carry_plugin_info info
;
1245 * This operation is called to delete internal item pointing to the
1246 * child node that was removed by carry from the tree on the previous
1250 assert("nikita-893", op
!= NULL
);
1251 assert("nikita-894", todo
!= NULL
);
1252 assert("nikita-895", op
->op
== COP_DELETE
);
1254 coord_init_zero(&coord
);
1255 coord_init_zero(&coord2
);
1257 parent
= reiser4_carry_real(op
->node
);
1258 child
= op
->u
.delete.child
?
1259 reiser4_carry_real(op
->u
.delete.child
) : op
->node
->node
;
1260 tree
= znode_get_tree(child
);
1261 read_lock_tree(tree
);
1264 * @parent was determined when carry entered parent level
1265 * (lock_carry_level/lock_carry_node). Since then, actual parent of
1266 * @child node could change due to other carry operations performed on
1267 * the parent level. Check for this.
1270 if (znode_parent(child
) != parent
) {
1271 /* NOTE-NIKITA add stat counter for this. */
1272 parent
= znode_parent(child
);
1273 assert("nikita-2581", find_carry_node(doing
, parent
));
1275 read_unlock_tree(tree
);
1277 assert("nikita-1213", znode_get_level(parent
) > LEAF_LEVEL
);
1279 /* Twig level horrors: tree should be of height at least 2. So, last
1280 pointer from the root at twig level is preserved even if child is
1281 empty. This is ugly, but so it was architectured.
1284 if (znode_is_root(parent
) &&
1285 znode_get_level(parent
) <= REISER4_MIN_TREE_HEIGHT
&&
1286 node_num_items(parent
) == 1) {
1287 /* Delimiting key manipulations. */
1288 write_lock_dk(tree
);
1289 znode_set_ld_key(child
, znode_set_ld_key(parent
, reiser4_min_key()));
1290 znode_set_rd_key(child
, znode_set_rd_key(parent
, reiser4_max_key()));
1291 ZF_SET(child
, JNODE_DKSET
);
1292 write_unlock_dk(tree
);
1294 /* @child escaped imminent death! */
1295 ZF_CLR(child
, JNODE_HEARD_BANSHEE
);
1299 /* convert child pointer to the coord_t */
1300 result
= find_child_ptr(parent
, child
, &coord
);
1301 if (result
!= NS_FOUND
) {
1302 warning("nikita-994", "Cannot find child pointer: %i", result
);
1303 print_coord_content("coord", &coord
);
1307 coord_dup(&coord2
, &coord
);
1312 * Actually kill internal item: prepare structure with
1313 * arguments for ->cut_and_kill() method...
1316 struct carry_kill_data kdata
;
1317 kdata
.params
.from
= &coord
;
1318 kdata
.params
.to
= &coord2
;
1319 kdata
.params
.from_key
= NULL
;
1320 kdata
.params
.to_key
= NULL
;
1321 kdata
.params
.smallest_removed
= NULL
;
1322 kdata
.params
.truncate
= 1;
1323 kdata
.flags
= op
->u
.delete.flags
;
1328 /* ... and call it. */
1329 result
= node_plugin_by_node(parent
)->cut_and_kill(&kdata
,
1332 doing
->restartable
= 0;
1334 /* check whether root should be killed violently */
1335 if (znode_is_root(parent
) &&
1336 /* don't kill roots at and lower than twig level */
1337 znode_get_level(parent
) > REISER4_MIN_TREE_HEIGHT
&&
1338 node_num_items(parent
) == 1) {
1339 result
= reiser4_kill_tree_root(coord
.node
);
1342 return result
< 0 ? : 0;
1345 /* implements COP_CUT opration
1347 Cuts part or whole content of node.
1350 static int carry_cut(carry_op
* op
/* operation to be performed */ ,
1351 carry_level
* doing
/* current carry level */ ,
1352 carry_level
* todo
/* next carry level */ )
1355 carry_plugin_info info
;
1358 assert("nikita-896", op
!= NULL
);
1359 assert("nikita-897", todo
!= NULL
);
1360 assert("nikita-898", op
->op
== COP_CUT
);
1365 nplug
= node_plugin_by_node(reiser4_carry_real(op
->node
));
1366 if (op
->u
.cut_or_kill
.is_cut
)
1367 result
= nplug
->cut(op
->u
.cut_or_kill
.u
.cut
, &info
);
1369 result
= nplug
->cut_and_kill(op
->u
.cut_or_kill
.u
.kill
, &info
);
1371 doing
->restartable
= 0;
1372 return result
< 0 ? : 0;
1375 /* helper function for carry_paste(): returns true if @op can be continued as
1378 can_paste(coord_t
* icoord
, const reiser4_key
* key
,
1379 const reiser4_item_data
* data
)
1382 item_plugin
*new_iplug
;
1383 item_plugin
*old_iplug
;
1384 int result
= 0; /* to keep gcc shut */
1386 assert("", icoord
->between
!= AT_UNIT
);
1388 /* obviously, one cannot paste when node is empty---there is nothing
1390 if (node_is_empty(icoord
->node
))
1392 /* if insertion point is at the middle of the item, then paste */
1393 if (!coord_is_between_items(icoord
))
1395 coord_dup(&circa
, icoord
);
1396 circa
.between
= AT_UNIT
;
1398 old_iplug
= item_plugin_by_coord(&circa
);
1399 new_iplug
= data
->iplug
;
1401 /* check whether we can paste to the item @icoord is "at" when we
1402 ignore ->between field */
1403 if (old_iplug
== new_iplug
&& item_can_contain_key(&circa
, key
, data
)) {
1405 } else if (icoord
->between
== BEFORE_UNIT
1406 || icoord
->between
== BEFORE_ITEM
) {
1407 /* otherwise, try to glue to the item at the left, if any */
1408 coord_dup(&circa
, icoord
);
1409 if (coord_set_to_left(&circa
)) {
1411 coord_init_before_item(icoord
);
1413 old_iplug
= item_plugin_by_coord(&circa
);
1414 result
= (old_iplug
== new_iplug
)
1415 && item_can_contain_key(icoord
, key
, data
);
1417 coord_dup(icoord
, &circa
);
1418 icoord
->between
= AFTER_UNIT
;
1421 } else if (icoord
->between
== AFTER_UNIT
1422 || icoord
->between
== AFTER_ITEM
) {
1423 coord_dup(&circa
, icoord
);
1424 /* otherwise, try to glue to the item at the right, if any */
1425 if (coord_set_to_right(&circa
)) {
1427 coord_init_after_item(icoord
);
1429 int (*cck
) (const coord_t
*, const reiser4_key
*,
1430 const reiser4_item_data
*);
1432 old_iplug
= item_plugin_by_coord(&circa
);
1434 cck
= old_iplug
->b
.can_contain_key
;
1436 /* item doesn't define ->can_contain_key
1437 method? So it is not expandable. */
1440 result
= (old_iplug
== new_iplug
)
1441 && cck(&circa
/*icoord */ , key
, data
);
1443 coord_dup(icoord
, &circa
);
1444 icoord
->between
= BEFORE_UNIT
;
1449 impossible("nikita-2513", "Nothing works");
1451 if (icoord
->between
== BEFORE_ITEM
) {
1452 assert("vs-912", icoord
->unit_pos
== 0);
1453 icoord
->between
= BEFORE_UNIT
;
1454 } else if (icoord
->between
== AFTER_ITEM
) {
1455 coord_init_after_item_end(icoord
);
1461 /* implements COP_PASTE operation
1463 Paste data into existing item. This is complicated by the fact that after
1464 we shifted something to the left or right neighbors trying to free some
1465 space, item we were supposed to paste into can be in different node than
1466 insertion coord. If so, we are no longer doing paste, but insert. See
1467 comments in insert_paste_common().
1470 static int carry_paste(carry_op
* op
/* operation to be performed */ ,
1471 carry_level
* doing UNUSED_ARG
/* current carry
1473 carry_level
* todo
/* next carry level */ )
1476 carry_insert_data cdata
;
1478 reiser4_item_data data
;
1482 carry_plugin_info info
;
1485 assert("nikita-982", op
!= NULL
);
1486 assert("nikita-983", todo
!= NULL
);
1487 assert("nikita-984", op
->op
== COP_PASTE
);
1489 coord_init_zero(&dcoord
);
1491 result
= insert_paste_common(op
, doing
, todo
, &cdata
, &dcoord
, &data
);
1495 coord
= op
->u
.insert
.d
->coord
;
1497 /* handle case when op -> u.insert.coord doesn't point to the item
1498 of required type. restart as insert. */
1499 if (!can_paste(coord
, op
->u
.insert
.d
->key
, op
->u
.insert
.d
->data
)) {
1500 op
->op
= COP_INSERT
;
1501 op
->u
.insert
.type
= COPT_PASTE_RESTARTED
;
1502 result
= op_dispatch_table
[COP_INSERT
].handler(op
, doing
, todo
);
1508 iplug
= item_plugin_by_coord(coord
);
1509 assert("nikita-992", iplug
!= NULL
);
1511 assert("nikita-985", node
!= NULL
);
1512 assert("nikita-986", node_plugin_by_node(node
) != NULL
);
1514 assert("nikita-987",
1515 space_needed_for_op(node
, op
) <= znode_free_space(node
));
1517 assert("nikita-1286", coord_is_existing_item(coord
));
1520 * if item is expanded as a result of this operation, we should first
1521 * change item size, than call ->b.paste item method. If item is
1522 * shrunk, it should be done other way around: first call ->b.paste
1523 * method, then reduce item size.
1526 real_size
= space_needed_for_op(node
, op
);
1528 node
->nplug
->change_item_size(coord
, real_size
);
1530 doing
->restartable
= 0;
1534 result
= iplug
->b
.paste(coord
, op
->u
.insert
.d
->data
, &info
);
1537 node
->nplug
->change_item_size(coord
, real_size
);
1539 /* if we pasted at the beginning of the item, update item's key. */
1540 if (coord
->unit_pos
== 0 && coord
->between
!= AFTER_UNIT
)
1541 node
->nplug
->update_item_key(coord
, op
->u
.insert
.d
->key
, &info
);
1543 znode_make_dirty(node
);
1547 /* handle carry COP_EXTENT operation. */
1548 static int carry_extent(carry_op
* op
/* operation to perform */ ,
1549 carry_level
* doing
/* queue of operations @op
1551 carry_level
* todo
/* queue where new operations
1552 * are accumulated */ )
1555 carry_insert_data cdata
;
1557 reiser4_item_data data
;
1558 carry_op
*delete_dummy
;
1559 carry_op
*insert_extent
;
1561 carry_plugin_info info
;
1563 assert("nikita-1751", op
!= NULL
);
1564 assert("nikita-1752", todo
!= NULL
);
1565 assert("nikita-1753", op
->op
== COP_EXTENT
);
1567 /* extent insertion overview:
1569 extents live on the TWIG LEVEL, which is level one above the leaf
1570 one. This complicates extent insertion logic somewhat: it may
1571 happen (and going to happen all the time) that in logical key
1572 ordering extent has to be placed between items I1 and I2, located
1573 at the leaf level, but I1 and I2 are in the same formatted leaf
1574 node N1. To insert extent one has to
1576 (1) reach node N1 and shift data between N1, its neighbors and
1577 possibly newly allocated nodes until I1 and I2 fall into different
1578 nodes. Since I1 and I2 are still neighboring items in logical key
1579 order, they will be necessary utmost items in their respective
1582 (2) After this new extent item is inserted into node on the twig
1585 Fortunately this process can reuse almost all code from standard
1586 insertion procedure (viz. make_space() and insert_paste_common()),
1587 due to the following observation: make_space() only shifts data up
1588 to and excluding or including insertion point. It never
1589 "over-moves" through insertion point. Thus, one can use
1590 make_space() to perform step (1). All required for this is just to
1591 instruct free_space_shortage() to keep make_space() shifting data
1592 until insertion point is at the node border.
1596 /* perform common functionality of insert and paste. */
1597 result
= insert_paste_common(op
, doing
, todo
, &cdata
, &coord
, &data
);
1601 node
= op
->u
.extent
.d
->coord
->node
;
1602 assert("nikita-1754", node
!= NULL
);
1603 assert("nikita-1755", node_plugin_by_node(node
) != NULL
);
1604 assert("nikita-1700", coord_wrt(op
->u
.extent
.d
->coord
) != COORD_INSIDE
);
1606 /* NOTE-NIKITA add some checks here. Not assertions, -EIO. Check that
1607 extent fits between items. */
1612 /* there is another complication due to placement of extents on the
1613 twig level: extents are "rigid" in the sense that key-range
1614 occupied by extent cannot grow indefinitely to the right as it is
1615 for the formatted leaf nodes. Because of this when search finds two
1616 adjacent extents on the twig level, it has to "drill" to the leaf
1617 level, creating new node. Here we are removing this node.
1619 if (node_is_empty(node
)) {
1620 delete_dummy
= node_post_carry(&info
, COP_DELETE
, node
, 1);
1621 if (IS_ERR(delete_dummy
))
1622 return PTR_ERR(delete_dummy
);
1623 delete_dummy
->u
.delete.child
= NULL
;
1624 delete_dummy
->u
.delete.flags
= DELETE_RETAIN_EMPTY
;
1625 ZF_SET(node
, JNODE_HEARD_BANSHEE
);
1628 /* proceed with inserting extent item into parent. We are definitely
1629 inserting rather than pasting if we get that far. */
1630 insert_extent
= node_post_carry(&info
, COP_INSERT
, node
, 1);
1631 if (IS_ERR(insert_extent
))
1632 /* @delete_dummy will be automatically destroyed on the level
1634 return PTR_ERR(insert_extent
);
1635 /* NOTE-NIKITA insertion by key is simplest option here. Another
1636 possibility is to insert on the left or right of already existing
1639 insert_extent
->u
.insert
.type
= COPT_KEY
;
1640 insert_extent
->u
.insert
.d
= op
->u
.extent
.d
;
1641 assert("nikita-1719", op
->u
.extent
.d
->key
!= NULL
);
1642 insert_extent
->u
.insert
.d
->data
->arg
= op
->u
.extent
.d
->coord
;
1643 insert_extent
->u
.insert
.flags
=
1644 znode_get_tree(node
)->carry
.new_extent_flags
;
1647 * if carry was asked to track lock handle we should actually track
1648 * lock handle on the twig node rather than on the leaf where
1649 * operation was started from. Transfer tracked lock handle.
1651 if (doing
->track_type
) {
1652 assert("nikita-3242", doing
->tracked
!= NULL
);
1653 assert("nikita-3244", todo
->tracked
== NULL
);
1654 todo
->tracked
= doing
->tracked
;
1655 todo
->track_type
= CARRY_TRACK_NODE
;
1656 doing
->tracked
= NULL
;
1657 doing
->track_type
= 0;
1663 /* update key in @parent between pointers to @left and @right.
1665 Find coords of @left and @right and update delimiting key between them.
1666 This is helper function called by carry_update(). Finds position of
1667 internal item involved. Updates item key. Updates delimiting keys of child
1670 static int update_delimiting_key(znode
* parent
/* node key is updated
1672 znode
* left
/* child of @parent */ ,
1673 znode
* right
/* child of @parent */ ,
1674 carry_level
* doing
/* current carry
1676 carry_level
* todo
/* parent carry
1678 const char **error_msg
/* place to
1686 carry_plugin_info info
;
1688 assert("nikita-1177", right
!= NULL
);
1689 /* find position of right left child in a parent */
1690 result
= find_child_ptr(parent
, right
, &right_pos
);
1691 if (result
!= NS_FOUND
) {
1692 *error_msg
= "Cannot find position of right child";
1696 if ((left
!= NULL
) && !coord_is_leftmost_unit(&right_pos
)) {
1697 /* find position of the left child in a parent */
1698 result
= find_child_ptr(parent
, left
, &left_pos
);
1699 if (result
!= NS_FOUND
) {
1700 *error_msg
= "Cannot find position of left child";
1703 assert("nikita-1355", left_pos
.node
!= NULL
);
1705 left_pos
.node
= NULL
;
1707 /* check that they are separated by exactly one key and are basically
1709 if (REISER4_DEBUG
) {
1710 if ((left_pos
.node
!= NULL
)
1711 && !coord_is_existing_unit(&left_pos
)) {
1712 *error_msg
= "Left child is bastard";
1713 return RETERR(-EIO
);
1715 if (!coord_is_existing_unit(&right_pos
)) {
1716 *error_msg
= "Right child is bastard";
1717 return RETERR(-EIO
);
1719 if (left_pos
.node
!= NULL
&&
1720 !coord_are_neighbors(&left_pos
, &right_pos
)) {
1721 *error_msg
= "Children are not direct siblings";
1722 return RETERR(-EIO
);
1731 * If child node is not empty, new key of internal item is a key of
1732 * leftmost item in the child node. If the child is empty, take its
1733 * right delimiting key as a new key of the internal item. Precise key
1734 * in the latter case is not important per se, because the child (and
1735 * the internal item) are going to be killed shortly anyway, but we
1736 * have to preserve correct order of keys in the parent node.
1739 if (!ZF_ISSET(right
, JNODE_HEARD_BANSHEE
))
1740 leftmost_key_in_node(right
, &ldkey
);
1742 read_lock_dk(znode_get_tree(parent
));
1743 ldkey
= *znode_get_rd_key(right
);
1744 read_unlock_dk(znode_get_tree(parent
));
1746 node_plugin_by_node(parent
)->update_item_key(&right_pos
, &ldkey
, &info
);
1747 doing
->restartable
= 0;
1748 znode_make_dirty(parent
);
1752 /* implements COP_UPDATE opration
1754 Update delimiting keys.
1757 static int carry_update(carry_op
* op
/* operation to be performed */ ,
1758 carry_level
* doing
/* current carry level */ ,
1759 carry_level
* todo
/* next carry level */ )
1762 carry_node
*missing UNUSED_ARG
;
1767 const char *error_msg
;
1771 * This operation is called to update key of internal item. This is
1772 * necessary when carry shifted of cut data on the child
1773 * level. Arguments of this operation are:
1775 * @right --- child node. Operation should update key of internal
1776 * item pointing to @right.
1778 * @left --- left neighbor of @right. This parameter is optional.
1781 assert("nikita-902", op
!= NULL
);
1782 assert("nikita-903", todo
!= NULL
);
1783 assert("nikita-904", op
->op
== COP_UPDATE
);
1785 lchild
= op
->u
.update
.left
;
1788 if (lchild
!= NULL
) {
1789 assert("nikita-1001", lchild
->parent
);
1790 assert("nikita-1003", !lchild
->left
);
1791 left
= reiser4_carry_real(lchild
);
1795 tree
= znode_get_tree(rchild
->node
);
1796 read_lock_tree(tree
);
1797 right
= znode_parent(rchild
->node
);
1798 read_unlock_tree(tree
);
1800 if (right
!= NULL
) {
1801 result
= update_delimiting_key(right
,
1802 lchild
? lchild
->node
: NULL
,
1804 doing
, todo
, &error_msg
);
1806 error_msg
= "Cannot find node to update key in";
1807 result
= RETERR(-EIO
);
1809 /* operation will be reposted to the next level by the
1810 ->update_item_key() method of node plugin, if necessary. */
1813 warning("nikita-999", "Error updating delimiting key: %s (%i)",
1814 error_msg
? : "", result
);
1819 /* move items from @node during carry */
1820 static int carry_shift_data(sideof side
/* in what direction to move data */ ,
1821 coord_t
* insert_coord
/* coord where new item
1822 * is to be inserted */ ,
1823 znode
* node
/* node which data are moved from */ ,
1824 carry_level
* doing
/* active carry queue */ ,
1825 carry_level
* todo
/* carry queue where new
1826 * operations are to be put
1828 unsigned int including_insert_coord_p
/* true if
1834 carry_plugin_info info
;
1837 source
= insert_coord
->node
;
1842 nplug
= node_plugin_by_node(node
);
1843 result
= nplug
->shift(insert_coord
, node
,
1844 (side
== LEFT_SIDE
) ? SHIFT_LEFT
: SHIFT_RIGHT
, 0,
1845 (int)including_insert_coord_p
, &info
);
1846 /* the only error ->shift() method of node plugin can return is
1847 -ENOMEM due to carry node/operation allocation. */
1848 assert("nikita-915", result
>= 0 || result
== -ENOMEM
);
1851 * if some number of bytes was actually shifted, mark nodes
1852 * dirty, and carry level as non-restartable.
1854 doing
->restartable
= 0;
1855 znode_make_dirty(source
);
1856 znode_make_dirty(node
);
1859 assert("nikita-2077", coord_check(insert_coord
));
1863 typedef carry_node
*(*carry_iterator
) (carry_node
* node
);
1864 static carry_node
*find_dir_carry(carry_node
* node
, carry_level
* level
,
1865 carry_iterator iterator
);
1867 static carry_node
*pool_level_list_prev(carry_node
*node
)
1869 return list_entry(node
->header
.level_linkage
.prev
, carry_node
, header
.level_linkage
);
1872 /* look for the left neighbor of given carry node in a carry queue.
1874 This is used by find_left_neighbor(), but I am not sure that this
1875 really gives any advantage. More statistics required.
1878 carry_node
*find_left_carry(carry_node
* node
/* node to find left neighbor
1880 carry_level
* level
/* level to scan */ )
1882 return find_dir_carry(node
, level
,
1883 (carry_iterator
) pool_level_list_prev
);
1886 static carry_node
*pool_level_list_next(carry_node
*node
)
1888 return list_entry(node
->header
.level_linkage
.next
, carry_node
, header
.level_linkage
);
1891 /* look for the right neighbor of given carry node in a
1894 This is used by find_right_neighbor(), but I am not sure that this
1895 really gives any advantage. More statistics required.
1898 carry_node
*find_right_carry(carry_node
* node
/* node to find right neighbor
1900 carry_level
* level
/* level to scan */ )
1902 return find_dir_carry(node
, level
,
1903 (carry_iterator
) pool_level_list_next
);
1906 /* look for the left or right neighbor of given carry node in a carry
1909 Helper function used by find_{left|right}_carry().
1911 static carry_node
*find_dir_carry(carry_node
* node
/* node to start scanning
1913 carry_level
* level
/* level to scan */ ,
1914 carry_iterator iterator
/* operation to
1918 carry_node
*neighbor
;
1920 assert("nikita-1059", node
!= NULL
);
1921 assert("nikita-1060", level
!= NULL
);
1923 /* scan list of carry nodes on this list dir-ward, skipping all
1924 carry nodes referencing the same znode. */
1927 neighbor
= iterator(neighbor
);
1928 if (carry_node_end(level
, neighbor
))
1929 /* list head is reached */
1931 if (reiser4_carry_real(neighbor
) != reiser4_carry_real(node
))
1937 * Memory reservation estimation.
1939 * Carry process proceeds through tree levels upwards. Carry assumes that it
1940 * takes tree in consistent state (e.g., that search tree invariants hold),
1941 * and leaves tree consistent after it finishes. This means that when some
1942 * error occurs carry cannot simply return if there are pending carry
1943 * operations. Generic solution for this problem is carry-undo either as
1944 * transaction manager feature (requiring checkpoints and isolation), or
1945 * through some carry specific mechanism.
1947 * Our current approach is to panic if carry hits an error while tree is
1948 * inconsistent. Unfortunately -ENOMEM can easily be triggered. To work around
1949 * this "memory reservation" mechanism was added.
1951 * Memory reservation is implemented by perthread-pages.diff patch from
1952 * core-patches. Its API is defined in <linux/gfp.h>
1954 * int perthread_pages_reserve(int nrpages, gfp_t gfp);
1955 * void perthread_pages_release(int nrpages);
1956 * int perthread_pages_count(void);
1958 * carry estimates its worst case memory requirements at the entry, reserved
1959 * enough memory, and released unused pages before returning.
1961 * Code below estimates worst case memory requirements for a given carry
1962 * queue. This is dome by summing worst case memory requirements for each
1963 * operation in the queue.
1968 * Memory memory requirements of many operations depends on the tree
1969 * height. For example, item insertion requires new node to be inserted at
1970 * each tree level in the worst case. What tree height should be used for
1971 * estimation? Current tree height is wrong, because tree height can change
1972 * between the time when estimation was done and the time when operation is
1973 * actually performed. Maximal possible tree height (REISER4_MAX_ZTREE_HEIGHT)
1974 * is also not desirable, because it would lead to the huge over-estimation
1975 * all the time. Plausible solution is "capped tree height": if current tree
1976 * height is less than some TREE_HEIGHT_CAP constant, capped tree height is
1977 * TREE_HEIGHT_CAP, otherwise it's current tree height. Idea behind this is
1978 * that if tree height is TREE_HEIGHT_CAP or larger, it's extremely unlikely
1979 * to be increased even more during short interval of time.
1981 #define TREE_HEIGHT_CAP (5)
1983 /* return capped tree height for the @tree. See comment above. */
1984 static int cap_tree_height(reiser4_tree
* tree
)
1986 return max_t(int, tree
->height
, TREE_HEIGHT_CAP
);
1989 /* return capped tree height for the current tree. */
1990 static int capped_height(void)
1992 return cap_tree_height(current_tree
);
1995 /* return number of pages required to store given number of bytes */
1996 static int bytes_to_pages(int bytes
)
1998 return (bytes
+ PAGE_CACHE_SIZE
- 1) >> PAGE_CACHE_SHIFT
;
2001 /* how many pages are required to allocate znodes during item insertion. */
2002 static int carry_estimate_znodes(void)
2005 * Note, that there we have some problem here: there is no way to
2006 * reserve pages specifically for the given slab. This means that
2007 * these pages can be hijacked for some other end.
2010 /* in the worst case we need 3 new znode on each tree level */
2011 return bytes_to_pages(capped_height() * sizeof(znode
) * 3);
2015 * how many pages are required to load bitmaps. One bitmap per level.
2017 static int carry_estimate_bitmaps(void)
2019 if (reiser4_is_set(reiser4_get_current_sb(), REISER4_DONT_LOAD_BITMAP
)) {
2022 bytes
= capped_height() * (0 + /* bnode should be added, but its is private to
2023 * bitmap.c, skip for now. */
2024 2 * sizeof(jnode
)); /* working and commit jnodes */
2025 return bytes_to_pages(bytes
) + 2; /* and their contents */
2027 /* bitmaps were pre-loaded during mount */
2031 /* worst case item insertion memory requirements */
2032 static int carry_estimate_insert(carry_op
* op
, carry_level
* level
)
2034 return carry_estimate_bitmaps() + carry_estimate_znodes() + 1 + /* new atom */
2035 capped_height() + /* new block on each level */
2036 1 + /* and possibly extra new block at the leaf level */
2037 3; /* loading of leaves into memory */
2040 /* worst case item deletion memory requirements */
2041 static int carry_estimate_delete(carry_op
* op
, carry_level
* level
)
2043 return carry_estimate_bitmaps() + carry_estimate_znodes() + 1 + /* new atom */
2044 3; /* loading of leaves into memory */
2047 /* worst case tree cut memory requirements */
2048 static int carry_estimate_cut(carry_op
* op
, carry_level
* level
)
2050 return carry_estimate_bitmaps() + carry_estimate_znodes() + 1 + /* new atom */
2051 3; /* loading of leaves into memory */
2054 /* worst case memory requirements of pasting into item */
2055 static int carry_estimate_paste(carry_op
* op
, carry_level
* level
)
2057 return carry_estimate_bitmaps() + carry_estimate_znodes() + 1 + /* new atom */
2058 capped_height() + /* new block on each level */
2059 1 + /* and possibly extra new block at the leaf level */
2060 3; /* loading of leaves into memory */
2063 /* worst case memory requirements of extent insertion */
2064 static int carry_estimate_extent(carry_op
* op
, carry_level
* level
)
2066 return carry_estimate_insert(op
, level
) + /* insert extent */
2067 carry_estimate_delete(op
, level
); /* kill leaf */
2070 /* worst case memory requirements of key update */
2071 static int carry_estimate_update(carry_op
* op
, carry_level
* level
)
2076 /* worst case memory requirements of flow insertion */
2077 static int carry_estimate_insert_flow(carry_op
* op
, carry_level
* level
)
2081 newnodes
= min(bytes_to_pages(op
->u
.insert_flow
.flow
->length
),
2082 CARRY_FLOW_NEW_NODES_LIMIT
);
2084 * roughly estimate insert_flow as a sequence of insertions.
2086 return newnodes
* carry_estimate_insert(op
, level
);
2089 /* This is dispatch table for carry operations. It can be trivially
2090 abstracted into useful plugin: tunable balancing policy is a good
2092 carry_op_handler op_dispatch_table
[COP_LAST_OP
] = {
2094 .handler
= carry_insert
,
2095 .estimate
= carry_estimate_insert
}
2098 .handler
= carry_delete
,
2099 .estimate
= carry_estimate_delete
}
2102 .handler
= carry_cut
,
2103 .estimate
= carry_estimate_cut
}
2106 .handler
= carry_paste
,
2107 .estimate
= carry_estimate_paste
}
2110 .handler
= carry_extent
,
2111 .estimate
= carry_estimate_extent
}
2114 .handler
= carry_update
,
2115 .estimate
= carry_estimate_update
}
2117 [COP_INSERT_FLOW
] = {
2118 .handler
= carry_insert_flow
,
2119 .estimate
= carry_estimate_insert_flow
}
2122 /* Make Linus happy.
2124 c-indentation-style: "K&R"