revert-mm-fix-blkdev-size-calculation-in-generic_write_checks
[linux-2.6/linux-trees-mm.git] / fs / reiser4 / carry_ops.c
blob8ce8e9566e0ad785369d835a9b2abf44717c8bb6
1 /* Copyright 2001, 2002, 2003 by Hans Reiser, licensing governed by reiser4/README */
3 /* implementation of carry operations */
5 #include "forward.h"
6 #include "debug.h"
7 #include "key.h"
8 #include "coord.h"
9 #include "plugin/item/item.h"
10 #include "plugin/node/node.h"
11 #include "jnode.h"
12 #include "znode.h"
13 #include "block_alloc.h"
14 #include "tree_walk.h"
15 #include "pool.h"
16 #include "tree_mod.h"
17 #include "carry.h"
18 #include "carry_ops.h"
19 #include "tree.h"
20 #include "super.h"
21 #include "reiser4.h"
23 #include <linux/types.h>
24 #include <linux/err.h>
26 static int carry_shift_data(sideof side, coord_t * insert_coord, znode * node,
27 carry_level * doing, carry_level * todo,
28 unsigned int including_insert_coord_p);
30 extern int lock_carry_node(carry_level * level, carry_node * node);
31 extern int lock_carry_node_tail(carry_node * node);
33 /* find left neighbor of a carry node
35 Look for left neighbor of @node and add it to the @doing queue. See
36 comments in the body.
39 static carry_node *find_left_neighbor(carry_op * op /* node to find left
40 * neighbor of */ ,
41 carry_level * doing /* level to scan */ )
43 int result;
44 carry_node *node;
45 carry_node *left;
46 int flags;
47 reiser4_tree *tree;
49 node = op->node;
51 tree = current_tree;
52 read_lock_tree(tree);
53 /* first, check whether left neighbor is already in a @doing queue */
54 if (reiser4_carry_real(node)->left != NULL) {
55 /* NOTE: there is locking subtlety here. Look into
56 * find_right_neighbor() for more info */
57 if (find_carry_node(doing,
58 reiser4_carry_real(node)->left) != NULL) {
59 read_unlock_tree(tree);
60 left = node;
61 do {
62 left = list_entry(left->header.level_linkage.prev,
63 carry_node, header.level_linkage);
64 assert("nikita-3408", !carry_node_end(doing,
65 left));
66 } while (reiser4_carry_real(left) ==
67 reiser4_carry_real(node));
68 return left;
71 read_unlock_tree(tree);
73 left = reiser4_add_carry_skip(doing, POOLO_BEFORE, node);
74 if (IS_ERR(left))
75 return left;
77 left->node = node->node;
78 left->free = 1;
80 flags = GN_TRY_LOCK;
81 if (!op->u.insert.flags & COPI_LOAD_LEFT)
82 flags |= GN_NO_ALLOC;
84 /* then, feeling lucky, peek left neighbor in the cache. */
85 result = reiser4_get_left_neighbor(&left->lock_handle,
86 reiser4_carry_real(node),
87 ZNODE_WRITE_LOCK, flags);
88 if (result == 0) {
89 /* ok, node found and locked. */
90 result = lock_carry_node_tail(left);
91 if (result != 0)
92 left = ERR_PTR(result);
93 } else if (result == -E_NO_NEIGHBOR || result == -ENOENT) {
94 /* node is leftmost node in a tree, or neighbor wasn't in
95 cache, or there is an extent on the left. */
96 reiser4_pool_free(&doing->pool->node_pool, &left->header);
97 left = NULL;
98 } else if (doing->restartable) {
99 /* if left neighbor is locked, and level is restartable, add
100 new node to @doing and restart. */
101 assert("nikita-913", node->parent != 0);
102 assert("nikita-914", node->node != NULL);
103 left->left = 1;
104 left->free = 0;
105 left = ERR_PTR(-E_REPEAT);
106 } else {
107 /* left neighbor is locked, level cannot be restarted. Just
108 ignore left neighbor. */
109 reiser4_pool_free(&doing->pool->node_pool, &left->header);
110 left = NULL;
112 return left;
115 /* find right neighbor of a carry node
117 Look for right neighbor of @node and add it to the @doing queue. See
118 comments in the body.
121 static carry_node *find_right_neighbor(carry_op * op /* node to find right
122 * neighbor of */ ,
123 carry_level * doing /* level to scan */ )
125 int result;
126 carry_node *node;
127 carry_node *right;
128 lock_handle lh;
129 int flags;
130 reiser4_tree *tree;
132 init_lh(&lh);
134 node = op->node;
136 tree = current_tree;
137 read_lock_tree(tree);
138 /* first, check whether right neighbor is already in a @doing queue */
139 if (reiser4_carry_real(node)->right != NULL) {
141 * Tree lock is taken here anyway, because, even if _outcome_
142 * of (find_carry_node() != NULL) doesn't depends on
143 * concurrent updates to ->right, find_carry_node() cannot
144 * work with second argument NULL. Hence, following comment is
145 * of historic importance only.
147 * Subtle:
149 * Q: why don't we need tree lock here, looking for the right
150 * neighbor?
152 * A: even if value of node->real_node->right were changed
153 * during find_carry_node() execution, outcome of execution
154 * wouldn't change, because (in short) other thread cannot add
155 * elements to the @doing, and if node->real_node->right
156 * already was in @doing, value of node->real_node->right
157 * couldn't change, because node cannot be inserted between
158 * locked neighbors.
160 if (find_carry_node(doing,
161 reiser4_carry_real(node)->right) != NULL) {
162 read_unlock_tree(tree);
164 * What we are doing here (this is also applicable to
165 * the find_left_neighbor()).
167 * tree_walk.c code requires that insertion of a
168 * pointer to a child, modification of parent pointer
169 * in the child, and insertion of the child into
170 * sibling list are atomic (see
171 * plugin/item/internal.c:create_hook_internal()).
173 * carry allocates new node long before pointer to it
174 * is inserted into parent and, actually, long before
175 * parent is even known. Such allocated-but-orphaned
176 * nodes are only trackable through carry level lists.
178 * Situation that is handled here is following: @node
179 * has valid ->right pointer, but there is
180 * allocated-but-orphaned node in the carry queue that
181 * is logically between @node and @node->right. Here
182 * we are searching for it. Critical point is that
183 * this is only possible if @node->right is also in
184 * the carry queue (this is checked above), because
185 * this is the only way new orphaned node could be
186 * inserted between them (before inserting new node,
187 * make_space() first tries to shift to the right, so,
188 * right neighbor will be locked and queued).
191 right = node;
192 do {
193 right = list_entry(right->header.level_linkage.next,
194 carry_node, header.level_linkage);
195 assert("nikita-3408", !carry_node_end(doing,
196 right));
197 } while (reiser4_carry_real(right) ==
198 reiser4_carry_real(node));
199 return right;
202 read_unlock_tree(tree);
204 flags = GN_CAN_USE_UPPER_LEVELS;
205 if (!op->u.insert.flags & COPI_LOAD_RIGHT)
206 flags = GN_NO_ALLOC;
208 /* then, try to lock right neighbor */
209 init_lh(&lh);
210 result = reiser4_get_right_neighbor(&lh,
211 reiser4_carry_real(node),
212 ZNODE_WRITE_LOCK, flags);
213 if (result == 0) {
214 /* ok, node found and locked. */
215 right = reiser4_add_carry_skip(doing, POOLO_AFTER, node);
216 if (!IS_ERR(right)) {
217 right->node = lh.node;
218 move_lh(&right->lock_handle, &lh);
219 right->free = 1;
220 result = lock_carry_node_tail(right);
221 if (result != 0)
222 right = ERR_PTR(result);
224 } else if ((result == -E_NO_NEIGHBOR) || (result == -ENOENT)) {
225 /* node is rightmost node in a tree, or neighbor wasn't in
226 cache, or there is an extent on the right. */
227 right = NULL;
228 } else
229 right = ERR_PTR(result);
230 done_lh(&lh);
231 return right;
234 /* how much free space in a @node is needed for @op
236 How much space in @node is required for completion of @op, where @op is
237 insert or paste operation.
239 static unsigned int space_needed_for_op(znode * node /* znode data are
240 * inserted or
241 * pasted in */ ,
242 carry_op * op /* carry
243 operation */ )
245 assert("nikita-919", op != NULL);
247 switch (op->op) {
248 default:
249 impossible("nikita-1701", "Wrong opcode");
250 case COP_INSERT:
251 return space_needed(node, NULL, op->u.insert.d->data, 1);
252 case COP_PASTE:
253 return space_needed(node, op->u.insert.d->coord,
254 op->u.insert.d->data, 0);
258 /* how much space in @node is required to insert or paste @data at
259 @coord. */
260 unsigned int space_needed(const znode * node /* node data are inserted or
261 * pasted in */ ,
262 const coord_t * coord /* coord where data are
263 * inserted or pasted
264 * at */ ,
265 const reiser4_item_data * data /* data to insert or
266 * paste */ ,
267 int insertion /* non-0 is inserting, 0---paste */ )
269 int result;
270 item_plugin *iplug;
272 assert("nikita-917", node != NULL);
273 assert("nikita-918", node_plugin_by_node(node) != NULL);
274 assert("vs-230", !insertion || (coord == NULL));
276 result = 0;
277 iplug = data->iplug;
278 if (iplug->b.estimate != NULL) {
279 /* ask item plugin how much space is needed to insert this
280 item */
281 result += iplug->b.estimate(insertion ? NULL : coord, data);
282 } else {
283 /* reasonable default */
284 result += data->length;
286 if (insertion) {
287 node_plugin *nplug;
289 nplug = node->nplug;
290 /* and add node overhead */
291 if (nplug->item_overhead != NULL) {
292 result += nplug->item_overhead(node, NULL);
295 return result;
298 /* find &coord in parent where pointer to new child is to be stored. */
299 static int find_new_child_coord(carry_op * op /* COP_INSERT carry operation to
300 * insert pointer to new
301 * child */ )
303 int result;
304 znode *node;
305 znode *child;
307 assert("nikita-941", op != NULL);
308 assert("nikita-942", op->op == COP_INSERT);
310 node = reiser4_carry_real(op->node);
311 assert("nikita-943", node != NULL);
312 assert("nikita-944", node_plugin_by_node(node) != NULL);
314 child = reiser4_carry_real(op->u.insert.child);
315 result =
316 find_new_child_ptr(node, child, op->u.insert.brother,
317 op->u.insert.d->coord);
319 build_child_ptr_data(child, op->u.insert.d->data);
320 return result;
323 /* additional amount of free space in @node required to complete @op */
324 static int free_space_shortage(znode * node /* node to check */ ,
325 carry_op * op /* operation being performed */ )
327 assert("nikita-1061", node != NULL);
328 assert("nikita-1062", op != NULL);
330 switch (op->op) {
331 default:
332 impossible("nikita-1702", "Wrong opcode");
333 case COP_INSERT:
334 case COP_PASTE:
335 return space_needed_for_op(node, op) - znode_free_space(node);
336 case COP_EXTENT:
337 /* when inserting extent shift data around until insertion
338 point is utmost in the node. */
339 if (coord_wrt(op->u.insert.d->coord) == COORD_INSIDE)
340 return +1;
341 else
342 return -1;
346 /* helper function: update node pointer in operation after insertion
347 point was probably shifted into @target. */
348 static znode *sync_op(carry_op * op, carry_node * target)
350 znode *insertion_node;
352 /* reget node from coord: shift might move insertion coord to
353 the neighbor */
354 insertion_node = op->u.insert.d->coord->node;
355 /* if insertion point was actually moved into new node,
356 update carry node pointer in operation. */
357 if (insertion_node != reiser4_carry_real(op->node)) {
358 op->node = target;
359 assert("nikita-2540",
360 reiser4_carry_real(target) == insertion_node);
362 assert("nikita-2541",
363 reiser4_carry_real(op->node) == op->u.insert.d->coord->node);
364 return insertion_node;
368 * complete make_space() call: update tracked lock handle if necessary. See
369 * comments for fs/reiser4/carry.h:carry_track_type
371 static int
372 make_space_tail(carry_op * op, carry_level * doing, znode * orig_node)
374 int result;
375 carry_track_type tracking;
376 znode *node;
378 tracking = doing->track_type;
379 node = op->u.insert.d->coord->node;
381 if (tracking == CARRY_TRACK_NODE ||
382 (tracking == CARRY_TRACK_CHANGE && node != orig_node)) {
383 /* inserting or pasting into node different from
384 original. Update lock handle supplied by caller. */
385 assert("nikita-1417", doing->tracked != NULL);
386 done_lh(doing->tracked);
387 init_lh(doing->tracked);
388 result = longterm_lock_znode(doing->tracked, node,
389 ZNODE_WRITE_LOCK,
390 ZNODE_LOCK_HIPRI);
391 } else
392 result = 0;
393 return result;
396 /* This is insertion policy function. It shifts data to the left and right
397 neighbors of insertion coord and allocates new nodes until there is enough
398 free space to complete @op.
400 See comments in the body.
402 Assumes that the node format favors insertions at the right end of the node
403 as node40 does.
405 See carry_flow() on detail about flow insertion
407 static int make_space(carry_op * op /* carry operation, insert or paste */ ,
408 carry_level * doing /* current carry queue */ ,
409 carry_level * todo /* carry queue on the parent level */ )
411 znode *node;
412 int result;
413 int not_enough_space;
414 int blk_alloc;
415 znode *orig_node;
416 __u32 flags;
418 coord_t *coord;
420 assert("nikita-890", op != NULL);
421 assert("nikita-891", todo != NULL);
422 assert("nikita-892",
423 op->op == COP_INSERT ||
424 op->op == COP_PASTE || op->op == COP_EXTENT);
425 assert("nikita-1607",
426 reiser4_carry_real(op->node) == op->u.insert.d->coord->node);
428 flags = op->u.insert.flags;
430 /* NOTE check that new node can only be allocated after checking left
431 * and right neighbors. This is necessary for proper work of
432 * find_{left,right}_neighbor(). */
433 assert("nikita-3410", ergo(flags & COPI_DONT_ALLOCATE,
434 flags & COPI_DONT_SHIFT_LEFT));
435 assert("nikita-3411", ergo(flags & COPI_DONT_ALLOCATE,
436 flags & COPI_DONT_SHIFT_RIGHT));
438 coord = op->u.insert.d->coord;
439 orig_node = node = coord->node;
441 assert("nikita-908", node != NULL);
442 assert("nikita-909", node_plugin_by_node(node) != NULL);
444 result = 0;
445 /* If there is not enough space in a node, try to shift something to
446 the left neighbor. This is a bit tricky, as locking to the left is
447 low priority. This is handled by restart logic in carry().
449 not_enough_space = free_space_shortage(node, op);
450 if (not_enough_space <= 0)
451 /* it is possible that carry was called when there actually
452 was enough space in the node. For example, when inserting
453 leftmost item so that delimiting keys have to be updated.
455 return make_space_tail(op, doing, orig_node);
456 if (!(flags & COPI_DONT_SHIFT_LEFT)) {
457 carry_node *left;
458 /* make note in statistics of an attempt to move
459 something into the left neighbor */
460 left = find_left_neighbor(op, doing);
461 if (unlikely(IS_ERR(left))) {
462 if (PTR_ERR(left) == -E_REPEAT)
463 return -E_REPEAT;
464 else {
465 /* some error other than restart request
466 occurred. This shouldn't happen. Issue a
467 warning and continue as if left neighbor
468 weren't existing.
470 warning("nikita-924",
471 "Error accessing left neighbor: %li",
472 PTR_ERR(left));
474 } else if (left != NULL) {
476 /* shift everything possible on the left of and
477 including insertion coord into the left neighbor */
478 result = carry_shift_data(LEFT_SIDE, coord,
479 reiser4_carry_real(left),
480 doing, todo,
481 flags & COPI_GO_LEFT);
483 /* reget node from coord: shift_left() might move
484 insertion coord to the left neighbor */
485 node = sync_op(op, left);
487 not_enough_space = free_space_shortage(node, op);
488 /* There is not enough free space in @node, but
489 may be, there is enough free space in
490 @left. Various balancing decisions are valid here.
491 The same for the shifiting to the right.
495 /* If there still is not enough space, shift to the right */
496 if (not_enough_space > 0 && !(flags & COPI_DONT_SHIFT_RIGHT)) {
497 carry_node *right;
499 right = find_right_neighbor(op, doing);
500 if (IS_ERR(right)) {
501 warning("nikita-1065",
502 "Error accessing right neighbor: %li",
503 PTR_ERR(right));
504 } else if (right != NULL) {
505 /* node containing insertion point, and its right
506 neighbor node are write locked by now.
508 shift everything possible on the right of but
509 excluding insertion coord into the right neighbor
511 result = carry_shift_data(RIGHT_SIDE, coord,
512 reiser4_carry_real(right),
513 doing, todo,
514 flags & COPI_GO_RIGHT);
515 /* reget node from coord: shift_right() might move
516 insertion coord to the right neighbor */
517 node = sync_op(op, right);
518 not_enough_space = free_space_shortage(node, op);
521 /* If there is still not enough space, allocate new node(s).
523 We try to allocate new blocks if COPI_DONT_ALLOCATE is not set in
524 the carry operation flags (currently this is needed during flush
525 only).
527 for (blk_alloc = 0;
528 not_enough_space > 0 && result == 0 && blk_alloc < 2 &&
529 !(flags & COPI_DONT_ALLOCATE); ++blk_alloc) {
530 carry_node *fresh; /* new node we are allocating */
531 coord_t coord_shadow; /* remembered insertion point before
532 * shifting data into new node */
533 carry_node *node_shadow; /* remembered insertion node before
534 * shifting */
535 unsigned int gointo; /* whether insertion point should move
536 * into newly allocated node */
538 /* allocate new node on the right of @node. Znode and disk
539 fake block number for new node are allocated.
541 add_new_znode() posts carry operation COP_INSERT with
542 COPT_CHILD option to the parent level to add
543 pointer to newly created node to its parent.
545 Subtle point: if several new nodes are required to complete
546 insertion operation at this level, they will be inserted
547 into their parents in the order of creation, which means
548 that @node will be valid "cookie" at the time of insertion.
551 fresh = add_new_znode(node, op->node, doing, todo);
552 if (IS_ERR(fresh))
553 return PTR_ERR(fresh);
555 /* Try to shift into new node. */
556 result = lock_carry_node(doing, fresh);
557 zput(reiser4_carry_real(fresh));
558 if (result != 0) {
559 warning("nikita-947",
560 "Cannot lock new node: %i", result);
561 return result;
564 /* both nodes are write locked by now.
566 shift everything possible on the right of and
567 including insertion coord into the right neighbor.
569 coord_dup(&coord_shadow, op->u.insert.d->coord);
570 node_shadow = op->node;
571 /* move insertion point into newly created node if:
573 . insertion point is rightmost in the source node, or
574 . this is not the first node we are allocating in a row.
576 gointo =
577 (blk_alloc > 0) ||
578 coord_is_after_rightmost(op->u.insert.d->coord);
580 if (gointo &&
581 op->op == COP_PASTE &&
582 coord_is_existing_item(op->u.insert.d->coord) &&
583 is_solid_item((item_plugin_by_coord(op->u.insert.d->coord)))) {
584 /* paste into solid (atomic) item, which can contain
585 only one unit, so we need to shift it right, where
586 insertion point supposed to be */
588 assert("edward-1444", op->u.insert.d->data->iplug ==
589 item_plugin_by_id(STATIC_STAT_DATA_ID));
590 assert("edward-1445",
591 op->u.insert.d->data->length >
592 node_plugin_by_node(coord->node)->free_space
593 (coord->node));
595 op->u.insert.d->coord->between = BEFORE_UNIT;
598 result = carry_shift_data(RIGHT_SIDE, coord,
599 reiser4_carry_real(fresh),
600 doing, todo, gointo);
601 /* if insertion point was actually moved into new node,
602 update carry node pointer in operation. */
603 node = sync_op(op, fresh);
604 not_enough_space = free_space_shortage(node, op);
605 if ((not_enough_space > 0) && (node != coord_shadow.node)) {
606 /* there is not enough free in new node. Shift
607 insertion point back to the @shadow_node so that
608 next new node would be inserted between
609 @shadow_node and @fresh.
611 coord_normalize(&coord_shadow);
612 coord_dup(coord, &coord_shadow);
613 node = coord->node;
614 op->node = node_shadow;
615 if (1 || (flags & COPI_STEP_BACK)) {
616 /* still not enough space?! Maybe there is
617 enough space in the source node (i.e., node
618 data are moved from) now.
620 not_enough_space =
621 free_space_shortage(node, op);
625 if (not_enough_space > 0) {
626 if (!(flags & COPI_DONT_ALLOCATE))
627 warning("nikita-948", "Cannot insert new item");
628 result = -E_NODE_FULL;
630 assert("nikita-1622", ergo(result == 0,
631 reiser4_carry_real(op->node) == coord->node));
632 assert("nikita-2616", coord == op->u.insert.d->coord);
633 if (result == 0)
634 result = make_space_tail(op, doing, orig_node);
635 return result;
638 /* insert_paste_common() - common part of insert and paste operations
640 This function performs common part of COP_INSERT and COP_PASTE.
642 There are two ways in which insertion/paste can be requested:
644 . by directly supplying reiser4_item_data. In this case, op ->
645 u.insert.type is set to COPT_ITEM_DATA.
647 . by supplying child pointer to which is to inserted into parent. In this
648 case op -> u.insert.type == COPT_CHILD.
650 . by supplying key of new item/unit. This is currently only used during
651 extent insertion
653 This is required, because when new node is allocated we don't know at what
654 position pointer to it is to be stored in the parent. Actually, we don't
655 even know what its parent will be, because parent can be re-balanced
656 concurrently and new node re-parented, and because parent can be full and
657 pointer to the new node will go into some other node.
659 insert_paste_common() resolves pointer to child node into position in the
660 parent by calling find_new_child_coord(), that fills
661 reiser4_item_data. After this, insertion/paste proceeds uniformly.
663 Another complication is with finding free space during pasting. It may
664 happen that while shifting items to the neighbors and newly allocated
665 nodes, insertion coord can no longer be in the item we wanted to paste
666 into. At this point, paste becomes (morphs) into insert. Moreover free
667 space analysis has to be repeated, because amount of space required for
668 insertion is different from that of paste (item header overhead, etc).
670 This function "unifies" different insertion modes (by resolving child
671 pointer or key into insertion coord), and then calls make_space() to free
672 enough space in the node by shifting data to the left and right and by
673 allocating new nodes if necessary. Carry operation knows amount of space
674 required for its completion. After enough free space is obtained, caller of
675 this function (carry_{insert,paste,etc.}) performs actual insertion/paste
676 by calling item plugin method.
679 static int insert_paste_common(carry_op * op /* carry operation being
680 * performed */ ,
681 carry_level * doing /* current carry level */ ,
682 carry_level * todo /* next carry level */ ,
683 carry_insert_data * cdata /* pointer to
684 * cdata */ ,
685 coord_t * coord /* insertion/paste coord */ ,
686 reiser4_item_data * data /* data to be
687 * inserted/pasted */ )
689 assert("nikita-981", op != NULL);
690 assert("nikita-980", todo != NULL);
691 assert("nikita-979", (op->op == COP_INSERT) || (op->op == COP_PASTE)
692 || (op->op == COP_EXTENT));
694 if (op->u.insert.type == COPT_PASTE_RESTARTED) {
695 /* nothing to do. Fall through to make_space(). */
697 } else if (op->u.insert.type == COPT_KEY) {
698 node_search_result intra_node;
699 znode *node;
700 /* Problem with doing batching at the lowest level, is that
701 operations here are given by coords where modification is
702 to be performed, and one modification can invalidate coords
703 of all following operations.
705 So, we are implementing yet another type for operation that
706 will use (the only) "locator" stable across shifting of
707 data between nodes, etc.: key (COPT_KEY).
709 This clause resolves key to the coord in the node.
711 But node can change also. Probably some pieces have to be
712 added to the lock_carry_node(), to lock node by its key.
715 /* NOTE-NIKITA Lookup bias is fixed to FIND_EXACT. Complain
716 if you need something else. */
717 op->u.insert.d->coord = coord;
718 node = reiser4_carry_real(op->node);
719 intra_node = node_plugin_by_node(node)->lookup
720 (node, op->u.insert.d->key, FIND_EXACT,
721 op->u.insert.d->coord);
722 if ((intra_node != NS_FOUND) && (intra_node != NS_NOT_FOUND)) {
723 warning("nikita-1715", "Intra node lookup failure: %i",
724 intra_node);
725 return intra_node;
727 } else if (op->u.insert.type == COPT_CHILD) {
728 /* if we are asked to insert pointer to the child into
729 internal node, first convert pointer to the child into
730 coord within parent node.
732 znode *child;
733 int result;
735 op->u.insert.d = cdata;
736 op->u.insert.d->coord = coord;
737 op->u.insert.d->data = data;
738 op->u.insert.d->coord->node = reiser4_carry_real(op->node);
739 result = find_new_child_coord(op);
740 child = reiser4_carry_real(op->u.insert.child);
741 if (result != NS_NOT_FOUND) {
742 warning("nikita-993",
743 "Cannot find a place for child pointer: %i",
744 result);
745 return result;
747 /* This only happens when we did multiple insertions at
748 the previous level, trying to insert single item and
749 it so happened, that insertion of pointers to all new
750 nodes before this one already caused parent node to
751 split (may be several times).
753 I am going to come up with better solution.
755 You are not expected to understand this.
756 -- v6root/usr/sys/ken/slp.c
758 Basically, what happens here is the following: carry came
759 to the parent level and is about to insert internal item
760 pointing to the child node that it just inserted in the
761 level below. Position where internal item is to be inserted
762 was found by find_new_child_coord() above, but node of the
763 current carry operation (that is, parent node of child
764 inserted on the previous level), was determined earlier in
765 the lock_carry_level/lock_carry_node. It could so happen
766 that other carry operations already performed on the parent
767 level already split parent node, so that insertion point
768 moved into another node. Handle this by creating new carry
769 node for insertion point if necessary.
771 if (reiser4_carry_real(op->node) !=
772 op->u.insert.d->coord->node) {
773 pool_ordering direction;
774 znode *z1;
775 znode *z2;
776 reiser4_key k1;
777 reiser4_key k2;
780 * determine in what direction insertion point
781 * moved. Do this by comparing delimiting keys.
783 z1 = op->u.insert.d->coord->node;
784 z2 = reiser4_carry_real(op->node);
785 if (keyle(leftmost_key_in_node(z1, &k1),
786 leftmost_key_in_node(z2, &k2)))
787 /* insertion point moved to the left */
788 direction = POOLO_BEFORE;
789 else
790 /* insertion point moved to the right */
791 direction = POOLO_AFTER;
793 op->node = reiser4_add_carry_skip(doing,
794 direction, op->node);
795 if (IS_ERR(op->node))
796 return PTR_ERR(op->node);
797 op->node->node = op->u.insert.d->coord->node;
798 op->node->free = 1;
799 result = lock_carry_node(doing, op->node);
800 if (result != 0)
801 return result;
805 * set up key of an item being inserted: we are inserting
806 * internal item and its key is (by the very definition of
807 * search tree) is leftmost key in the child node.
809 write_lock_dk(znode_get_tree(child));
810 op->u.insert.d->key = leftmost_key_in_node(child,
811 znode_get_ld_key(child));
812 write_unlock_dk(znode_get_tree(child));
813 op->u.insert.d->data->arg = op->u.insert.brother;
814 } else {
815 assert("vs-243", op->u.insert.d->coord != NULL);
816 op->u.insert.d->coord->node = reiser4_carry_real(op->node);
819 /* find free space. */
820 return make_space(op, doing, todo);
823 /* handle carry COP_INSERT operation.
825 Insert new item into node. New item can be given in one of two ways:
827 - by passing &tree_coord and &reiser4_item_data as part of @op. This is
828 only applicable at the leaf/twig level.
830 - by passing a child node pointer to which is to be inserted by this
831 operation.
834 static int carry_insert(carry_op * op /* operation to perform */ ,
835 carry_level * doing /* queue of operations @op
836 * is part of */ ,
837 carry_level * todo /* queue where new operations
838 * are accumulated */ )
840 znode *node;
841 carry_insert_data cdata;
842 coord_t coord;
843 reiser4_item_data data;
844 carry_plugin_info info;
845 int result;
847 assert("nikita-1036", op != NULL);
848 assert("nikita-1037", todo != NULL);
849 assert("nikita-1038", op->op == COP_INSERT);
851 coord_init_zero(&coord);
853 /* perform common functionality of insert and paste. */
854 result = insert_paste_common(op, doing, todo, &cdata, &coord, &data);
855 if (result != 0)
856 return result;
858 node = op->u.insert.d->coord->node;
859 assert("nikita-1039", node != NULL);
860 assert("nikita-1040", node_plugin_by_node(node) != NULL);
862 assert("nikita-949",
863 space_needed_for_op(node, op) <= znode_free_space(node));
865 /* ask node layout to create new item. */
866 info.doing = doing;
867 info.todo = todo;
868 result = node_plugin_by_node(node)->create_item
869 (op->u.insert.d->coord, op->u.insert.d->key, op->u.insert.d->data,
870 &info);
871 doing->restartable = 0;
872 znode_make_dirty(node);
874 return result;
/*
 * Flow insertion code. COP_INSERT_FLOW is special tree operation that is
 * supplied with a "flow" (that is, a stream of data) and inserts it into tree
 * by slicing into multiple items.
 */

/* shorthand accessors for the insert_flow part of a carry operation */
#define flow_insert_point(op) ((op)->u.insert_flow.insert_point)
#define flow_insert_flow(op) ((op)->u.insert_flow.flow)
#define flow_insert_data(op) ((op)->u.insert_flow.data)
887 static size_t item_data_overhead(carry_op * op)
889 if (flow_insert_data(op)->iplug->b.estimate == NULL)
890 return 0;
891 return (flow_insert_data(op)->iplug->b.
892 estimate(NULL /* estimate insertion */ , flow_insert_data(op)) -
893 flow_insert_data(op)->length);
896 /* FIXME-VS: this is called several times during one make_flow_for_insertion
897 and it will always return the same result. Some optimization could be made
898 by calculating this value once at the beginning and passing it around. That
899 would reduce some flexibility in future changes
901 static int can_paste(coord_t *, const reiser4_key *, const reiser4_item_data *);
902 static size_t flow_insertion_overhead(carry_op * op)
904 znode *node;
905 size_t insertion_overhead;
907 node = flow_insert_point(op)->node;
908 insertion_overhead = 0;
909 if (node->nplug->item_overhead &&
910 !can_paste(flow_insert_point(op), &flow_insert_flow(op)->key,
911 flow_insert_data(op)))
912 insertion_overhead =
913 node->nplug->item_overhead(node, NULL) +
914 item_data_overhead(op);
915 return insertion_overhead;
918 /* how many bytes of flow does fit to the node */
919 static int what_can_fit_into_node(carry_op * op)
921 size_t free, overhead;
923 overhead = flow_insertion_overhead(op);
924 free = znode_free_space(flow_insert_point(op)->node);
925 if (free <= overhead)
926 return 0;
927 free -= overhead;
928 /* FIXME: flow->length is loff_t only to not get overflowed in case of expandign truncate */
929 if (free < op->u.insert_flow.flow->length)
930 return free;
931 return (int)op->u.insert_flow.flow->length;
934 /* in make_space_for_flow_insertion we need to check either whether whole flow
935 fits into a node or whether minimal fraction of flow fits into a node */
936 static int enough_space_for_whole_flow(carry_op * op)
938 return (unsigned)what_can_fit_into_node(op) ==
939 op->u.insert_flow.flow->length;
942 #define MIN_FLOW_FRACTION 1
943 static int enough_space_for_min_flow_fraction(carry_op * op)
945 assert("vs-902", coord_is_after_rightmost(flow_insert_point(op)));
947 return what_can_fit_into_node(op) >= MIN_FLOW_FRACTION;
950 /* this returns 0 if left neighbor was obtained successfully and everything
951 upto insertion point including it were shifted and left neighbor still has
952 some free space to put minimal fraction of flow into it */
953 static int
954 make_space_by_shift_left(carry_op * op, carry_level * doing, carry_level * todo)
956 carry_node *left;
957 znode *orig;
959 left = find_left_neighbor(op, doing);
960 if (unlikely(IS_ERR(left))) {
961 warning("vs-899",
962 "make_space_by_shift_left: "
963 "error accessing left neighbor: %li", PTR_ERR(left));
964 return 1;
966 if (left == NULL)
967 /* left neighbor either does not exist or is unformatted
968 node */
969 return 1;
971 orig = flow_insert_point(op)->node;
972 /* try to shift content of node @orig from its head upto insert point
973 including insertion point into the left neighbor */
974 carry_shift_data(LEFT_SIDE, flow_insert_point(op),
975 reiser4_carry_real(left), doing, todo,
976 1 /* including insert point */);
977 if (reiser4_carry_real(left) != flow_insert_point(op)->node) {
978 /* insertion point did not move */
979 return 1;
982 /* insertion point is set after last item in the node */
983 assert("vs-900", coord_is_after_rightmost(flow_insert_point(op)));
985 if (!enough_space_for_min_flow_fraction(op)) {
986 /* insertion point node does not have enough free space to put
987 even minimal portion of flow into it, therefore, move
988 insertion point back to orig node (before first item) */
989 coord_init_before_first_item(flow_insert_point(op), orig);
990 return 1;
993 /* part of flow is to be written to the end of node */
994 op->node = left;
995 return 0;
998 /* this returns 0 if right neighbor was obtained successfully and everything to
999 the right of insertion point was shifted to it and node got enough free
1000 space to put minimal fraction of flow into it */
1001 static int
1002 make_space_by_shift_right(carry_op * op, carry_level * doing,
1003 carry_level * todo)
1005 carry_node *right;
1007 right = find_right_neighbor(op, doing);
1008 if (unlikely(IS_ERR(right))) {
1009 warning("nikita-1065", "shift_right_excluding_insert_point: "
1010 "error accessing right neighbor: %li", PTR_ERR(right));
1011 return 1;
1013 if (right) {
1014 /* shift everything possible on the right of but excluding
1015 insertion coord into the right neighbor */
1016 carry_shift_data(RIGHT_SIDE, flow_insert_point(op),
1017 reiser4_carry_real(right), doing, todo,
1018 0 /* not including insert point */);
1019 } else {
1020 /* right neighbor either does not exist or is unformatted
1021 node */
1024 if (coord_is_after_rightmost(flow_insert_point(op))) {
1025 if (enough_space_for_min_flow_fraction(op)) {
1026 /* part of flow is to be written to the end of node */
1027 return 0;
1031 /* new node is to be added if insert point node did not get enough
1032 space for whole flow */
1033 return 1;
1036 /* this returns 0 when insert coord is set at the node end and fraction of flow
1037 fits into that node */
1038 static int
1039 make_space_by_new_nodes(carry_op * op, carry_level * doing, carry_level * todo)
1041 int result;
1042 znode *node;
1043 carry_node *new;
1045 node = flow_insert_point(op)->node;
1047 if (op->u.insert_flow.new_nodes == CARRY_FLOW_NEW_NODES_LIMIT)
1048 return RETERR(-E_NODE_FULL);
1049 /* add new node after insert point node */
1050 new = add_new_znode(node, op->node, doing, todo);
1051 if (unlikely(IS_ERR(new))) {
1052 return PTR_ERR(new);
1054 result = lock_carry_node(doing, new);
1055 zput(reiser4_carry_real(new));
1056 if (unlikely(result)) {
1057 return result;
1059 op->u.insert_flow.new_nodes++;
1060 if (!coord_is_after_rightmost(flow_insert_point(op))) {
1061 carry_shift_data(RIGHT_SIDE, flow_insert_point(op),
1062 reiser4_carry_real(new), doing, todo,
1063 0 /* not including insert point */);
1064 assert("vs-901",
1065 coord_is_after_rightmost(flow_insert_point(op)));
1067 if (enough_space_for_min_flow_fraction(op)) {
1068 return 0;
1070 if (op->u.insert_flow.new_nodes == CARRY_FLOW_NEW_NODES_LIMIT)
1071 return RETERR(-E_NODE_FULL);
1073 /* add one more new node */
1074 new = add_new_znode(node, op->node, doing, todo);
1075 if (unlikely(IS_ERR(new))) {
1076 return PTR_ERR(new);
1078 result = lock_carry_node(doing, new);
1079 zput(reiser4_carry_real(new));
1080 if (unlikely(result)) {
1081 return result;
1083 op->u.insert_flow.new_nodes++;
1086 /* move insertion point to new node */
1087 coord_init_before_first_item(flow_insert_point(op),
1088 reiser4_carry_real(new));
1089 op->node = new;
1090 return 0;
1093 static int
1094 make_space_for_flow_insertion(carry_op * op, carry_level * doing,
1095 carry_level * todo)
1097 __u32 flags = op->u.insert_flow.flags;
1099 if (enough_space_for_whole_flow(op)) {
1100 /* whole flow fits into insert point node */
1101 return 0;
1104 if (!(flags & COPI_DONT_SHIFT_LEFT)
1105 && (make_space_by_shift_left(op, doing, todo) == 0)) {
1106 /* insert point is shifted to left neighbor of original insert
1107 point node and is set after last unit in that node. It has
1108 enough space to fit at least minimal fraction of flow. */
1109 return 0;
1112 if (enough_space_for_whole_flow(op)) {
1113 /* whole flow fits into insert point node */
1114 return 0;
1117 if (!(flags & COPI_DONT_SHIFT_RIGHT)
1118 && (make_space_by_shift_right(op, doing, todo) == 0)) {
1119 /* insert point is still set to the same node, but there is
1120 nothing to the right of insert point. */
1121 return 0;
1124 if (enough_space_for_whole_flow(op)) {
1125 /* whole flow fits into insert point node */
1126 return 0;
1129 return make_space_by_new_nodes(op, doing, todo);
1132 /* implements COP_INSERT_FLOW operation */
1133 static int
1134 carry_insert_flow(carry_op * op, carry_level * doing, carry_level * todo)
1136 int result;
1137 flow_t *f;
1138 coord_t *insert_point;
1139 node_plugin *nplug;
1140 carry_plugin_info info;
1141 znode *orig_node;
1142 lock_handle *orig_lh;
1144 f = op->u.insert_flow.flow;
1145 result = 0;
1147 /* carry system needs this to work */
1148 info.doing = doing;
1149 info.todo = todo;
1151 orig_node = flow_insert_point(op)->node;
1152 orig_lh = doing->tracked;
1154 while (f->length) {
1155 result = make_space_for_flow_insertion(op, doing, todo);
1156 if (result)
1157 break;
1159 insert_point = flow_insert_point(op);
1160 nplug = node_plugin_by_node(insert_point->node);
1162 /* compose item data for insertion/pasting */
1163 flow_insert_data(op)->data = f->data;
1164 flow_insert_data(op)->length = what_can_fit_into_node(op);
1166 if (can_paste(insert_point, &f->key, flow_insert_data(op))) {
1167 /* insert point is set to item of file we are writing to and we have to append to it */
1168 assert("vs-903", insert_point->between == AFTER_UNIT);
1169 nplug->change_item_size(insert_point,
1170 flow_insert_data(op)->length);
1171 flow_insert_data(op)->iplug->b.paste(insert_point,
1172 flow_insert_data
1173 (op), &info);
1174 } else {
1175 /* new item must be inserted */
1176 pos_in_node_t new_pos;
1177 flow_insert_data(op)->length += item_data_overhead(op);
1179 /* FIXME-VS: this is because node40_create_item changes
1180 insert_point for obscure reasons */
1181 switch (insert_point->between) {
1182 case AFTER_ITEM:
1183 new_pos = insert_point->item_pos + 1;
1184 break;
1185 case EMPTY_NODE:
1186 new_pos = 0;
1187 break;
1188 case BEFORE_ITEM:
1189 assert("vs-905", insert_point->item_pos == 0);
1190 new_pos = 0;
1191 break;
1192 default:
1193 impossible("vs-906",
1194 "carry_insert_flow: invalid coord");
1195 new_pos = 0;
1196 break;
1199 nplug->create_item(insert_point, &f->key,
1200 flow_insert_data(op), &info);
1201 coord_set_item_pos(insert_point, new_pos);
1203 coord_init_after_item_end(insert_point);
1204 doing->restartable = 0;
1205 znode_make_dirty(insert_point->node);
1207 move_flow_forward(f, (unsigned)flow_insert_data(op)->length);
1210 if (orig_node != flow_insert_point(op)->node) {
1211 /* move lock to new insert point */
1212 done_lh(orig_lh);
1213 init_lh(orig_lh);
1214 result =
1215 longterm_lock_znode(orig_lh, flow_insert_point(op)->node,
1216 ZNODE_WRITE_LOCK, ZNODE_LOCK_HIPRI);
1219 return result;
1222 /* implements COP_DELETE operation
1224 Remove pointer to @op -> u.delete.child from it's parent.
1226 This function also handles killing of a tree root is last pointer from it
1227 was removed. This is complicated by our handling of "twig" level: root on
1228 twig level is never killed.
1231 static int carry_delete(carry_op * op /* operation to be performed */ ,
1232 carry_level * doing UNUSED_ARG /* current carry
1233 * level */ ,
1234 carry_level * todo /* next carry level */ )
1236 int result;
1237 coord_t coord;
1238 coord_t coord2;
1239 znode *parent;
1240 znode *child;
1241 carry_plugin_info info;
1242 reiser4_tree *tree;
1245 * This operation is called to delete internal item pointing to the
1246 * child node that was removed by carry from the tree on the previous
1247 * tree level.
1250 assert("nikita-893", op != NULL);
1251 assert("nikita-894", todo != NULL);
1252 assert("nikita-895", op->op == COP_DELETE);
1254 coord_init_zero(&coord);
1255 coord_init_zero(&coord2);
1257 parent = reiser4_carry_real(op->node);
1258 child = op->u.delete.child ?
1259 reiser4_carry_real(op->u.delete.child) : op->node->node;
1260 tree = znode_get_tree(child);
1261 read_lock_tree(tree);
1264 * @parent was determined when carry entered parent level
1265 * (lock_carry_level/lock_carry_node). Since then, actual parent of
1266 * @child node could change due to other carry operations performed on
1267 * the parent level. Check for this.
1270 if (znode_parent(child) != parent) {
1271 /* NOTE-NIKITA add stat counter for this. */
1272 parent = znode_parent(child);
1273 assert("nikita-2581", find_carry_node(doing, parent));
1275 read_unlock_tree(tree);
1277 assert("nikita-1213", znode_get_level(parent) > LEAF_LEVEL);
1279 /* Twig level horrors: tree should be of height at least 2. So, last
1280 pointer from the root at twig level is preserved even if child is
1281 empty. This is ugly, but so it was architectured.
1284 if (znode_is_root(parent) &&
1285 znode_get_level(parent) <= REISER4_MIN_TREE_HEIGHT &&
1286 node_num_items(parent) == 1) {
1287 /* Delimiting key manipulations. */
1288 write_lock_dk(tree);
1289 znode_set_ld_key(child, znode_set_ld_key(parent, reiser4_min_key()));
1290 znode_set_rd_key(child, znode_set_rd_key(parent, reiser4_max_key()));
1291 ZF_SET(child, JNODE_DKSET);
1292 write_unlock_dk(tree);
1294 /* @child escaped imminent death! */
1295 ZF_CLR(child, JNODE_HEARD_BANSHEE);
1296 return 0;
1299 /* convert child pointer to the coord_t */
1300 result = find_child_ptr(parent, child, &coord);
1301 if (result != NS_FOUND) {
1302 warning("nikita-994", "Cannot find child pointer: %i", result);
1303 print_coord_content("coord", &coord);
1304 return result;
1307 coord_dup(&coord2, &coord);
1308 info.doing = doing;
1309 info.todo = todo;
1312 * Actually kill internal item: prepare structure with
1313 * arguments for ->cut_and_kill() method...
1316 struct carry_kill_data kdata;
1317 kdata.params.from = &coord;
1318 kdata.params.to = &coord2;
1319 kdata.params.from_key = NULL;
1320 kdata.params.to_key = NULL;
1321 kdata.params.smallest_removed = NULL;
1322 kdata.params.truncate = 1;
1323 kdata.flags = op->u.delete.flags;
1324 kdata.inode = NULL;
1325 kdata.left = NULL;
1326 kdata.right = NULL;
1327 kdata.buf = NULL;
1328 /* ... and call it. */
1329 result = node_plugin_by_node(parent)->cut_and_kill(&kdata,
1330 &info);
1332 doing->restartable = 0;
1334 /* check whether root should be killed violently */
1335 if (znode_is_root(parent) &&
1336 /* don't kill roots at and lower than twig level */
1337 znode_get_level(parent) > REISER4_MIN_TREE_HEIGHT &&
1338 node_num_items(parent) == 1) {
1339 result = reiser4_kill_tree_root(coord.node);
1342 return result < 0 ? : 0;
1345 /* implements COP_CUT opration
1347 Cuts part or whole content of node.
1350 static int carry_cut(carry_op * op /* operation to be performed */ ,
1351 carry_level * doing /* current carry level */ ,
1352 carry_level * todo /* next carry level */ )
1354 int result;
1355 carry_plugin_info info;
1356 node_plugin *nplug;
1358 assert("nikita-896", op != NULL);
1359 assert("nikita-897", todo != NULL);
1360 assert("nikita-898", op->op == COP_CUT);
1362 info.doing = doing;
1363 info.todo = todo;
1365 nplug = node_plugin_by_node(reiser4_carry_real(op->node));
1366 if (op->u.cut_or_kill.is_cut)
1367 result = nplug->cut(op->u.cut_or_kill.u.cut, &info);
1368 else
1369 result = nplug->cut_and_kill(op->u.cut_or_kill.u.kill, &info);
1371 doing->restartable = 0;
1372 return result < 0 ? : 0;
1375 /* helper function for carry_paste(): returns true if @op can be continued as
1376 paste */
1377 static int
1378 can_paste(coord_t * icoord, const reiser4_key * key,
1379 const reiser4_item_data * data)
1381 coord_t circa;
1382 item_plugin *new_iplug;
1383 item_plugin *old_iplug;
1384 int result = 0; /* to keep gcc shut */
1386 assert("", icoord->between != AT_UNIT);
1388 /* obviously, one cannot paste when node is empty---there is nothing
1389 to paste into. */
1390 if (node_is_empty(icoord->node))
1391 return 0;
1392 /* if insertion point is at the middle of the item, then paste */
1393 if (!coord_is_between_items(icoord))
1394 return 1;
1395 coord_dup(&circa, icoord);
1396 circa.between = AT_UNIT;
1398 old_iplug = item_plugin_by_coord(&circa);
1399 new_iplug = data->iplug;
1401 /* check whether we can paste to the item @icoord is "at" when we
1402 ignore ->between field */
1403 if (old_iplug == new_iplug && item_can_contain_key(&circa, key, data)) {
1404 result = 1;
1405 } else if (icoord->between == BEFORE_UNIT
1406 || icoord->between == BEFORE_ITEM) {
1407 /* otherwise, try to glue to the item at the left, if any */
1408 coord_dup(&circa, icoord);
1409 if (coord_set_to_left(&circa)) {
1410 result = 0;
1411 coord_init_before_item(icoord);
1412 } else {
1413 old_iplug = item_plugin_by_coord(&circa);
1414 result = (old_iplug == new_iplug)
1415 && item_can_contain_key(icoord, key, data);
1416 if (result) {
1417 coord_dup(icoord, &circa);
1418 icoord->between = AFTER_UNIT;
1421 } else if (icoord->between == AFTER_UNIT
1422 || icoord->between == AFTER_ITEM) {
1423 coord_dup(&circa, icoord);
1424 /* otherwise, try to glue to the item at the right, if any */
1425 if (coord_set_to_right(&circa)) {
1426 result = 0;
1427 coord_init_after_item(icoord);
1428 } else {
1429 int (*cck) (const coord_t *, const reiser4_key *,
1430 const reiser4_item_data *);
1432 old_iplug = item_plugin_by_coord(&circa);
1434 cck = old_iplug->b.can_contain_key;
1435 if (cck == NULL)
1436 /* item doesn't define ->can_contain_key
1437 method? So it is not expandable. */
1438 result = 0;
1439 else {
1440 result = (old_iplug == new_iplug)
1441 && cck(&circa /*icoord */ , key, data);
1442 if (result) {
1443 coord_dup(icoord, &circa);
1444 icoord->between = BEFORE_UNIT;
1448 } else
1449 impossible("nikita-2513", "Nothing works");
1450 if (result) {
1451 if (icoord->between == BEFORE_ITEM) {
1452 assert("vs-912", icoord->unit_pos == 0);
1453 icoord->between = BEFORE_UNIT;
1454 } else if (icoord->between == AFTER_ITEM) {
1455 coord_init_after_item_end(icoord);
1458 return result;
1461 /* implements COP_PASTE operation
1463 Paste data into existing item. This is complicated by the fact that after
1464 we shifted something to the left or right neighbors trying to free some
1465 space, item we were supposed to paste into can be in different node than
1466 insertion coord. If so, we are no longer doing paste, but insert. See
1467 comments in insert_paste_common().
1470 static int carry_paste(carry_op * op /* operation to be performed */ ,
1471 carry_level * doing UNUSED_ARG /* current carry
1472 * level */ ,
1473 carry_level * todo /* next carry level */ )
1475 znode *node;
1476 carry_insert_data cdata;
1477 coord_t dcoord;
1478 reiser4_item_data data;
1479 int result;
1480 int real_size;
1481 item_plugin *iplug;
1482 carry_plugin_info info;
1483 coord_t *coord;
1485 assert("nikita-982", op != NULL);
1486 assert("nikita-983", todo != NULL);
1487 assert("nikita-984", op->op == COP_PASTE);
1489 coord_init_zero(&dcoord);
1491 result = insert_paste_common(op, doing, todo, &cdata, &dcoord, &data);
1492 if (result != 0)
1493 return result;
1495 coord = op->u.insert.d->coord;
1497 /* handle case when op -> u.insert.coord doesn't point to the item
1498 of required type. restart as insert. */
1499 if (!can_paste(coord, op->u.insert.d->key, op->u.insert.d->data)) {
1500 op->op = COP_INSERT;
1501 op->u.insert.type = COPT_PASTE_RESTARTED;
1502 result = op_dispatch_table[COP_INSERT].handler(op, doing, todo);
1504 return result;
1507 node = coord->node;
1508 iplug = item_plugin_by_coord(coord);
1509 assert("nikita-992", iplug != NULL);
1511 assert("nikita-985", node != NULL);
1512 assert("nikita-986", node_plugin_by_node(node) != NULL);
1514 assert("nikita-987",
1515 space_needed_for_op(node, op) <= znode_free_space(node));
1517 assert("nikita-1286", coord_is_existing_item(coord));
1520 * if item is expanded as a result of this operation, we should first
1521 * change item size, than call ->b.paste item method. If item is
1522 * shrunk, it should be done other way around: first call ->b.paste
1523 * method, then reduce item size.
1526 real_size = space_needed_for_op(node, op);
1527 if (real_size > 0)
1528 node->nplug->change_item_size(coord, real_size);
1530 doing->restartable = 0;
1531 info.doing = doing;
1532 info.todo = todo;
1534 result = iplug->b.paste(coord, op->u.insert.d->data, &info);
1536 if (real_size < 0)
1537 node->nplug->change_item_size(coord, real_size);
1539 /* if we pasted at the beginning of the item, update item's key. */
1540 if (coord->unit_pos == 0 && coord->between != AFTER_UNIT)
1541 node->nplug->update_item_key(coord, op->u.insert.d->key, &info);
1543 znode_make_dirty(node);
1544 return result;
1547 /* handle carry COP_EXTENT operation. */
1548 static int carry_extent(carry_op * op /* operation to perform */ ,
1549 carry_level * doing /* queue of operations @op
1550 * is part of */ ,
1551 carry_level * todo /* queue where new operations
1552 * are accumulated */ )
1554 znode *node;
1555 carry_insert_data cdata;
1556 coord_t coord;
1557 reiser4_item_data data;
1558 carry_op *delete_dummy;
1559 carry_op *insert_extent;
1560 int result;
1561 carry_plugin_info info;
1563 assert("nikita-1751", op != NULL);
1564 assert("nikita-1752", todo != NULL);
1565 assert("nikita-1753", op->op == COP_EXTENT);
1567 /* extent insertion overview:
1569 extents live on the TWIG LEVEL, which is level one above the leaf
1570 one. This complicates extent insertion logic somewhat: it may
1571 happen (and going to happen all the time) that in logical key
1572 ordering extent has to be placed between items I1 and I2, located
1573 at the leaf level, but I1 and I2 are in the same formatted leaf
1574 node N1. To insert extent one has to
1576 (1) reach node N1 and shift data between N1, its neighbors and
1577 possibly newly allocated nodes until I1 and I2 fall into different
1578 nodes. Since I1 and I2 are still neighboring items in logical key
1579 order, they will be necessary utmost items in their respective
1580 nodes.
1582 (2) After this new extent item is inserted into node on the twig
1583 level.
1585 Fortunately this process can reuse almost all code from standard
1586 insertion procedure (viz. make_space() and insert_paste_common()),
1587 due to the following observation: make_space() only shifts data up
1588 to and excluding or including insertion point. It never
1589 "over-moves" through insertion point. Thus, one can use
1590 make_space() to perform step (1). All required for this is just to
1591 instruct free_space_shortage() to keep make_space() shifting data
1592 until insertion point is at the node border.
1596 /* perform common functionality of insert and paste. */
1597 result = insert_paste_common(op, doing, todo, &cdata, &coord, &data);
1598 if (result != 0)
1599 return result;
1601 node = op->u.extent.d->coord->node;
1602 assert("nikita-1754", node != NULL);
1603 assert("nikita-1755", node_plugin_by_node(node) != NULL);
1604 assert("nikita-1700", coord_wrt(op->u.extent.d->coord) != COORD_INSIDE);
1606 /* NOTE-NIKITA add some checks here. Not assertions, -EIO. Check that
1607 extent fits between items. */
1609 info.doing = doing;
1610 info.todo = todo;
1612 /* there is another complication due to placement of extents on the
1613 twig level: extents are "rigid" in the sense that key-range
1614 occupied by extent cannot grow indefinitely to the right as it is
1615 for the formatted leaf nodes. Because of this when search finds two
1616 adjacent extents on the twig level, it has to "drill" to the leaf
1617 level, creating new node. Here we are removing this node.
1619 if (node_is_empty(node)) {
1620 delete_dummy = node_post_carry(&info, COP_DELETE, node, 1);
1621 if (IS_ERR(delete_dummy))
1622 return PTR_ERR(delete_dummy);
1623 delete_dummy->u.delete.child = NULL;
1624 delete_dummy->u.delete.flags = DELETE_RETAIN_EMPTY;
1625 ZF_SET(node, JNODE_HEARD_BANSHEE);
1628 /* proceed with inserting extent item into parent. We are definitely
1629 inserting rather than pasting if we get that far. */
1630 insert_extent = node_post_carry(&info, COP_INSERT, node, 1);
1631 if (IS_ERR(insert_extent))
1632 /* @delete_dummy will be automatically destroyed on the level
1633 exiting */
1634 return PTR_ERR(insert_extent);
1635 /* NOTE-NIKITA insertion by key is simplest option here. Another
1636 possibility is to insert on the left or right of already existing
1637 item.
1639 insert_extent->u.insert.type = COPT_KEY;
1640 insert_extent->u.insert.d = op->u.extent.d;
1641 assert("nikita-1719", op->u.extent.d->key != NULL);
1642 insert_extent->u.insert.d->data->arg = op->u.extent.d->coord;
1643 insert_extent->u.insert.flags =
1644 znode_get_tree(node)->carry.new_extent_flags;
1647 * if carry was asked to track lock handle we should actually track
1648 * lock handle on the twig node rather than on the leaf where
1649 * operation was started from. Transfer tracked lock handle.
1651 if (doing->track_type) {
1652 assert("nikita-3242", doing->tracked != NULL);
1653 assert("nikita-3244", todo->tracked == NULL);
1654 todo->tracked = doing->tracked;
1655 todo->track_type = CARRY_TRACK_NODE;
1656 doing->tracked = NULL;
1657 doing->track_type = 0;
1660 return 0;
1663 /* update key in @parent between pointers to @left and @right.
1665 Find coords of @left and @right and update delimiting key between them.
1666 This is helper function called by carry_update(). Finds position of
1667 internal item involved. Updates item key. Updates delimiting keys of child
1668 nodes involved.
1670 static int update_delimiting_key(znode * parent /* node key is updated
1671 * in */ ,
1672 znode * left /* child of @parent */ ,
1673 znode * right /* child of @parent */ ,
1674 carry_level * doing /* current carry
1675 * level */ ,
1676 carry_level * todo /* parent carry
1677 * level */ ,
1678 const char **error_msg /* place to
1679 * store error
1680 * message */ )
1682 coord_t left_pos;
1683 coord_t right_pos;
1684 int result;
1685 reiser4_key ldkey;
1686 carry_plugin_info info;
1688 assert("nikita-1177", right != NULL);
1689 /* find position of right left child in a parent */
1690 result = find_child_ptr(parent, right, &right_pos);
1691 if (result != NS_FOUND) {
1692 *error_msg = "Cannot find position of right child";
1693 return result;
1696 if ((left != NULL) && !coord_is_leftmost_unit(&right_pos)) {
1697 /* find position of the left child in a parent */
1698 result = find_child_ptr(parent, left, &left_pos);
1699 if (result != NS_FOUND) {
1700 *error_msg = "Cannot find position of left child";
1701 return result;
1703 assert("nikita-1355", left_pos.node != NULL);
1704 } else
1705 left_pos.node = NULL;
1707 /* check that they are separated by exactly one key and are basically
1708 sane */
1709 if (REISER4_DEBUG) {
1710 if ((left_pos.node != NULL)
1711 && !coord_is_existing_unit(&left_pos)) {
1712 *error_msg = "Left child is bastard";
1713 return RETERR(-EIO);
1715 if (!coord_is_existing_unit(&right_pos)) {
1716 *error_msg = "Right child is bastard";
1717 return RETERR(-EIO);
1719 if (left_pos.node != NULL &&
1720 !coord_are_neighbors(&left_pos, &right_pos)) {
1721 *error_msg = "Children are not direct siblings";
1722 return RETERR(-EIO);
1725 *error_msg = NULL;
1727 info.doing = doing;
1728 info.todo = todo;
1731 * If child node is not empty, new key of internal item is a key of
1732 * leftmost item in the child node. If the child is empty, take its
1733 * right delimiting key as a new key of the internal item. Precise key
1734 * in the latter case is not important per se, because the child (and
1735 * the internal item) are going to be killed shortly anyway, but we
1736 * have to preserve correct order of keys in the parent node.
1739 if (!ZF_ISSET(right, JNODE_HEARD_BANSHEE))
1740 leftmost_key_in_node(right, &ldkey);
1741 else {
1742 read_lock_dk(znode_get_tree(parent));
1743 ldkey = *znode_get_rd_key(right);
1744 read_unlock_dk(znode_get_tree(parent));
1746 node_plugin_by_node(parent)->update_item_key(&right_pos, &ldkey, &info);
1747 doing->restartable = 0;
1748 znode_make_dirty(parent);
1749 return 0;
1752 /* implements COP_UPDATE opration
1754 Update delimiting keys.
1757 static int carry_update(carry_op * op /* operation to be performed */ ,
1758 carry_level * doing /* current carry level */ ,
1759 carry_level * todo /* next carry level */ )
1761 int result;
1762 carry_node *missing UNUSED_ARG;
1763 znode *left;
1764 znode *right;
1765 carry_node *lchild;
1766 carry_node *rchild;
1767 const char *error_msg;
1768 reiser4_tree *tree;
1771 * This operation is called to update key of internal item. This is
1772 * necessary when carry shifted of cut data on the child
1773 * level. Arguments of this operation are:
1775 * @right --- child node. Operation should update key of internal
1776 * item pointing to @right.
1778 * @left --- left neighbor of @right. This parameter is optional.
1781 assert("nikita-902", op != NULL);
1782 assert("nikita-903", todo != NULL);
1783 assert("nikita-904", op->op == COP_UPDATE);
1785 lchild = op->u.update.left;
1786 rchild = op->node;
1788 if (lchild != NULL) {
1789 assert("nikita-1001", lchild->parent);
1790 assert("nikita-1003", !lchild->left);
1791 left = reiser4_carry_real(lchild);
1792 } else
1793 left = NULL;
1795 tree = znode_get_tree(rchild->node);
1796 read_lock_tree(tree);
1797 right = znode_parent(rchild->node);
1798 read_unlock_tree(tree);
1800 if (right != NULL) {
1801 result = update_delimiting_key(right,
1802 lchild ? lchild->node : NULL,
1803 rchild->node,
1804 doing, todo, &error_msg);
1805 } else {
1806 error_msg = "Cannot find node to update key in";
1807 result = RETERR(-EIO);
1809 /* operation will be reposted to the next level by the
1810 ->update_item_key() method of node plugin, if necessary. */
1812 if (result != 0) {
1813 warning("nikita-999", "Error updating delimiting key: %s (%i)",
1814 error_msg ? : "", result);
1816 return result;
1819 /* move items from @node during carry */
1820 static int carry_shift_data(sideof side /* in what direction to move data */ ,
1821 coord_t * insert_coord /* coord where new item
1822 * is to be inserted */ ,
1823 znode * node /* node which data are moved from */ ,
1824 carry_level * doing /* active carry queue */ ,
1825 carry_level * todo /* carry queue where new
1826 * operations are to be put
1827 * in */ ,
1828 unsigned int including_insert_coord_p /* true if
1829 * @insertion_coord
1830 * can be moved */ )
1832 int result;
1833 znode *source;
1834 carry_plugin_info info;
1835 node_plugin *nplug;
1837 source = insert_coord->node;
1839 info.doing = doing;
1840 info.todo = todo;
1842 nplug = node_plugin_by_node(node);
1843 result = nplug->shift(insert_coord, node,
1844 (side == LEFT_SIDE) ? SHIFT_LEFT : SHIFT_RIGHT, 0,
1845 (int)including_insert_coord_p, &info);
1846 /* the only error ->shift() method of node plugin can return is
1847 -ENOMEM due to carry node/operation allocation. */
1848 assert("nikita-915", result >= 0 || result == -ENOMEM);
1849 if (result > 0) {
1851 * if some number of bytes was actually shifted, mark nodes
1852 * dirty, and carry level as non-restartable.
1854 doing->restartable = 0;
1855 znode_make_dirty(source);
1856 znode_make_dirty(node);
1859 assert("nikita-2077", coord_check(insert_coord));
1860 return 0;
1863 typedef carry_node *(*carry_iterator) (carry_node * node);
1864 static carry_node *find_dir_carry(carry_node * node, carry_level * level,
1865 carry_iterator iterator);
1867 static carry_node *pool_level_list_prev(carry_node *node)
1869 return list_entry(node->header.level_linkage.prev, carry_node, header.level_linkage);
1872 /* look for the left neighbor of given carry node in a carry queue.
1874 This is used by find_left_neighbor(), but I am not sure that this
1875 really gives any advantage. More statistics required.
1878 carry_node *find_left_carry(carry_node * node /* node to find left neighbor
1879 * of */ ,
1880 carry_level * level /* level to scan */ )
1882 return find_dir_carry(node, level,
1883 (carry_iterator) pool_level_list_prev);
1886 static carry_node *pool_level_list_next(carry_node *node)
1888 return list_entry(node->header.level_linkage.next, carry_node, header.level_linkage);
1891 /* look for the right neighbor of given carry node in a
1892 carry queue.
1894 This is used by find_right_neighbor(), but I am not sure that this
1895 really gives any advantage. More statistics required.
1898 carry_node *find_right_carry(carry_node * node /* node to find right neighbor
1899 * of */ ,
1900 carry_level * level /* level to scan */ )
1902 return find_dir_carry(node, level,
1903 (carry_iterator) pool_level_list_next);
1906 /* look for the left or right neighbor of given carry node in a carry
1907 queue.
1909 Helper function used by find_{left|right}_carry().
1911 static carry_node *find_dir_carry(carry_node * node /* node to start scanning
1912 * from */ ,
1913 carry_level * level /* level to scan */ ,
1914 carry_iterator iterator /* operation to
1915 * move to the next
1916 * node */ )
1918 carry_node *neighbor;
1920 assert("nikita-1059", node != NULL);
1921 assert("nikita-1060", level != NULL);
1923 /* scan list of carry nodes on this list dir-ward, skipping all
1924 carry nodes referencing the same znode. */
1925 neighbor = node;
1926 while (1) {
1927 neighbor = iterator(neighbor);
1928 if (carry_node_end(level, neighbor))
1929 /* list head is reached */
1930 return NULL;
1931 if (reiser4_carry_real(neighbor) != reiser4_carry_real(node))
1932 return neighbor;
1937 * Memory reservation estimation.
1939 * Carry process proceeds through tree levels upwards. Carry assumes that it
1940 * takes tree in consistent state (e.g., that search tree invariants hold),
1941 * and leaves tree consistent after it finishes. This means that when some
1942 * error occurs carry cannot simply return if there are pending carry
1943 * operations. Generic solution for this problem is carry-undo either as
1944 * transaction manager feature (requiring checkpoints and isolation), or
1945 * through some carry specific mechanism.
1947 * Our current approach is to panic if carry hits an error while tree is
1948 * inconsistent. Unfortunately -ENOMEM can easily be triggered. To work around
1949 * this "memory reservation" mechanism was added.
1951 * Memory reservation is implemented by perthread-pages.diff patch from
1952 * core-patches. Its API is defined in <linux/gfp.h>
1954 * int perthread_pages_reserve(int nrpages, gfp_t gfp);
1955 * void perthread_pages_release(int nrpages);
1956 * int perthread_pages_count(void);
1958 * Carry estimates its worst case memory requirements at the entry, reserves
1959 * enough memory, and releases unused pages before returning.
1961 * Code below estimates worst case memory requirements for a given carry
1962 * queue. This is dome by summing worst case memory requirements for each
1963 * operation in the queue.
1968 * Memory requirements of many operations depend on the tree
1969 * height. For example, item insertion requires new node to be inserted at
1970 * each tree level in the worst case. What tree height should be used for
1971 * estimation? Current tree height is wrong, because tree height can change
1972 * between the time when estimation was done and the time when operation is
1973 * actually performed. Maximal possible tree height (REISER4_MAX_ZTREE_HEIGHT)
1974 * is also not desirable, because it would lead to the huge over-estimation
1975 * all the time. Plausible solution is "capped tree height": if current tree
1976 * height is less than some TREE_HEIGHT_CAP constant, capped tree height is
1977 * TREE_HEIGHT_CAP, otherwise it's current tree height. Idea behind this is
1978 * that if tree height is TREE_HEIGHT_CAP or larger, it's extremely unlikely
1979 * to be increased even more during short interval of time.
1981 #define TREE_HEIGHT_CAP (5)
1983 /* return capped tree height for the @tree. See comment above. */
1984 static int cap_tree_height(reiser4_tree * tree)
1986 return max_t(int, tree->height, TREE_HEIGHT_CAP);
1989 /* return capped tree height for the current tree. */
1990 static int capped_height(void)
1992 return cap_tree_height(current_tree);
1995 /* return number of pages required to store given number of bytes */
1996 static int bytes_to_pages(int bytes)
1998 return (bytes + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
2001 /* how many pages are required to allocate znodes during item insertion. */
2002 static int carry_estimate_znodes(void)
2005 * Note, that there we have some problem here: there is no way to
2006 * reserve pages specifically for the given slab. This means that
2007 * these pages can be hijacked for some other end.
2010 /* in the worst case we need 3 new znode on each tree level */
2011 return bytes_to_pages(capped_height() * sizeof(znode) * 3);
2015 * how many pages are required to load bitmaps. One bitmap per level.
2017 static int carry_estimate_bitmaps(void)
2019 if (reiser4_is_set(reiser4_get_current_sb(), REISER4_DONT_LOAD_BITMAP)) {
2020 int bytes;
2022 bytes = capped_height() * (0 + /* bnode should be added, but its is private to
2023 * bitmap.c, skip for now. */
2024 2 * sizeof(jnode)); /* working and commit jnodes */
2025 return bytes_to_pages(bytes) + 2; /* and their contents */
2026 } else
2027 /* bitmaps were pre-loaded during mount */
2028 return 0;
2031 /* worst case item insertion memory requirements */
2032 static int carry_estimate_insert(carry_op * op, carry_level * level)
2034 return carry_estimate_bitmaps() + carry_estimate_znodes() + 1 + /* new atom */
2035 capped_height() + /* new block on each level */
2036 1 + /* and possibly extra new block at the leaf level */
2037 3; /* loading of leaves into memory */
2040 /* worst case item deletion memory requirements */
2041 static int carry_estimate_delete(carry_op * op, carry_level * level)
2043 return carry_estimate_bitmaps() + carry_estimate_znodes() + 1 + /* new atom */
2044 3; /* loading of leaves into memory */
2047 /* worst case tree cut memory requirements */
2048 static int carry_estimate_cut(carry_op * op, carry_level * level)
2050 return carry_estimate_bitmaps() + carry_estimate_znodes() + 1 + /* new atom */
2051 3; /* loading of leaves into memory */
2054 /* worst case memory requirements of pasting into item */
2055 static int carry_estimate_paste(carry_op * op, carry_level * level)
2057 return carry_estimate_bitmaps() + carry_estimate_znodes() + 1 + /* new atom */
2058 capped_height() + /* new block on each level */
2059 1 + /* and possibly extra new block at the leaf level */
2060 3; /* loading of leaves into memory */
2063 /* worst case memory requirements of extent insertion */
2064 static int carry_estimate_extent(carry_op * op, carry_level * level)
2066 return carry_estimate_insert(op, level) + /* insert extent */
2067 carry_estimate_delete(op, level); /* kill leaf */
2070 /* worst case memory requirements of key update */
2071 static int carry_estimate_update(carry_op * op, carry_level * level)
2073 return 0;
2076 /* worst case memory requirements of flow insertion */
2077 static int carry_estimate_insert_flow(carry_op * op, carry_level * level)
2079 int newnodes;
2081 newnodes = min(bytes_to_pages(op->u.insert_flow.flow->length),
2082 CARRY_FLOW_NEW_NODES_LIMIT);
2084 * roughly estimate insert_flow as a sequence of insertions.
2086 return newnodes * carry_estimate_insert(op, level);
2089 /* This is dispatch table for carry operations. It can be trivially
2090 abstracted into useful plugin: tunable balancing policy is a good
2091 thing. */
2092 carry_op_handler op_dispatch_table[COP_LAST_OP] = {
2093 [COP_INSERT] = {
2094 .handler = carry_insert,
2095 .estimate = carry_estimate_insert}
2097 [COP_DELETE] = {
2098 .handler = carry_delete,
2099 .estimate = carry_estimate_delete}
2101 [COP_CUT] = {
2102 .handler = carry_cut,
2103 .estimate = carry_estimate_cut}
2105 [COP_PASTE] = {
2106 .handler = carry_paste,
2107 .estimate = carry_estimate_paste}
2109 [COP_EXTENT] = {
2110 .handler = carry_extent,
2111 .estimate = carry_estimate_extent}
2113 [COP_UPDATE] = {
2114 .handler = carry_update,
2115 .estimate = carry_estimate_update}
2117 [COP_INSERT_FLOW] = {
2118 .handler = carry_insert_flow,
2119 .estimate = carry_estimate_insert_flow}
2122 /* Make Linus happy.
2123 Local variables:
2124 c-indentation-style: "K&R"
2125 mode-name: "LC"
2126 c-basic-offset: 8
2127 tab-width: 8
2128 fill-column: 120
2129 scroll-step: 1
2130 End: