fed up with those stupid warnings
[mmotm.git] / fs / reiser4 / carry_ops.c
blob0906dbd9c3308f5a8bd0164b840eff14c427487a
1 /* Copyright 2001, 2002, 2003 by Hans Reiser, licensing governed by
2 reiser4/README */
4 /* implementation of carry operations */
6 #include "forward.h"
7 #include "debug.h"
8 #include "key.h"
9 #include "coord.h"
10 #include "plugin/item/item.h"
11 #include "plugin/node/node.h"
12 #include "jnode.h"
13 #include "znode.h"
14 #include "block_alloc.h"
15 #include "tree_walk.h"
16 #include "pool.h"
17 #include "tree_mod.h"
18 #include "carry.h"
19 #include "carry_ops.h"
20 #include "tree.h"
21 #include "super.h"
22 #include "reiser4.h"
24 #include <linux/types.h>
25 #include <linux/err.h>
27 static int carry_shift_data(sideof side, coord_t *insert_coord, znode * node,
28 carry_level * doing, carry_level * todo,
29 unsigned int including_insert_coord_p);
31 extern int lock_carry_node(carry_level * level, carry_node * node);
32 extern int lock_carry_node_tail(carry_node * node);
34 /* find left neighbor of a carry node
36 Look for left neighbor of @node and add it to the @doing queue. See
37 comments in the body.
40 static carry_node *find_left_neighbor(carry_op * op /* node to find left
41 * neighbor of */ ,
42 carry_level * doing/* level to scan */)
44 int result;
45 carry_node *node;
46 carry_node *left;
47 int flags;
48 reiser4_tree *tree;
50 node = op->node;
52 tree = current_tree;
53 read_lock_tree(tree);
54 /* first, check whether left neighbor is already in a @doing queue */
55 if (reiser4_carry_real(node)->left != NULL) {
56 /* NOTE: there is locking subtlety here. Look into
57 * find_right_neighbor() for more info */
58 if (find_carry_node(doing,
59 reiser4_carry_real(node)->left) != NULL) {
60 read_unlock_tree(tree);
61 left = node;
62 do {
63 left = list_entry(left->header.level_linkage.prev,
64 carry_node, header.level_linkage);
65 assert("nikita-3408", !carry_node_end(doing,
66 left));
67 } while (reiser4_carry_real(left) ==
68 reiser4_carry_real(node));
69 return left;
72 read_unlock_tree(tree);
74 left = reiser4_add_carry_skip(doing, POOLO_BEFORE, node);
75 if (IS_ERR(left))
76 return left;
78 left->node = node->node;
79 left->free = 1;
81 flags = GN_TRY_LOCK;
82 if (!(op->u.insert.flags & COPI_LOAD_LEFT))
83 flags |= GN_NO_ALLOC;
85 /* then, feeling lucky, peek left neighbor in the cache. */
86 result = reiser4_get_left_neighbor(&left->lock_handle,
87 reiser4_carry_real(node),
88 ZNODE_WRITE_LOCK, flags);
89 if (result == 0) {
90 /* ok, node found and locked. */
91 result = lock_carry_node_tail(left);
92 if (result != 0)
93 left = ERR_PTR(result);
94 } else if (result == -E_NO_NEIGHBOR || result == -ENOENT) {
95 /* node is leftmost node in a tree, or neighbor wasn't in
96 cache, or there is an extent on the left. */
97 reiser4_pool_free(&doing->pool->node_pool, &left->header);
98 left = NULL;
99 } else if (doing->restartable) {
100 /* if left neighbor is locked, and level is restartable, add
101 new node to @doing and restart. */
102 assert("nikita-913", node->parent != 0);
103 assert("nikita-914", node->node != NULL);
104 left->left = 1;
105 left->free = 0;
106 left = ERR_PTR(-E_REPEAT);
107 } else {
108 /* left neighbor is locked, level cannot be restarted. Just
109 ignore left neighbor. */
110 reiser4_pool_free(&doing->pool->node_pool, &left->header);
111 left = NULL;
113 return left;
116 /* find right neighbor of a carry node
118 Look for right neighbor of @node and add it to the @doing queue. See
119 comments in the body.
122 static carry_node *find_right_neighbor(carry_op * op /* node to find right
123 * neighbor of */ ,
124 carry_level * doing/* level to scan */)
126 int result;
127 carry_node *node;
128 carry_node *right;
129 lock_handle lh;
130 int flags;
131 reiser4_tree *tree;
133 init_lh(&lh);
135 node = op->node;
137 tree = current_tree;
138 read_lock_tree(tree);
139 /* first, check whether right neighbor is already in a @doing queue */
140 if (reiser4_carry_real(node)->right != NULL) {
142 * Tree lock is taken here anyway, because, even if _outcome_
143 * of (find_carry_node() != NULL) doesn't depends on
144 * concurrent updates to ->right, find_carry_node() cannot
145 * work with second argument NULL. Hence, following comment is
146 * of historic importance only.
148 * Subtle:
150 * Q: why don't we need tree lock here, looking for the right
151 * neighbor?
153 * A: even if value of node->real_node->right were changed
154 * during find_carry_node() execution, outcome of execution
155 * wouldn't change, because (in short) other thread cannot add
156 * elements to the @doing, and if node->real_node->right
157 * already was in @doing, value of node->real_node->right
158 * couldn't change, because node cannot be inserted between
159 * locked neighbors.
161 if (find_carry_node(doing,
162 reiser4_carry_real(node)->right) != NULL) {
163 read_unlock_tree(tree);
165 * What we are doing here (this is also applicable to
166 * the find_left_neighbor()).
168 * tree_walk.c code requires that insertion of a
169 * pointer to a child, modification of parent pointer
170 * in the child, and insertion of the child into
171 * sibling list are atomic (see
172 * plugin/item/internal.c:create_hook_internal()).
174 * carry allocates new node long before pointer to it
175 * is inserted into parent and, actually, long before
176 * parent is even known. Such allocated-but-orphaned
177 * nodes are only trackable through carry level lists.
179 * Situation that is handled here is following: @node
180 * has valid ->right pointer, but there is
181 * allocated-but-orphaned node in the carry queue that
182 * is logically between @node and @node->right. Here
183 * we are searching for it. Critical point is that
184 * this is only possible if @node->right is also in
185 * the carry queue (this is checked above), because
186 * this is the only way new orphaned node could be
187 * inserted between them (before inserting new node,
188 * make_space() first tries to shift to the right, so,
189 * right neighbor will be locked and queued).
192 right = node;
193 do {
194 right = list_entry(right->header.level_linkage.next,
195 carry_node, header.level_linkage);
196 assert("nikita-3408", !carry_node_end(doing,
197 right));
198 } while (reiser4_carry_real(right) ==
199 reiser4_carry_real(node));
200 return right;
203 read_unlock_tree(tree);
205 flags = GN_CAN_USE_UPPER_LEVELS;
206 if (!(op->u.insert.flags & COPI_LOAD_RIGHT))
207 flags = GN_NO_ALLOC;
209 /* then, try to lock right neighbor */
210 init_lh(&lh);
211 result = reiser4_get_right_neighbor(&lh,
212 reiser4_carry_real(node),
213 ZNODE_WRITE_LOCK, flags);
214 if (result == 0) {
215 /* ok, node found and locked. */
216 right = reiser4_add_carry_skip(doing, POOLO_AFTER, node);
217 if (!IS_ERR(right)) {
218 right->node = lh.node;
219 move_lh(&right->lock_handle, &lh);
220 right->free = 1;
221 result = lock_carry_node_tail(right);
222 if (result != 0)
223 right = ERR_PTR(result);
225 } else if ((result == -E_NO_NEIGHBOR) || (result == -ENOENT)) {
226 /* node is rightmost node in a tree, or neighbor wasn't in
227 cache, or there is an extent on the right. */
228 right = NULL;
229 } else
230 right = ERR_PTR(result);
231 done_lh(&lh);
232 return right;
235 /* how much free space in a @node is needed for @op
237 How much space in @node is required for completion of @op, where @op is
238 insert or paste operation.
240 static unsigned int space_needed_for_op(znode * node /* znode data are
241 * inserted or
242 * pasted in */ ,
243 carry_op * op /* carry
244 operation */ )
246 assert("nikita-919", op != NULL);
248 switch (op->op) {
249 default:
250 impossible("nikita-1701", "Wrong opcode");
251 case COP_INSERT:
252 return space_needed(node, NULL, op->u.insert.d->data, 1);
253 case COP_PASTE:
254 return space_needed(node, op->u.insert.d->coord,
255 op->u.insert.d->data, 0);
259 /* how much space in @node is required to insert or paste @data at
260 @coord. */
261 unsigned int space_needed(const znode * node /* node data are inserted or
262 * pasted in */ ,
263 const coord_t *coord /* coord where data are
264 * inserted or pasted
265 * at */ ,
266 const reiser4_item_data * data /* data to insert or
267 * paste */ ,
268 int insertion/* non-0 is inserting, 0---paste */)
270 int result;
271 item_plugin *iplug;
273 assert("nikita-917", node != NULL);
274 assert("nikita-918", node_plugin_by_node(node) != NULL);
275 assert("vs-230", !insertion || (coord == NULL));
277 result = 0;
278 iplug = data->iplug;
279 if (iplug->b.estimate != NULL) {
280 /* ask item plugin how much space is needed to insert this
281 item */
282 result += iplug->b.estimate(insertion ? NULL : coord, data);
283 } else {
284 /* reasonable default */
285 result += data->length;
287 if (insertion) {
288 node_plugin *nplug;
290 nplug = node->nplug;
291 /* and add node overhead */
292 if (nplug->item_overhead != NULL)
293 result += nplug->item_overhead(node, NULL);
295 return result;
298 /* find &coord in parent where pointer to new child is to be stored. */
299 static int find_new_child_coord(carry_op * op /* COP_INSERT carry operation to
300 * insert pointer to new
301 * child */ )
303 int result;
304 znode *node;
305 znode *child;
307 assert("nikita-941", op != NULL);
308 assert("nikita-942", op->op == COP_INSERT);
310 node = reiser4_carry_real(op->node);
311 assert("nikita-943", node != NULL);
312 assert("nikita-944", node_plugin_by_node(node) != NULL);
314 child = reiser4_carry_real(op->u.insert.child);
315 result =
316 find_new_child_ptr(node, child, op->u.insert.brother,
317 op->u.insert.d->coord);
319 build_child_ptr_data(child, op->u.insert.d->data);
320 return result;
323 /* additional amount of free space in @node required to complete @op */
324 static int free_space_shortage(znode * node /* node to check */ ,
325 carry_op * op/* operation being performed */)
327 assert("nikita-1061", node != NULL);
328 assert("nikita-1062", op != NULL);
330 switch (op->op) {
331 default:
332 impossible("nikita-1702", "Wrong opcode");
333 case COP_INSERT:
334 case COP_PASTE:
335 return space_needed_for_op(node, op) - znode_free_space(node);
336 case COP_EXTENT:
337 /* when inserting extent shift data around until insertion
338 point is utmost in the node. */
339 if (coord_wrt(op->u.insert.d->coord) == COORD_INSIDE)
340 return +1;
341 else
342 return -1;
346 /* helper function: update node pointer in operation after insertion
347 point was probably shifted into @target. */
348 static znode *sync_op(carry_op * op, carry_node * target)
350 znode *insertion_node;
352 /* reget node from coord: shift might move insertion coord to
353 the neighbor */
354 insertion_node = op->u.insert.d->coord->node;
355 /* if insertion point was actually moved into new node,
356 update carry node pointer in operation. */
357 if (insertion_node != reiser4_carry_real(op->node)) {
358 op->node = target;
359 assert("nikita-2540",
360 reiser4_carry_real(target) == insertion_node);
362 assert("nikita-2541",
363 reiser4_carry_real(op->node) == op->u.insert.d->coord->node);
364 return insertion_node;
368 * complete make_space() call: update tracked lock handle if necessary. See
369 * comments for fs/reiser4/carry.h:carry_track_type
371 static int
372 make_space_tail(carry_op * op, carry_level * doing, znode * orig_node)
374 int result;
375 carry_track_type tracking;
376 znode *node;
378 tracking = doing->track_type;
379 node = op->u.insert.d->coord->node;
381 if (tracking == CARRY_TRACK_NODE ||
382 (tracking == CARRY_TRACK_CHANGE && node != orig_node)) {
383 /* inserting or pasting into node different from
384 original. Update lock handle supplied by caller. */
385 assert("nikita-1417", doing->tracked != NULL);
386 done_lh(doing->tracked);
387 init_lh(doing->tracked);
388 result = longterm_lock_znode(doing->tracked, node,
389 ZNODE_WRITE_LOCK,
390 ZNODE_LOCK_HIPRI);
391 } else
392 result = 0;
393 return result;
396 /* This is insertion policy function. It shifts data to the left and right
397 neighbors of insertion coord and allocates new nodes until there is enough
398 free space to complete @op.
400 See comments in the body.
402 Assumes that the node format favors insertions at the right end of the node
403 as node40 does.
405 See carry_flow() on detail about flow insertion
407 static int make_space(carry_op * op /* carry operation, insert or paste */ ,
408 carry_level * doing /* current carry queue */ ,
409 carry_level * todo/* carry queue on the parent level */)
411 znode *node;
412 int result;
413 int not_enough_space;
414 int blk_alloc;
415 znode *orig_node;
416 __u32 flags;
418 coord_t *coord;
420 assert("nikita-890", op != NULL);
421 assert("nikita-891", todo != NULL);
422 assert("nikita-892",
423 op->op == COP_INSERT ||
424 op->op == COP_PASTE || op->op == COP_EXTENT);
425 assert("nikita-1607",
426 reiser4_carry_real(op->node) == op->u.insert.d->coord->node);
428 flags = op->u.insert.flags;
430 /* NOTE check that new node can only be allocated after checking left
431 * and right neighbors. This is necessary for proper work of
432 * find_{left,right}_neighbor(). */
433 assert("nikita-3410", ergo(flags & COPI_DONT_ALLOCATE,
434 flags & COPI_DONT_SHIFT_LEFT));
435 assert("nikita-3411", ergo(flags & COPI_DONT_ALLOCATE,
436 flags & COPI_DONT_SHIFT_RIGHT));
438 coord = op->u.insert.d->coord;
439 orig_node = node = coord->node;
441 assert("nikita-908", node != NULL);
442 assert("nikita-909", node_plugin_by_node(node) != NULL);
444 result = 0;
445 /* If there is not enough space in a node, try to shift something to
446 the left neighbor. This is a bit tricky, as locking to the left is
447 low priority. This is handled by restart logic in carry().
449 not_enough_space = free_space_shortage(node, op);
450 if (not_enough_space <= 0)
451 /* it is possible that carry was called when there actually
452 was enough space in the node. For example, when inserting
453 leftmost item so that delimiting keys have to be updated.
455 return make_space_tail(op, doing, orig_node);
456 if (!(flags & COPI_DONT_SHIFT_LEFT)) {
457 carry_node *left;
458 /* make note in statistics of an attempt to move
459 something into the left neighbor */
460 left = find_left_neighbor(op, doing);
461 if (unlikely(IS_ERR(left))) {
462 if (PTR_ERR(left) == -E_REPEAT)
463 return -E_REPEAT;
464 else {
465 /* some error other than restart request
466 occurred. This shouldn't happen. Issue a
467 warning and continue as if left neighbor
468 weren't existing.
470 warning("nikita-924",
471 "Error accessing left neighbor: %li",
472 PTR_ERR(left));
474 } else if (left != NULL) {
476 /* shift everything possible on the left of and
477 including insertion coord into the left neighbor */
478 result = carry_shift_data(LEFT_SIDE, coord,
479 reiser4_carry_real(left),
480 doing, todo,
481 flags & COPI_GO_LEFT);
483 /* reget node from coord: shift_left() might move
484 insertion coord to the left neighbor */
485 node = sync_op(op, left);
487 not_enough_space = free_space_shortage(node, op);
488 /* There is not enough free space in @node, but
489 may be, there is enough free space in
490 @left. Various balancing decisions are valid here.
491 The same for the shifiting to the right.
495 /* If there still is not enough space, shift to the right */
496 if (not_enough_space > 0 && !(flags & COPI_DONT_SHIFT_RIGHT)) {
497 carry_node *right;
499 right = find_right_neighbor(op, doing);
500 if (IS_ERR(right)) {
501 warning("nikita-1065",
502 "Error accessing right neighbor: %li",
503 PTR_ERR(right));
504 } else if (right != NULL) {
505 /* node containing insertion point, and its right
506 neighbor node are write locked by now.
508 shift everything possible on the right of but
509 excluding insertion coord into the right neighbor
511 result = carry_shift_data(RIGHT_SIDE, coord,
512 reiser4_carry_real(right),
513 doing, todo,
514 flags & COPI_GO_RIGHT);
515 /* reget node from coord: shift_right() might move
516 insertion coord to the right neighbor */
517 node = sync_op(op, right);
518 not_enough_space = free_space_shortage(node, op);
521 /* If there is still not enough space, allocate new node(s).
523 We try to allocate new blocks if COPI_DONT_ALLOCATE is not set in
524 the carry operation flags (currently this is needed during flush
525 only).
527 for (blk_alloc = 0;
528 not_enough_space > 0 && result == 0 && blk_alloc < 2 &&
529 !(flags & COPI_DONT_ALLOCATE); ++blk_alloc) {
530 carry_node *fresh; /* new node we are allocating */
531 coord_t coord_shadow; /* remembered insertion point before
532 * shifting data into new node */
533 carry_node *node_shadow; /* remembered insertion node
534 * before shifting */
535 unsigned int gointo; /* whether insertion point should move
536 * into newly allocated node */
538 /* allocate new node on the right of @node. Znode and disk
539 fake block number for new node are allocated.
541 add_new_znode() posts carry operation COP_INSERT with
542 COPT_CHILD option to the parent level to add
543 pointer to newly created node to its parent.
545 Subtle point: if several new nodes are required to complete
546 insertion operation at this level, they will be inserted
547 into their parents in the order of creation, which means
548 that @node will be valid "cookie" at the time of insertion.
551 fresh = add_new_znode(node, op->node, doing, todo);
552 if (IS_ERR(fresh))
553 return PTR_ERR(fresh);
555 /* Try to shift into new node. */
556 result = lock_carry_node(doing, fresh);
557 zput(reiser4_carry_real(fresh));
558 if (result != 0) {
559 warning("nikita-947",
560 "Cannot lock new node: %i", result);
561 return result;
564 /* both nodes are write locked by now.
566 shift everything possible on the right of and
567 including insertion coord into the right neighbor.
569 coord_dup(&coord_shadow, op->u.insert.d->coord);
570 node_shadow = op->node;
571 /* move insertion point into newly created node if:
573 . insertion point is rightmost in the source node, or
574 . this is not the first node we are allocating in a row.
576 gointo =
577 (blk_alloc > 0) ||
578 coord_is_after_rightmost(op->u.insert.d->coord);
580 if (gointo &&
581 op->op == COP_PASTE &&
582 coord_is_existing_item(op->u.insert.d->coord) &&
583 is_solid_item((item_plugin_by_coord(op->u.insert.d->coord)))) {
584 /* paste into solid (atomic) item, which can contain
585 only one unit, so we need to shift it right, where
586 insertion point supposed to be */
588 assert("edward-1444", op->u.insert.d->data->iplug ==
589 item_plugin_by_id(STATIC_STAT_DATA_ID));
590 assert("edward-1445",
591 op->u.insert.d->data->length >
592 node_plugin_by_node(coord->node)->free_space
593 (coord->node));
595 op->u.insert.d->coord->between = BEFORE_UNIT;
598 result = carry_shift_data(RIGHT_SIDE, coord,
599 reiser4_carry_real(fresh),
600 doing, todo, gointo);
601 /* if insertion point was actually moved into new node,
602 update carry node pointer in operation. */
603 node = sync_op(op, fresh);
604 not_enough_space = free_space_shortage(node, op);
605 if ((not_enough_space > 0) && (node != coord_shadow.node)) {
606 /* there is not enough free in new node. Shift
607 insertion point back to the @shadow_node so that
608 next new node would be inserted between
609 @shadow_node and @fresh.
611 coord_normalize(&coord_shadow);
612 coord_dup(coord, &coord_shadow);
613 node = coord->node;
614 op->node = node_shadow;
615 if (1 || (flags & COPI_STEP_BACK)) {
616 /* still not enough space?! Maybe there is
617 enough space in the source node (i.e., node
618 data are moved from) now.
620 not_enough_space =
621 free_space_shortage(node, op);
625 if (not_enough_space > 0) {
626 if (!(flags & COPI_DONT_ALLOCATE))
627 warning("nikita-948", "Cannot insert new item");
628 result = -E_NODE_FULL;
630 assert("nikita-1622", ergo(result == 0,
631 reiser4_carry_real(op->node) == coord->node));
632 assert("nikita-2616", coord == op->u.insert.d->coord);
633 if (result == 0)
634 result = make_space_tail(op, doing, orig_node);
635 return result;
638 /* insert_paste_common() - common part of insert and paste operations
640 This function performs common part of COP_INSERT and COP_PASTE.
642 There are two ways in which insertion/paste can be requested:
644 . by directly supplying reiser4_item_data. In this case, op ->
645 u.insert.type is set to COPT_ITEM_DATA.
647 . by supplying child pointer to which is to inserted into parent. In this
648 case op -> u.insert.type == COPT_CHILD.
650 . by supplying key of new item/unit. This is currently only used during
651 extent insertion
653 This is required, because when new node is allocated we don't know at what
654 position pointer to it is to be stored in the parent. Actually, we don't
655 even know what its parent will be, because parent can be re-balanced
656 concurrently and new node re-parented, and because parent can be full and
657 pointer to the new node will go into some other node.
659 insert_paste_common() resolves pointer to child node into position in the
660 parent by calling find_new_child_coord(), that fills
661 reiser4_item_data. After this, insertion/paste proceeds uniformly.
663 Another complication is with finding free space during pasting. It may
664 happen that while shifting items to the neighbors and newly allocated
665 nodes, insertion coord can no longer be in the item we wanted to paste
666 into. At this point, paste becomes (morphs) into insert. Moreover free
667 space analysis has to be repeated, because amount of space required for
668 insertion is different from that of paste (item header overhead, etc).
670 This function "unifies" different insertion modes (by resolving child
671 pointer or key into insertion coord), and then calls make_space() to free
672 enough space in the node by shifting data to the left and right and by
673 allocating new nodes if necessary. Carry operation knows amount of space
674 required for its completion. After enough free space is obtained, caller of
675 this function (carry_{insert,paste,etc.}) performs actual insertion/paste
676 by calling item plugin method.
679 static int insert_paste_common(carry_op * op /* carry operation being
680 * performed */ ,
681 carry_level * doing /* current carry level */ ,
682 carry_level * todo /* next carry level */ ,
683 carry_insert_data * cdata /* pointer to
684 * cdata */ ,
685 coord_t *coord /* insertion/paste coord */ ,
686 reiser4_item_data * data /* data to be
687 * inserted/pasted */ )
689 assert("nikita-981", op != NULL);
690 assert("nikita-980", todo != NULL);
691 assert("nikita-979", (op->op == COP_INSERT) || (op->op == COP_PASTE)
692 || (op->op == COP_EXTENT));
694 if (op->u.insert.type == COPT_PASTE_RESTARTED) {
695 /* nothing to do. Fall through to make_space(). */
697 } else if (op->u.insert.type == COPT_KEY) {
698 node_search_result intra_node;
699 znode *node;
700 /* Problem with doing batching at the lowest level, is that
701 operations here are given by coords where modification is
702 to be performed, and one modification can invalidate coords
703 of all following operations.
705 So, we are implementing yet another type for operation that
706 will use (the only) "locator" stable across shifting of
707 data between nodes, etc.: key (COPT_KEY).
709 This clause resolves key to the coord in the node.
711 But node can change also. Probably some pieces have to be
712 added to the lock_carry_node(), to lock node by its key.
715 /* NOTE-NIKITA Lookup bias is fixed to FIND_EXACT. Complain
716 if you need something else. */
717 op->u.insert.d->coord = coord;
718 node = reiser4_carry_real(op->node);
719 intra_node = node_plugin_by_node(node)->lookup
720 (node, op->u.insert.d->key, FIND_EXACT,
721 op->u.insert.d->coord);
722 if ((intra_node != NS_FOUND) && (intra_node != NS_NOT_FOUND)) {
723 warning("nikita-1715", "Intra node lookup failure: %i",
724 intra_node);
725 return intra_node;
727 } else if (op->u.insert.type == COPT_CHILD) {
728 /* if we are asked to insert pointer to the child into
729 internal node, first convert pointer to the child into
730 coord within parent node.
732 znode *child;
733 int result;
735 op->u.insert.d = cdata;
736 op->u.insert.d->coord = coord;
737 op->u.insert.d->data = data;
738 op->u.insert.d->coord->node = reiser4_carry_real(op->node);
739 result = find_new_child_coord(op);
740 child = reiser4_carry_real(op->u.insert.child);
741 if (result != NS_NOT_FOUND) {
742 warning("nikita-993",
743 "Cannot find a place for child pointer: %i",
744 result);
745 return result;
747 /* This only happens when we did multiple insertions at
748 the previous level, trying to insert single item and
749 it so happened, that insertion of pointers to all new
750 nodes before this one already caused parent node to
751 split (may be several times).
753 I am going to come up with better solution.
755 You are not expected to understand this.
756 -- v6root/usr/sys/ken/slp.c
758 Basically, what happens here is the following: carry came
759 to the parent level and is about to insert internal item
760 pointing to the child node that it just inserted in the
761 level below. Position where internal item is to be inserted
762 was found by find_new_child_coord() above, but node of the
763 current carry operation (that is, parent node of child
764 inserted on the previous level), was determined earlier in
765 the lock_carry_level/lock_carry_node. It could so happen
766 that other carry operations already performed on the parent
767 level already split parent node, so that insertion point
768 moved into another node. Handle this by creating new carry
769 node for insertion point if necessary.
771 if (reiser4_carry_real(op->node) !=
772 op->u.insert.d->coord->node) {
773 pool_ordering direction;
774 znode *z1;
775 znode *z2;
776 reiser4_key k1;
777 reiser4_key k2;
780 * determine in what direction insertion point
781 * moved. Do this by comparing delimiting keys.
783 z1 = op->u.insert.d->coord->node;
784 z2 = reiser4_carry_real(op->node);
785 if (keyle(leftmost_key_in_node(z1, &k1),
786 leftmost_key_in_node(z2, &k2)))
787 /* insertion point moved to the left */
788 direction = POOLO_BEFORE;
789 else
790 /* insertion point moved to the right */
791 direction = POOLO_AFTER;
793 op->node = reiser4_add_carry_skip(doing,
794 direction, op->node);
795 if (IS_ERR(op->node))
796 return PTR_ERR(op->node);
797 op->node->node = op->u.insert.d->coord->node;
798 op->node->free = 1;
799 result = lock_carry_node(doing, op->node);
800 if (result != 0)
801 return result;
805 * set up key of an item being inserted: we are inserting
806 * internal item and its key is (by the very definition of
807 * search tree) is leftmost key in the child node.
809 write_lock_dk(znode_get_tree(child));
810 op->u.insert.d->key = leftmost_key_in_node(child,
811 znode_get_ld_key(child));
812 write_unlock_dk(znode_get_tree(child));
813 op->u.insert.d->data->arg = op->u.insert.brother;
814 } else {
815 assert("vs-243", op->u.insert.d->coord != NULL);
816 op->u.insert.d->coord->node = reiser4_carry_real(op->node);
819 /* find free space. */
820 return make_space(op, doing, todo);
823 /* handle carry COP_INSERT operation.
825 Insert new item into node. New item can be given in one of two ways:
827 - by passing &tree_coord and &reiser4_item_data as part of @op. This is
828 only applicable at the leaf/twig level.
830 - by passing a child node pointer to which is to be inserted by this
831 operation.
834 static int carry_insert(carry_op * op /* operation to perform */ ,
835 carry_level * doing /* queue of operations @op
836 * is part of */ ,
837 carry_level * todo /* queue where new operations
838 * are accumulated */ )
840 znode *node;
841 carry_insert_data cdata;
842 coord_t coord;
843 reiser4_item_data data;
844 carry_plugin_info info;
845 int result;
847 assert("nikita-1036", op != NULL);
848 assert("nikita-1037", todo != NULL);
849 assert("nikita-1038", op->op == COP_INSERT);
851 coord_init_zero(&coord);
853 /* perform common functionality of insert and paste. */
854 result = insert_paste_common(op, doing, todo, &cdata, &coord, &data);
855 if (result != 0)
856 return result;
858 node = op->u.insert.d->coord->node;
859 assert("nikita-1039", node != NULL);
860 assert("nikita-1040", node_plugin_by_node(node) != NULL);
862 assert("nikita-949",
863 space_needed_for_op(node, op) <= znode_free_space(node));
865 /* ask node layout to create new item. */
866 info.doing = doing;
867 info.todo = todo;
868 result = node_plugin_by_node(node)->create_item
869 (op->u.insert.d->coord, op->u.insert.d->key, op->u.insert.d->data,
870 &info);
871 doing->restartable = 0;
872 znode_make_dirty(node);
874 return result;
/*
 * Flow insertion code. COP_INSERT_FLOW is a special tree operation that is
 * supplied with a "flow" (that is, a stream of data) and inserts it into
 * the tree by slicing it into multiple items.
 */

/* shorthands for the fields of @op->u.insert_flow */
#define flow_insert_point(op) ((op)->u.insert_flow.insert_point)
#define flow_insert_flow(op) ((op)->u.insert_flow.flow)
#define flow_insert_data(op) ((op)->u.insert_flow.data)
887 static size_t item_data_overhead(carry_op * op)
889 if (flow_insert_data(op)->iplug->b.estimate == NULL)
890 return 0;
891 return (flow_insert_data(op)->iplug->b.
892 estimate(NULL /* estimate insertion */ , flow_insert_data(op)) -
893 flow_insert_data(op)->length);
896 /* FIXME-VS: this is called several times during one make_flow_for_insertion
897 and it will always return the same result. Some optimization could be made
898 by calculating this value once at the beginning and passing it around. That
899 would reduce some flexibility in future changes
901 static int can_paste(coord_t *, const reiser4_key *, const reiser4_item_data *);
902 static size_t flow_insertion_overhead(carry_op * op)
904 znode *node;
905 size_t insertion_overhead;
907 node = flow_insert_point(op)->node;
908 insertion_overhead = 0;
909 if (node->nplug->item_overhead &&
910 !can_paste(flow_insert_point(op), &flow_insert_flow(op)->key,
911 flow_insert_data(op)))
912 insertion_overhead =
913 node->nplug->item_overhead(node, NULL) +
914 item_data_overhead(op);
915 return insertion_overhead;
918 /* how many bytes of flow does fit to the node */
919 static int what_can_fit_into_node(carry_op * op)
921 size_t free, overhead;
923 overhead = flow_insertion_overhead(op);
924 free = znode_free_space(flow_insert_point(op)->node);
925 if (free <= overhead)
926 return 0;
927 free -= overhead;
928 /* FIXME: flow->length is loff_t only to not get overflowed in case of
929 expandign truncate */
930 if (free < op->u.insert_flow.flow->length)
931 return free;
932 return (int)op->u.insert_flow.flow->length;
935 /* in make_space_for_flow_insertion we need to check either whether whole flow
936 fits into a node or whether minimal fraction of flow fits into a node */
937 static int enough_space_for_whole_flow(carry_op * op)
939 return (unsigned)what_can_fit_into_node(op) ==
940 op->u.insert_flow.flow->length;
943 #define MIN_FLOW_FRACTION 1
944 static int enough_space_for_min_flow_fraction(carry_op * op)
946 assert("vs-902", coord_is_after_rightmost(flow_insert_point(op)));
948 return what_can_fit_into_node(op) >= MIN_FLOW_FRACTION;
951 /* this returns 0 if left neighbor was obtained successfully and everything
952 upto insertion point including it were shifted and left neighbor still has
953 some free space to put minimal fraction of flow into it */
954 static int
955 make_space_by_shift_left(carry_op * op, carry_level * doing, carry_level * todo)
957 carry_node *left;
958 znode *orig;
960 left = find_left_neighbor(op, doing);
961 if (unlikely(IS_ERR(left))) {
962 warning("vs-899",
963 "make_space_by_shift_left: "
964 "error accessing left neighbor: %li", PTR_ERR(left));
965 return 1;
967 if (left == NULL)
968 /* left neighbor either does not exist or is unformatted
969 node */
970 return 1;
972 orig = flow_insert_point(op)->node;
973 /* try to shift content of node @orig from its head upto insert point
974 including insertion point into the left neighbor */
975 carry_shift_data(LEFT_SIDE, flow_insert_point(op),
976 reiser4_carry_real(left), doing, todo,
977 1/* including insert point */);
978 if (reiser4_carry_real(left) != flow_insert_point(op)->node) {
979 /* insertion point did not move */
980 return 1;
983 /* insertion point is set after last item in the node */
984 assert("vs-900", coord_is_after_rightmost(flow_insert_point(op)));
986 if (!enough_space_for_min_flow_fraction(op)) {
987 /* insertion point node does not have enough free space to put
988 even minimal portion of flow into it, therefore, move
989 insertion point back to orig node (before first item) */
990 coord_init_before_first_item(flow_insert_point(op), orig);
991 return 1;
994 /* part of flow is to be written to the end of node */
995 op->node = left;
996 return 0;
999 /* this returns 0 if right neighbor was obtained successfully and everything to
1000 the right of insertion point was shifted to it and node got enough free
1001 space to put minimal fraction of flow into it */
1002 static int
1003 make_space_by_shift_right(carry_op * op, carry_level * doing,
1004 carry_level * todo)
1006 carry_node *right;
1008 right = find_right_neighbor(op, doing);
1009 if (unlikely(IS_ERR(right))) {
1010 warning("nikita-1065", "shift_right_excluding_insert_point: "
1011 "error accessing right neighbor: %li", PTR_ERR(right));
1012 return 1;
1014 if (right) {
1015 /* shift everything possible on the right of but excluding
1016 insertion coord into the right neighbor */
1017 carry_shift_data(RIGHT_SIDE, flow_insert_point(op),
1018 reiser4_carry_real(right), doing, todo,
1019 0/* not including insert point */);
1020 } else {
1021 /* right neighbor either does not exist or is unformatted
1022 node */
1025 if (coord_is_after_rightmost(flow_insert_point(op))) {
1026 if (enough_space_for_min_flow_fraction(op)) {
1027 /* part of flow is to be written to the end of node */
1028 return 0;
1032 /* new node is to be added if insert point node did not get enough
1033 space for whole flow */
1034 return 1;
1037 /* this returns 0 when insert coord is set at the node end and fraction of flow
1038 fits into that node */
1039 static int
1040 make_space_by_new_nodes(carry_op * op, carry_level * doing, carry_level * todo)
1042 int result;
1043 znode *node;
1044 carry_node *new;
1046 node = flow_insert_point(op)->node;
1048 if (op->u.insert_flow.new_nodes == CARRY_FLOW_NEW_NODES_LIMIT)
1049 return RETERR(-E_NODE_FULL);
1050 /* add new node after insert point node */
1051 new = add_new_znode(node, op->node, doing, todo);
1052 if (unlikely(IS_ERR(new)))
1053 return PTR_ERR(new);
1054 result = lock_carry_node(doing, new);
1055 zput(reiser4_carry_real(new));
1056 if (unlikely(result))
1057 return result;
1058 op->u.insert_flow.new_nodes++;
1059 if (!coord_is_after_rightmost(flow_insert_point(op))) {
1060 carry_shift_data(RIGHT_SIDE, flow_insert_point(op),
1061 reiser4_carry_real(new), doing, todo,
1062 0/* not including insert point */);
1063 assert("vs-901",
1064 coord_is_after_rightmost(flow_insert_point(op)));
1066 if (enough_space_for_min_flow_fraction(op))
1067 return 0;
1068 if (op->u.insert_flow.new_nodes == CARRY_FLOW_NEW_NODES_LIMIT)
1069 return RETERR(-E_NODE_FULL);
1071 /* add one more new node */
1072 new = add_new_znode(node, op->node, doing, todo);
1073 if (unlikely(IS_ERR(new)))
1074 return PTR_ERR(new);
1075 result = lock_carry_node(doing, new);
1076 zput(reiser4_carry_real(new));
1077 if (unlikely(result))
1078 return result;
1079 op->u.insert_flow.new_nodes++;
1082 /* move insertion point to new node */
1083 coord_init_before_first_item(flow_insert_point(op),
1084 reiser4_carry_real(new));
1085 op->node = new;
1086 return 0;
1089 static int
1090 make_space_for_flow_insertion(carry_op * op, carry_level * doing,
1091 carry_level * todo)
1093 __u32 flags = op->u.insert_flow.flags;
1095 if (enough_space_for_whole_flow(op)) {
1096 /* whole flow fits into insert point node */
1097 return 0;
1100 if (!(flags & COPI_DONT_SHIFT_LEFT)
1101 && (make_space_by_shift_left(op, doing, todo) == 0)) {
1102 /* insert point is shifted to left neighbor of original insert
1103 point node and is set after last unit in that node. It has
1104 enough space to fit at least minimal fraction of flow. */
1105 return 0;
1108 if (enough_space_for_whole_flow(op)) {
1109 /* whole flow fits into insert point node */
1110 return 0;
1113 if (!(flags & COPI_DONT_SHIFT_RIGHT)
1114 && (make_space_by_shift_right(op, doing, todo) == 0)) {
1115 /* insert point is still set to the same node, but there is
1116 nothing to the right of insert point. */
1117 return 0;
1120 if (enough_space_for_whole_flow(op)) {
1121 /* whole flow fits into insert point node */
1122 return 0;
1125 return make_space_by_new_nodes(op, doing, todo);
1128 /* implements COP_INSERT_FLOW operation */
1129 static int
1130 carry_insert_flow(carry_op * op, carry_level * doing, carry_level * todo)
1132 int result;
1133 flow_t *f;
1134 coord_t *insert_point;
1135 node_plugin *nplug;
1136 carry_plugin_info info;
1137 znode *orig_node;
1138 lock_handle *orig_lh;
1140 f = op->u.insert_flow.flow;
1141 result = 0;
1143 /* carry system needs this to work */
1144 info.doing = doing;
1145 info.todo = todo;
1147 orig_node = flow_insert_point(op)->node;
1148 orig_lh = doing->tracked;
1150 while (f->length) {
1151 result = make_space_for_flow_insertion(op, doing, todo);
1152 if (result)
1153 break;
1155 insert_point = flow_insert_point(op);
1156 nplug = node_plugin_by_node(insert_point->node);
1158 /* compose item data for insertion/pasting */
1159 flow_insert_data(op)->data = f->data;
1160 flow_insert_data(op)->length = what_can_fit_into_node(op);
1162 if (can_paste(insert_point, &f->key, flow_insert_data(op))) {
1163 /* insert point is set to item of file we are writing to
1164 and we have to append to it */
1165 assert("vs-903", insert_point->between == AFTER_UNIT);
1166 nplug->change_item_size(insert_point,
1167 flow_insert_data(op)->length);
1168 flow_insert_data(op)->iplug->b.paste(insert_point,
1169 flow_insert_data
1170 (op), &info);
1171 } else {
1172 /* new item must be inserted */
1173 pos_in_node_t new_pos;
1174 flow_insert_data(op)->length += item_data_overhead(op);
1176 /* FIXME-VS: this is because node40_create_item changes
1177 insert_point for obscure reasons */
1178 switch (insert_point->between) {
1179 case AFTER_ITEM:
1180 new_pos = insert_point->item_pos + 1;
1181 break;
1182 case EMPTY_NODE:
1183 new_pos = 0;
1184 break;
1185 case BEFORE_ITEM:
1186 assert("vs-905", insert_point->item_pos == 0);
1187 new_pos = 0;
1188 break;
1189 default:
1190 impossible("vs-906",
1191 "carry_insert_flow: invalid coord");
1192 new_pos = 0;
1193 break;
1196 nplug->create_item(insert_point, &f->key,
1197 flow_insert_data(op), &info);
1198 coord_set_item_pos(insert_point, new_pos);
1200 coord_init_after_item_end(insert_point);
1201 doing->restartable = 0;
1202 znode_make_dirty(insert_point->node);
1204 move_flow_forward(f, (unsigned)flow_insert_data(op)->length);
1207 if (orig_node != flow_insert_point(op)->node) {
1208 /* move lock to new insert point */
1209 done_lh(orig_lh);
1210 init_lh(orig_lh);
1211 result =
1212 longterm_lock_znode(orig_lh, flow_insert_point(op)->node,
1213 ZNODE_WRITE_LOCK, ZNODE_LOCK_HIPRI);
1216 return result;
1219 /* implements COP_DELETE operation
1221 Remove pointer to @op -> u.delete.child from it's parent.
1223 This function also handles killing of a tree root is last pointer from it
1224 was removed. This is complicated by our handling of "twig" level: root on
1225 twig level is never killed.
1228 static int carry_delete(carry_op * op /* operation to be performed */ ,
1229 carry_level * doing UNUSED_ARG /* current carry
1230 * level */ ,
1231 carry_level * todo/* next carry level */)
1233 int result;
1234 coord_t coord;
1235 coord_t coord2;
1236 znode *parent;
1237 znode *child;
1238 carry_plugin_info info;
1239 reiser4_tree *tree;
1242 * This operation is called to delete internal item pointing to the
1243 * child node that was removed by carry from the tree on the previous
1244 * tree level.
1247 assert("nikita-893", op != NULL);
1248 assert("nikita-894", todo != NULL);
1249 assert("nikita-895", op->op == COP_DELETE);
1251 coord_init_zero(&coord);
1252 coord_init_zero(&coord2);
1254 parent = reiser4_carry_real(op->node);
1255 child = op->u.delete.child ?
1256 reiser4_carry_real(op->u.delete.child) : op->node->node;
1257 tree = znode_get_tree(child);
1258 read_lock_tree(tree);
1261 * @parent was determined when carry entered parent level
1262 * (lock_carry_level/lock_carry_node). Since then, actual parent of
1263 * @child node could change due to other carry operations performed on
1264 * the parent level. Check for this.
1267 if (znode_parent(child) != parent) {
1268 /* NOTE-NIKITA add stat counter for this. */
1269 parent = znode_parent(child);
1270 assert("nikita-2581", find_carry_node(doing, parent));
1272 read_unlock_tree(tree);
1274 assert("nikita-1213", znode_get_level(parent) > LEAF_LEVEL);
1276 /* Twig level horrors: tree should be of height at least 2. So, last
1277 pointer from the root at twig level is preserved even if child is
1278 empty. This is ugly, but so it was architectured.
1281 if (znode_is_root(parent) &&
1282 znode_get_level(parent) <= REISER4_MIN_TREE_HEIGHT &&
1283 node_num_items(parent) == 1) {
1284 /* Delimiting key manipulations. */
1285 write_lock_dk(tree);
1286 znode_set_ld_key(child, znode_set_ld_key(parent, reiser4_min_key()));
1287 znode_set_rd_key(child, znode_set_rd_key(parent, reiser4_max_key()));
1288 ZF_SET(child, JNODE_DKSET);
1289 write_unlock_dk(tree);
1291 /* @child escaped imminent death! */
1292 ZF_CLR(child, JNODE_HEARD_BANSHEE);
1293 return 0;
1296 /* convert child pointer to the coord_t */
1297 result = find_child_ptr(parent, child, &coord);
1298 if (result != NS_FOUND) {
1299 warning("nikita-994", "Cannot find child pointer: %i", result);
1300 print_coord_content("coord", &coord);
1301 return result;
1304 coord_dup(&coord2, &coord);
1305 info.doing = doing;
1306 info.todo = todo;
1309 * Actually kill internal item: prepare structure with
1310 * arguments for ->cut_and_kill() method...
1313 struct carry_kill_data kdata;
1314 kdata.params.from = &coord;
1315 kdata.params.to = &coord2;
1316 kdata.params.from_key = NULL;
1317 kdata.params.to_key = NULL;
1318 kdata.params.smallest_removed = NULL;
1319 kdata.params.truncate = 1;
1320 kdata.flags = op->u.delete.flags;
1321 kdata.inode = NULL;
1322 kdata.left = NULL;
1323 kdata.right = NULL;
1324 kdata.buf = NULL;
1325 /* ... and call it. */
1326 result = node_plugin_by_node(parent)->cut_and_kill(&kdata,
1327 &info);
1329 doing->restartable = 0;
1331 /* check whether root should be killed violently */
1332 if (znode_is_root(parent) &&
1333 /* don't kill roots at and lower than twig level */
1334 znode_get_level(parent) > REISER4_MIN_TREE_HEIGHT &&
1335 node_num_items(parent) == 1)
1336 result = reiser4_kill_tree_root(coord.node);
1338 return result < 0 ? : 0;
1341 /* implements COP_CUT opration
1343 Cuts part or whole content of node.
1346 static int carry_cut(carry_op * op /* operation to be performed */ ,
1347 carry_level * doing /* current carry level */ ,
1348 carry_level * todo/* next carry level */)
1350 int result;
1351 carry_plugin_info info;
1352 node_plugin *nplug;
1354 assert("nikita-896", op != NULL);
1355 assert("nikita-897", todo != NULL);
1356 assert("nikita-898", op->op == COP_CUT);
1358 info.doing = doing;
1359 info.todo = todo;
1361 nplug = node_plugin_by_node(reiser4_carry_real(op->node));
1362 if (op->u.cut_or_kill.is_cut)
1363 result = nplug->cut(op->u.cut_or_kill.u.cut, &info);
1364 else
1365 result = nplug->cut_and_kill(op->u.cut_or_kill.u.kill, &info);
1367 doing->restartable = 0;
1368 return result < 0 ? : 0;
1371 /* helper function for carry_paste(): returns true if @op can be continued as
1372 paste */
1373 static int
1374 can_paste(coord_t *icoord, const reiser4_key * key,
1375 const reiser4_item_data * data)
1377 coord_t circa;
1378 item_plugin *new_iplug;
1379 item_plugin *old_iplug;
1380 int result = 0; /* to keep gcc shut */
1382 assert("", icoord->between != AT_UNIT);
1384 /* obviously, one cannot paste when node is empty---there is nothing
1385 to paste into. */
1386 if (node_is_empty(icoord->node))
1387 return 0;
1388 /* if insertion point is at the middle of the item, then paste */
1389 if (!coord_is_between_items(icoord))
1390 return 1;
1391 coord_dup(&circa, icoord);
1392 circa.between = AT_UNIT;
1394 old_iplug = item_plugin_by_coord(&circa);
1395 new_iplug = data->iplug;
1397 /* check whether we can paste to the item @icoord is "at" when we
1398 ignore ->between field */
1399 if (old_iplug == new_iplug && item_can_contain_key(&circa, key, data))
1400 result = 1;
1401 else if (icoord->between == BEFORE_UNIT
1402 || icoord->between == BEFORE_ITEM) {
1403 /* otherwise, try to glue to the item at the left, if any */
1404 coord_dup(&circa, icoord);
1405 if (coord_set_to_left(&circa)) {
1406 result = 0;
1407 coord_init_before_item(icoord);
1408 } else {
1409 old_iplug = item_plugin_by_coord(&circa);
1410 result = (old_iplug == new_iplug)
1411 && item_can_contain_key(icoord, key, data);
1412 if (result) {
1413 coord_dup(icoord, &circa);
1414 icoord->between = AFTER_UNIT;
1417 } else if (icoord->between == AFTER_UNIT
1418 || icoord->between == AFTER_ITEM) {
1419 coord_dup(&circa, icoord);
1420 /* otherwise, try to glue to the item at the right, if any */
1421 if (coord_set_to_right(&circa)) {
1422 result = 0;
1423 coord_init_after_item(icoord);
1424 } else {
1425 int (*cck) (const coord_t *, const reiser4_key *,
1426 const reiser4_item_data *);
1428 old_iplug = item_plugin_by_coord(&circa);
1430 cck = old_iplug->b.can_contain_key;
1431 if (cck == NULL)
1432 /* item doesn't define ->can_contain_key
1433 method? So it is not expandable. */
1434 result = 0;
1435 else {
1436 result = (old_iplug == new_iplug)
1437 && cck(&circa /*icoord */ , key, data);
1438 if (result) {
1439 coord_dup(icoord, &circa);
1440 icoord->between = BEFORE_UNIT;
1444 } else
1445 impossible("nikita-2513", "Nothing works");
1446 if (result) {
1447 if (icoord->between == BEFORE_ITEM) {
1448 assert("vs-912", icoord->unit_pos == 0);
1449 icoord->between = BEFORE_UNIT;
1450 } else if (icoord->between == AFTER_ITEM) {
1451 coord_init_after_item_end(icoord);
1454 return result;
1457 /* implements COP_PASTE operation
1459 Paste data into existing item. This is complicated by the fact that after
1460 we shifted something to the left or right neighbors trying to free some
1461 space, item we were supposed to paste into can be in different node than
1462 insertion coord. If so, we are no longer doing paste, but insert. See
1463 comments in insert_paste_common().
1466 static int carry_paste(carry_op * op /* operation to be performed */ ,
1467 carry_level * doing UNUSED_ARG /* current carry
1468 * level */ ,
1469 carry_level * todo/* next carry level */)
1471 znode *node;
1472 carry_insert_data cdata;
1473 coord_t dcoord;
1474 reiser4_item_data data;
1475 int result;
1476 int real_size;
1477 item_plugin *iplug;
1478 carry_plugin_info info;
1479 coord_t *coord;
1481 assert("nikita-982", op != NULL);
1482 assert("nikita-983", todo != NULL);
1483 assert("nikita-984", op->op == COP_PASTE);
1485 coord_init_zero(&dcoord);
1487 result = insert_paste_common(op, doing, todo, &cdata, &dcoord, &data);
1488 if (result != 0)
1489 return result;
1491 coord = op->u.insert.d->coord;
1493 /* handle case when op -> u.insert.coord doesn't point to the item
1494 of required type. restart as insert. */
1495 if (!can_paste(coord, op->u.insert.d->key, op->u.insert.d->data)) {
1496 op->op = COP_INSERT;
1497 op->u.insert.type = COPT_PASTE_RESTARTED;
1498 result = op_dispatch_table[COP_INSERT].handler(op, doing, todo);
1500 return result;
1503 node = coord->node;
1504 iplug = item_plugin_by_coord(coord);
1505 assert("nikita-992", iplug != NULL);
1507 assert("nikita-985", node != NULL);
1508 assert("nikita-986", node_plugin_by_node(node) != NULL);
1510 assert("nikita-987",
1511 space_needed_for_op(node, op) <= znode_free_space(node));
1513 assert("nikita-1286", coord_is_existing_item(coord));
1516 * if item is expanded as a result of this operation, we should first
1517 * change item size, than call ->b.paste item method. If item is
1518 * shrunk, it should be done other way around: first call ->b.paste
1519 * method, then reduce item size.
1522 real_size = space_needed_for_op(node, op);
1523 if (real_size > 0)
1524 node->nplug->change_item_size(coord, real_size);
1526 doing->restartable = 0;
1527 info.doing = doing;
1528 info.todo = todo;
1530 result = iplug->b.paste(coord, op->u.insert.d->data, &info);
1532 if (real_size < 0)
1533 node->nplug->change_item_size(coord, real_size);
1535 /* if we pasted at the beginning of the item, update item's key. */
1536 if (coord->unit_pos == 0 && coord->between != AFTER_UNIT)
1537 node->nplug->update_item_key(coord, op->u.insert.d->key, &info);
1539 znode_make_dirty(node);
1540 return result;
1543 /* handle carry COP_EXTENT operation. */
1544 static int carry_extent(carry_op * op /* operation to perform */ ,
1545 carry_level * doing /* queue of operations @op
1546 * is part of */ ,
1547 carry_level * todo /* queue where new operations
1548 * are accumulated */ )
1550 znode *node;
1551 carry_insert_data cdata;
1552 coord_t coord;
1553 reiser4_item_data data;
1554 carry_op *delete_dummy;
1555 carry_op *insert_extent;
1556 int result;
1557 carry_plugin_info info;
1559 assert("nikita-1751", op != NULL);
1560 assert("nikita-1752", todo != NULL);
1561 assert("nikita-1753", op->op == COP_EXTENT);
1563 /* extent insertion overview:
1565 extents live on the TWIG LEVEL, which is level one above the leaf
1566 one. This complicates extent insertion logic somewhat: it may
1567 happen (and going to happen all the time) that in logical key
1568 ordering extent has to be placed between items I1 and I2, located
1569 at the leaf level, but I1 and I2 are in the same formatted leaf
1570 node N1. To insert extent one has to
1572 (1) reach node N1 and shift data between N1, its neighbors and
1573 possibly newly allocated nodes until I1 and I2 fall into different
1574 nodes. Since I1 and I2 are still neighboring items in logical key
1575 order, they will be necessary utmost items in their respective
1576 nodes.
1578 (2) After this new extent item is inserted into node on the twig
1579 level.
1581 Fortunately this process can reuse almost all code from standard
1582 insertion procedure (viz. make_space() and insert_paste_common()),
1583 due to the following observation: make_space() only shifts data up
1584 to and excluding or including insertion point. It never
1585 "over-moves" through insertion point. Thus, one can use
1586 make_space() to perform step (1). All required for this is just to
1587 instruct free_space_shortage() to keep make_space() shifting data
1588 until insertion point is at the node border.
1592 /* perform common functionality of insert and paste. */
1593 result = insert_paste_common(op, doing, todo, &cdata, &coord, &data);
1594 if (result != 0)
1595 return result;
1597 node = op->u.extent.d->coord->node;
1598 assert("nikita-1754", node != NULL);
1599 assert("nikita-1755", node_plugin_by_node(node) != NULL);
1600 assert("nikita-1700", coord_wrt(op->u.extent.d->coord) != COORD_INSIDE);
1602 /* NOTE-NIKITA add some checks here. Not assertions, -EIO. Check that
1603 extent fits between items. */
1605 info.doing = doing;
1606 info.todo = todo;
1608 /* there is another complication due to placement of extents on the
1609 twig level: extents are "rigid" in the sense that key-range
1610 occupied by extent cannot grow indefinitely to the right as it is
1611 for the formatted leaf nodes. Because of this when search finds two
1612 adjacent extents on the twig level, it has to "drill" to the leaf
1613 level, creating new node. Here we are removing this node.
1615 if (node_is_empty(node)) {
1616 delete_dummy = node_post_carry(&info, COP_DELETE, node, 1);
1617 if (IS_ERR(delete_dummy))
1618 return PTR_ERR(delete_dummy);
1619 delete_dummy->u.delete.child = NULL;
1620 delete_dummy->u.delete.flags = DELETE_RETAIN_EMPTY;
1621 ZF_SET(node, JNODE_HEARD_BANSHEE);
1624 /* proceed with inserting extent item into parent. We are definitely
1625 inserting rather than pasting if we get that far. */
1626 insert_extent = node_post_carry(&info, COP_INSERT, node, 1);
1627 if (IS_ERR(insert_extent))
1628 /* @delete_dummy will be automatically destroyed on the level
1629 exiting */
1630 return PTR_ERR(insert_extent);
1631 /* NOTE-NIKITA insertion by key is simplest option here. Another
1632 possibility is to insert on the left or right of already existing
1633 item.
1635 insert_extent->u.insert.type = COPT_KEY;
1636 insert_extent->u.insert.d = op->u.extent.d;
1637 assert("nikita-1719", op->u.extent.d->key != NULL);
1638 insert_extent->u.insert.d->data->arg = op->u.extent.d->coord;
1639 insert_extent->u.insert.flags =
1640 znode_get_tree(node)->carry.new_extent_flags;
1643 * if carry was asked to track lock handle we should actually track
1644 * lock handle on the twig node rather than on the leaf where
1645 * operation was started from. Transfer tracked lock handle.
1647 if (doing->track_type) {
1648 assert("nikita-3242", doing->tracked != NULL);
1649 assert("nikita-3244", todo->tracked == NULL);
1650 todo->tracked = doing->tracked;
1651 todo->track_type = CARRY_TRACK_NODE;
1652 doing->tracked = NULL;
1653 doing->track_type = 0;
1656 return 0;
1659 /* update key in @parent between pointers to @left and @right.
1661 Find coords of @left and @right and update delimiting key between them.
1662 This is helper function called by carry_update(). Finds position of
1663 internal item involved. Updates item key. Updates delimiting keys of child
1664 nodes involved.
1666 static int update_delimiting_key(znode * parent /* node key is updated
1667 * in */ ,
1668 znode * left /* child of @parent */ ,
1669 znode * right /* child of @parent */ ,
1670 carry_level * doing /* current carry
1671 * level */ ,
1672 carry_level * todo /* parent carry
1673 * level */ ,
1674 const char **error_msg /* place to
1675 * store error
1676 * message */ )
1678 coord_t left_pos;
1679 coord_t right_pos;
1680 int result;
1681 reiser4_key ldkey;
1682 carry_plugin_info info;
1684 assert("nikita-1177", right != NULL);
1685 /* find position of right left child in a parent */
1686 result = find_child_ptr(parent, right, &right_pos);
1687 if (result != NS_FOUND) {
1688 *error_msg = "Cannot find position of right child";
1689 return result;
1692 if ((left != NULL) && !coord_is_leftmost_unit(&right_pos)) {
1693 /* find position of the left child in a parent */
1694 result = find_child_ptr(parent, left, &left_pos);
1695 if (result != NS_FOUND) {
1696 *error_msg = "Cannot find position of left child";
1697 return result;
1699 assert("nikita-1355", left_pos.node != NULL);
1700 } else
1701 left_pos.node = NULL;
1703 /* check that they are separated by exactly one key and are basically
1704 sane */
1705 if (REISER4_DEBUG) {
1706 if ((left_pos.node != NULL)
1707 && !coord_is_existing_unit(&left_pos)) {
1708 *error_msg = "Left child is bastard";
1709 return RETERR(-EIO);
1711 if (!coord_is_existing_unit(&right_pos)) {
1712 *error_msg = "Right child is bastard";
1713 return RETERR(-EIO);
1715 if (left_pos.node != NULL &&
1716 !coord_are_neighbors(&left_pos, &right_pos)) {
1717 *error_msg = "Children are not direct siblings";
1718 return RETERR(-EIO);
1721 *error_msg = NULL;
1723 info.doing = doing;
1724 info.todo = todo;
1727 * If child node is not empty, new key of internal item is a key of
1728 * leftmost item in the child node. If the child is empty, take its
1729 * right delimiting key as a new key of the internal item. Precise key
1730 * in the latter case is not important per se, because the child (and
1731 * the internal item) are going to be killed shortly anyway, but we
1732 * have to preserve correct order of keys in the parent node.
1735 if (!ZF_ISSET(right, JNODE_HEARD_BANSHEE))
1736 leftmost_key_in_node(right, &ldkey);
1737 else {
1738 read_lock_dk(znode_get_tree(parent));
1739 ldkey = *znode_get_rd_key(right);
1740 read_unlock_dk(znode_get_tree(parent));
1742 node_plugin_by_node(parent)->update_item_key(&right_pos, &ldkey, &info);
1743 doing->restartable = 0;
1744 znode_make_dirty(parent);
1745 return 0;
1748 /* implements COP_UPDATE opration
1750 Update delimiting keys.
1753 static int carry_update(carry_op * op /* operation to be performed */ ,
1754 carry_level * doing /* current carry level */ ,
1755 carry_level * todo/* next carry level */)
1757 int result;
1758 carry_node *missing UNUSED_ARG;
1759 znode *left;
1760 znode *right;
1761 carry_node *lchild;
1762 carry_node *rchild;
1763 const char *error_msg;
1764 reiser4_tree *tree;
1767 * This operation is called to update key of internal item. This is
1768 * necessary when carry shifted of cut data on the child
1769 * level. Arguments of this operation are:
1771 * @right --- child node. Operation should update key of internal
1772 * item pointing to @right.
1774 * @left --- left neighbor of @right. This parameter is optional.
1777 assert("nikita-902", op != NULL);
1778 assert("nikita-903", todo != NULL);
1779 assert("nikita-904", op->op == COP_UPDATE);
1781 lchild = op->u.update.left;
1782 rchild = op->node;
1784 if (lchild != NULL) {
1785 assert("nikita-1001", lchild->parent);
1786 assert("nikita-1003", !lchild->left);
1787 left = reiser4_carry_real(lchild);
1788 } else
1789 left = NULL;
1791 tree = znode_get_tree(rchild->node);
1792 read_lock_tree(tree);
1793 right = znode_parent(rchild->node);
1794 read_unlock_tree(tree);
1796 if (right != NULL) {
1797 result = update_delimiting_key(right,
1798 lchild ? lchild->node : NULL,
1799 rchild->node,
1800 doing, todo, &error_msg);
1801 } else {
1802 error_msg = "Cannot find node to update key in";
1803 result = RETERR(-EIO);
1805 /* operation will be reposted to the next level by the
1806 ->update_item_key() method of node plugin, if necessary. */
1808 if (result != 0) {
1809 warning("nikita-999", "Error updating delimiting key: %s (%i)",
1810 error_msg ? : "", result);
1812 return result;
1815 /* move items from @node during carry */
1816 static int carry_shift_data(sideof side /* in what direction to move data */ ,
1817 coord_t *insert_coord /* coord where new item
1818 * is to be inserted */,
1819 znode * node /* node which data are moved from */ ,
1820 carry_level * doing /* active carry queue */ ,
1821 carry_level * todo /* carry queue where new
1822 * operations are to be put
1823 * in */ ,
1824 unsigned int including_insert_coord_p
1825 /* true if @insertion_coord can be moved */ )
1827 int result;
1828 znode *source;
1829 carry_plugin_info info;
1830 node_plugin *nplug;
1832 source = insert_coord->node;
1834 info.doing = doing;
1835 info.todo = todo;
1837 nplug = node_plugin_by_node(node);
1838 result = nplug->shift(insert_coord, node,
1839 (side == LEFT_SIDE) ? SHIFT_LEFT : SHIFT_RIGHT, 0,
1840 (int)including_insert_coord_p, &info);
1841 /* the only error ->shift() method of node plugin can return is
1842 -ENOMEM due to carry node/operation allocation. */
1843 assert("nikita-915", result >= 0 || result == -ENOMEM);
1844 if (result > 0) {
1846 * if some number of bytes was actually shifted, mark nodes
1847 * dirty, and carry level as non-restartable.
1849 doing->restartable = 0;
1850 znode_make_dirty(source);
1851 znode_make_dirty(node);
1854 assert("nikita-2077", coord_check(insert_coord));
1855 return 0;
1858 typedef carry_node *(*carry_iterator) (carry_node * node);
1859 static carry_node *find_dir_carry(carry_node * node, carry_level * level,
1860 carry_iterator iterator);
1862 static carry_node *pool_level_list_prev(carry_node *node)
1864 return list_entry(node->header.level_linkage.prev, carry_node, header.level_linkage);
1867 /* look for the left neighbor of given carry node in a carry queue.
1869 This is used by find_left_neighbor(), but I am not sure that this
1870 really gives any advantage. More statistics required.
1873 carry_node *find_left_carry(carry_node * node /* node to find left neighbor
1874 * of */ ,
1875 carry_level * level/* level to scan */)
1877 return find_dir_carry(node, level,
1878 (carry_iterator) pool_level_list_prev);
1881 static carry_node *pool_level_list_next(carry_node *node)
1883 return list_entry(node->header.level_linkage.next, carry_node, header.level_linkage);
1886 /* look for the right neighbor of given carry node in a
1887 carry queue.
1889 This is used by find_right_neighbor(), but I am not sure that this
1890 really gives any advantage. More statistics required.
1893 carry_node *find_right_carry(carry_node * node /* node to find right neighbor
1894 * of */ ,
1895 carry_level * level/* level to scan */)
1897 return find_dir_carry(node, level,
1898 (carry_iterator) pool_level_list_next);
1901 /* look for the left or right neighbor of given carry node in a carry
1902 queue.
1904 Helper function used by find_{left|right}_carry().
1906 static carry_node *find_dir_carry(carry_node * node /* node to start
1907 * scanning from */ ,
1908 carry_level * level /* level to scan */ ,
1909 carry_iterator iterator /* operation to
1910 * move to the
1911 * next node */)
1913 carry_node *neighbor;
1915 assert("nikita-1059", node != NULL);
1916 assert("nikita-1060", level != NULL);
1918 /* scan list of carry nodes on this list dir-ward, skipping all
1919 carry nodes referencing the same znode. */
1920 neighbor = node;
1921 while (1) {
1922 neighbor = iterator(neighbor);
1923 if (carry_node_end(level, neighbor))
1924 /* list head is reached */
1925 return NULL;
1926 if (reiser4_carry_real(neighbor) != reiser4_carry_real(node))
1927 return neighbor;
1932 * Memory reservation estimation.
1934 * Carry process proceeds through tree levels upwards. Carry assumes that it
1935 * takes tree in consistent state (e.g., that search tree invariants hold),
1936 * and leaves tree consistent after it finishes. This means that when some
1937 * error occurs carry cannot simply return if there are pending carry
1938 * operations. Generic solution for this problem is carry-undo either as
1939 * transaction manager feature (requiring checkpoints and isolation), or
1940 * through some carry specific mechanism.
1942 * Our current approach is to panic if carry hits an error while tree is
1943 * inconsistent. Unfortunately -ENOMEM can easily be triggered. To work around
1944 * this "memory reservation" mechanism was added.
1946 * Memory reservation is implemented by perthread-pages.diff patch from
1947 * core-patches. Its API is defined in <linux/gfp.h>
1949 * int perthread_pages_reserve(int nrpages, gfp_t gfp);
1950 * void perthread_pages_release(int nrpages);
1951 * int perthread_pages_count(void);
1953 * carry estimates its worst case memory requirements at the entry, reserves
1954 * enough memory, and releases unused pages before returning.
1956 * Code below estimates worst case memory requirements for a given carry
1957 * queue. This is done by summing worst case memory requirements for each
1958 * operation in the queue.
1963 * Memory requirements of many operations depend on the tree
1964 * height. For example, item insertion requires new node to be inserted at
1965 * each tree level in the worst case. What tree height should be used for
1966 * estimation? Current tree height is wrong, because tree height can change
1967 * between the time when estimation was done and the time when operation is
1968 * actually performed. Maximal possible tree height (REISER4_MAX_ZTREE_HEIGHT)
1969 * is also not desirable, because it would lead to the huge over-estimation
1970 * all the time. Plausible solution is "capped tree height": if current tree
1971 * height is less than some TREE_HEIGHT_CAP constant, capped tree height is
1972 * TREE_HEIGHT_CAP, otherwise it's current tree height. Idea behind this is
1973 * that if tree height is TREE_HEIGHT_CAP or larger, it's extremely unlikely
1974 * to be increased even more during short interval of time.
1976 #define TREE_HEIGHT_CAP (5)
1978 /* return capped tree height for the @tree. See comment above. */
1979 static int cap_tree_height(reiser4_tree * tree)
1981 return max_t(int, tree->height, TREE_HEIGHT_CAP);
1984 /* return capped tree height for the current tree. */
1985 static int capped_height(void)
1987 return cap_tree_height(current_tree);
1990 /* return number of pages required to store given number of bytes */
1991 static int bytes_to_pages(int bytes)
1993 return (bytes + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
1996 /* how many pages are required to allocate znodes during item insertion. */
1997 static int carry_estimate_znodes(void)
2000 * Note, that there we have some problem here: there is no way to
2001 * reserve pages specifically for the given slab. This means that
2002 * these pages can be hijacked for some other end.
2005 /* in the worst case we need 3 new znode on each tree level */
2006 return bytes_to_pages(capped_height() * sizeof(znode) * 3);
2010 * how many pages are required to load bitmaps. One bitmap per level.
2012 static int carry_estimate_bitmaps(void)
2014 if (reiser4_is_set(reiser4_get_current_sb(), REISER4_DONT_LOAD_BITMAP)) {
2015 int bytes;
2017 bytes = capped_height() * (0 + /* bnode should be added, but
2018 * its is private to bitmap.c,
2019 * skip for now. */
2020 2 * sizeof(jnode));
2021 /* working and commit jnodes */
2022 return bytes_to_pages(bytes) + 2; /* and their contents */
2023 } else
2024 /* bitmaps were pre-loaded during mount */
2025 return 0;
2028 /* worst case item insertion memory requirements */
2029 static int carry_estimate_insert(carry_op * op, carry_level * level)
2031 return carry_estimate_bitmaps() + carry_estimate_znodes() + 1 +
2032 /* new atom */
2033 capped_height() + /* new block on each level */
2034 1 + /* and possibly extra new block at the leaf level */
2035 3; /* loading of leaves into memory */
2038 /* worst case item deletion memory requirements */
2039 static int carry_estimate_delete(carry_op * op, carry_level * level)
2041 return carry_estimate_bitmaps() + carry_estimate_znodes() + 1 +
2042 /* new atom */
2043 3; /* loading of leaves into memory */
2046 /* worst case tree cut memory requirements */
2047 static int carry_estimate_cut(carry_op * op, carry_level * level)
2049 return carry_estimate_bitmaps() + carry_estimate_znodes() + 1 +
2050 /* new atom */
2051 3; /* loading of leaves into memory */
2054 /* worst case memory requirements of pasting into item */
2055 static int carry_estimate_paste(carry_op * op, carry_level * level)
2057 return carry_estimate_bitmaps() + carry_estimate_znodes() + 1 +
2058 /* new atom */
2059 capped_height() + /* new block on each level */
2060 1 + /* and possibly extra new block at the leaf level */
2061 3; /* loading of leaves into memory */
2064 /* worst case memory requirements of extent insertion */
2065 static int carry_estimate_extent(carry_op * op, carry_level * level)
2067 return carry_estimate_insert(op, level) + /* insert extent */
2068 carry_estimate_delete(op, level); /* kill leaf */
2071 /* worst case memory requirements of key update */
2072 static int carry_estimate_update(carry_op * op, carry_level * level)
2074 return 0;
2077 /* worst case memory requirements of flow insertion */
2078 static int carry_estimate_insert_flow(carry_op * op, carry_level * level)
2080 int newnodes;
2082 newnodes = min(bytes_to_pages(op->u.insert_flow.flow->length),
2083 CARRY_FLOW_NEW_NODES_LIMIT);
2085 * roughly estimate insert_flow as a sequence of insertions.
2087 return newnodes * carry_estimate_insert(op, level);
2090 /* This is dispatch table for carry operations. It can be trivially
2091 abstracted into useful plugin: tunable balancing policy is a good
2092 thing. */
2093 carry_op_handler op_dispatch_table[COP_LAST_OP] = {
2094 [COP_INSERT] = {
2095 .handler = carry_insert,
2096 .estimate = carry_estimate_insert}
2098 [COP_DELETE] = {
2099 .handler = carry_delete,
2100 .estimate = carry_estimate_delete}
2102 [COP_CUT] = {
2103 .handler = carry_cut,
2104 .estimate = carry_estimate_cut}
2106 [COP_PASTE] = {
2107 .handler = carry_paste,
2108 .estimate = carry_estimate_paste}
2110 [COP_EXTENT] = {
2111 .handler = carry_extent,
2112 .estimate = carry_estimate_extent}
2114 [COP_UPDATE] = {
2115 .handler = carry_update,
2116 .estimate = carry_estimate_update}
2118 [COP_INSERT_FLOW] = {
2119 .handler = carry_insert_flow,
2120 .estimate = carry_estimate_insert_flow}
2123 /* Make Linus happy.
2124 Local variables:
2125 c-indentation-style: "K&R"
2126 mode-name: "LC"
2127 c-basic-offset: 8
2128 tab-width: 8
2129 fill-column: 120
2130 scroll-step: 1
2131 End: