/*-------------------------------------------------------------------------
 *
 * nbtinsert.c
 *	  Item insertion in Lehman and Yao btrees for Postgres.
 *
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  $PostgreSQL$
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/heapam.h"
#include "access/nbtree.h"
#include "access/transam.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "utils/inval.h"
#include "utils/tqual.h"

typedef struct
{
	/* context data for _bt_checksplitloc */
	Size		newitemsz;		/* size of new item to be inserted */
	int			fillfactor;		/* needed when splitting rightmost page */
	bool		is_leaf;		/* T if splitting a leaf page */
	bool		is_rightmost;	/* T if splitting a rightmost page */
	OffsetNumber newitemoff;	/* where the new item is to be inserted */
	int			leftspace;		/* space available for items on left page */
	int			rightspace;		/* space available for items on right page */
	int			olddataitemstotal;	/* space taken by old items */

	bool		have_split;		/* found a valid split? */

	/* these fields valid only if have_split is true */
	bool		newitemonleft;	/* new item on left or right of best split */
	OffsetNumber firstright;	/* best split point */
	int			best_delta;		/* best size delta so far */
} FindSplitData;

static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);

static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
				 Relation heapRel, Buffer buf, OffsetNumber ioffset,
				 ScanKey itup_scankey);
static void _bt_findinsertloc(Relation rel,
				  Buffer *bufptr,
				  OffsetNumber *offsetptr,
				  int keysz,
				  ScanKey scankey,
				  IndexTuple newtup);
static void _bt_insertonpg(Relation rel, Buffer buf,
			   BTStack stack,
			   IndexTuple itup,
			   OffsetNumber newitemoff,
			   bool split_only_page);
static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
		  OffsetNumber newitemoff, Size newitemsz,
		  IndexTuple newitem, bool newitemonleft);
static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
				 OffsetNumber newitemoff,
				 Size newitemsz,
				 bool *newitemonleft);
static void _bt_checksplitloc(FindSplitData *state,
				  OffsetNumber firstoldonright, bool newitemonleft,
				  int dataitemstoleft, Size firstoldonrightsz);
static void _bt_pgaddtup(Relation rel, Page page,
			 Size itemsize, IndexTuple itup,
			 OffsetNumber itup_off, const char *where);
static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
			int keysz, ScanKey scankey);
static void _bt_vacuum_one_page(Relation rel, Buffer buffer);

/*
 *	_bt_doinsert() -- Handle insertion of a single index tuple in the tree.
 *
 *		This routine is called by the public interface routines, btbuild
 *		and btinsert.  By here, itup is filled in, including the TID.
 */
void
_bt_doinsert(Relation rel, IndexTuple itup,
			 bool index_is_unique, Relation heapRel)
{
	int			natts = rel->rd_rel->relnatts;
	ScanKey		itup_scankey;
	BTStack		stack;
	Buffer		buf;
	OffsetNumber offset;

	/* we need an insertion scan key to do our search, so build one */
	itup_scankey = _bt_mkscankey(rel, itup);

top:
	/* find the first page containing this key */
	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);

	offset = InvalidOffsetNumber;

	/* trade in our read lock for a write lock */
	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
	LockBuffer(buf, BT_WRITE);

	/*
	 * If the page was split between the time that we surrendered our read
	 * lock and acquired our write lock, then this page may no longer be the
	 * right place for the key we want to insert.  In this case, we need to
	 * move right in the tree.  See Lehman and Yao for an excruciatingly
	 * precise description.
	 */
	buf = _bt_moveright(rel, buf, natts, itup_scankey, false, BT_WRITE);

	/*
	 * If we're not allowing duplicates, make sure the key isn't already in
	 * the index.
	 *
	 * NOTE: obviously, _bt_check_unique can only detect keys that are
	 * already in the index; so it cannot defend against concurrent
	 * insertions of the same key.  We protect against that by means of
	 * holding a write lock on the target page.  Any other would-be inserter
	 * of the same key must acquire a write lock on the same target page, so
	 * only one would-be inserter can be making the check at one time.
	 * Furthermore, once we are past the check we hold write locks
	 * continuously until we have performed our insertion, so no later
	 * inserter can fail to see our insertion.  (This requires some care in
	 * _bt_insertonpg.)
	 *
	 * If we must wait for another xact, we release the lock while waiting,
	 * and then must start over completely.
	 */
	if (index_is_unique)
	{
		TransactionId xwait;

		offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
		xwait = _bt_check_unique(rel, itup, heapRel, buf, offset,
								 itup_scankey);

		if (TransactionIdIsValid(xwait))
		{
			/* Have to wait for the other guy ... */
			_bt_relbuf(rel, buf);
			XactLockTableWait(xwait);
			/* start over... */
			_bt_freestack(stack);
			goto top;
		}
	}

	/* do the insertion */
	_bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup);
	_bt_insertonpg(rel, buf, stack, itup, offset, false);

	/* be tidy */
	_bt_freestack(stack);
	_bt_freeskey(itup_scankey);
}

/*
 *	_bt_check_unique() -- Check for violation of unique index constraint
 *
 *		offset points to the first possible item that could conflict.  It
 *		can also point to end-of-page, which means that the first tuple to
 *		check is the first tuple on the next page.
 *
 * Returns InvalidTransactionId if there is no conflict, else an xact ID
 * we must wait for to see if it commits a conflicting tuple.  If an actual
 * conflict is detected, no return --- just ereport().
 */
static TransactionId
_bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
				 Buffer buf, OffsetNumber offset, ScanKey itup_scankey)
{
	TupleDesc	itupdesc = RelationGetDescr(rel);
	int			natts = rel->rd_rel->relnatts;
	SnapshotData SnapshotDirty;
	OffsetNumber maxoff;
	Page		page;
	BTPageOpaque opaque;
	Buffer		nbuf = InvalidBuffer;

	InitDirtySnapshot(SnapshotDirty);

	page = BufferGetPage(buf);
	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
	maxoff = PageGetMaxOffsetNumber(page);

	/*
	 * Scan over all equal tuples, looking for live conflicts.
	 */
	for (;;)
	{
		ItemId		curitemid;
		IndexTuple	curitup;
		BlockNumber nblkno;

		/*
		 * make sure the offset points to an actual item before trying to
		 * examine it...
		 */
		if (offset <= maxoff)
		{
			curitemid = PageGetItemId(page, offset);

			/*
			 * We can skip items that are marked killed.
			 *
			 * Formerly, we applied _bt_isequal() before checking the kill
			 * flag, so as to fall out of the item loop as soon as possible.
			 * However, in the presence of heavy update activity an index may
			 * contain many killed items with the same key; running
			 * _bt_isequal() on each killed item gets expensive.  Furthermore
			 * it is likely that the non-killed version of each key appears
			 * first, so that we didn't actually get to exit any sooner
			 * anyway.  So now we just advance over killed items as quickly
			 * as we can.  We only apply _bt_isequal() when we get to a
			 * non-killed item or the end of the page.
			 */
			if (!ItemIdIsDead(curitemid))
			{
				ItemPointerData htid;
				bool		all_dead;

				/*
				 * _bt_compare returns 0 for (1,NULL) and (1,NULL), which is
				 * how it handles NULLs; so we must not use _bt_compare for
				 * the real equality test, but only for ordering and finding
				 * items on pages.  - vadim 03/24/97
				 */
				if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
					break;		/* we're past all the equal tuples */

				/* okay, we gotta fetch the heap tuple ... */
				curitup = (IndexTuple) PageGetItem(page, curitemid);
				htid = curitup->t_tid;

				/*
				 * We check the whole HOT-chain to see if there is any tuple
				 * that satisfies SnapshotDirty.  This is necessary because
				 * we have just a single index entry for the entire chain.
				 */
				if (heap_hot_search(&htid, heapRel, &SnapshotDirty,
									&all_dead))
				{
					/* it is a duplicate */
					TransactionId xwait =
					(TransactionIdIsValid(SnapshotDirty.xmin)) ?
					SnapshotDirty.xmin : SnapshotDirty.xmax;
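
					/*
					 * (SnapshotDirty reports a valid xmin when the tuple's
					 * inserting transaction is still in progress, and a
					 * valid xmax when a deleting transaction is; whichever
					 * is set identifies the transaction we may have to wait
					 * for.)
					 */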

					/*
					 * If this tuple is being updated by other transaction
					 * then we have to wait for its commit/abort.
					 */
					if (TransactionIdIsValid(xwait))
					{
						if (nbuf != InvalidBuffer)
							_bt_relbuf(rel, nbuf);
						/* Tell _bt_doinsert to wait... */
						return xwait;
					}

					/*
					 * Otherwise we have a definite conflict.  But before
					 * complaining, look to see if the tuple we want to
					 * insert is itself now committed dead --- if so, don't
					 * complain.  This is a waste of time in normal scenarios
					 * but we must do it to support CREATE INDEX
					 * CONCURRENTLY.
					 *
					 * We must follow HOT-chains here because during
					 * concurrent index build, we insert the root TID though
					 * the actual tuple may be somewhere in the HOT-chain.
					 * While following the chain we might not stop at the
					 * exact tuple which triggered the insert, but that's OK
					 * because if we find a live tuple anywhere in this
					 * chain, we have a unique key conflict.  The other live
					 * tuple is not part of this chain because it had a
					 * different index entry.
					 */
					htid = itup->t_tid;
					if (heap_hot_search(&htid, heapRel, SnapshotSelf, NULL))
					{
						/* Normal case --- it's still live */
					}
					else
					{
						/*
						 * It's been deleted, so no error, and no need to
						 * continue searching
						 */
						break;
					}

					ereport(ERROR,
							(errcode(ERRCODE_UNIQUE_VIOLATION),
							 errmsg("duplicate key value violates unique constraint \"%s\"",
									RelationGetRelationName(rel))));
				}
				else if (all_dead)
				{
					/*
					 * The conflicting tuple (or whole HOT chain) is dead to
					 * everyone, so we may as well mark the index entry
					 * killed.
					 */
					ItemIdMarkDead(curitemid);
					opaque->btpo_flags |= BTP_HAS_GARBAGE;
					/* be sure to mark the proper buffer dirty... */
					if (nbuf != InvalidBuffer)
						SetBufferCommitInfoNeedsSave(nbuf);
					else
						SetBufferCommitInfoNeedsSave(buf);
				}
			}
		}

		/*
		 * Advance to next tuple to continue checking.
		 */
		if (offset < maxoff)
			offset = OffsetNumberNext(offset);
		else
		{
			/* If scankey == hikey we gotta check the next page too */
			if (P_RIGHTMOST(opaque))
				break;
			if (!_bt_isequal(itupdesc, page, P_HIKEY,
							 natts, itup_scankey))
				break;
			/* Advance to next non-dead page --- there must be one */
			for (;;)
			{
				nblkno = opaque->btpo_next;
				nbuf = _bt_relandgetbuf(rel, nbuf, nblkno, BT_READ);
				page = BufferGetPage(nbuf);
				opaque = (BTPageOpaque) PageGetSpecialPointer(page);
				if (!P_IGNORE(opaque))
					break;
				if (P_RIGHTMOST(opaque))
					elog(ERROR, "fell off the end of index \"%s\"",
						 RelationGetRelationName(rel));
			}
			maxoff = PageGetMaxOffsetNumber(page);
			offset = P_FIRSTDATAKEY(opaque);
		}
	}

	if (nbuf != InvalidBuffer)
		_bt_relbuf(rel, nbuf);

	return InvalidTransactionId;
}

/*
 *	_bt_findinsertloc() -- Finds an insert location for a tuple
 *
 *		If the new key is equal to one or more existing keys, we can
 *		legitimately place it anywhere in the series of equal keys --- in
 *		fact, if the new key is equal to the page's "high key" we can place
 *		it on the next page.  If it is equal to the high key, and there's
 *		not room to insert the new tuple on the current page without
 *		splitting, then we can move right hoping to find more free space
 *		and avoid a split.  (We should not move right indefinitely, however,
 *		since that leads to O(N^2) insertion behavior in the presence of
 *		many equal keys.)  Once we have chosen the page to put the key on,
 *		we'll insert it before any existing equal keys because of the way
 *		_bt_binsrch() works.
 *
 *		If there's not enough room on the page, we try to make room by
 *		removing any LP_DEAD tuples.
 *
 *		On entry, *bufptr and *offsetptr point to the first legal position
 *		where the new tuple could be inserted.  The caller should hold an
 *		exclusive lock on *bufptr.  *offsetptr can also be set to
 *		InvalidOffsetNumber, in which case the function will search for the
 *		right location within the page if needed.  On exit, they point to
 *		the chosen insert location.  If _bt_findinsertloc decides to move
 *		right, the lock and pin on the original page will be released and
 *		the new page returned to the caller is exclusively locked instead.
 *
 *		newtup is the new tuple we're inserting, and scankey is an insertion
 *		type scan key for it.
 */
static void
_bt_findinsertloc(Relation rel,
				  Buffer *bufptr,
				  OffsetNumber *offsetptr,
				  int keysz,
				  ScanKey scankey,
				  IndexTuple newtup)
{
	Buffer		buf = *bufptr;
	Page		page = BufferGetPage(buf);
	Size		itemsz;
	BTPageOpaque lpageop;
	bool		movedright,
				vacuumed;
	OffsetNumber newitemoff;
	OffsetNumber firstlegaloff = *offsetptr;

	lpageop = (BTPageOpaque) PageGetSpecialPointer(page);

	itemsz = IndexTupleDSize(*newtup);
	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
								 * need to be consistent */

	/*
	 * Check whether the item can fit on a btree page at all. (Eventually, we
	 * ought to try to apply TOAST methods if not.) We actually need to be
	 * able to fit three items on every page, so restrict any one item to 1/3
	 * the per-page available space. Note that at this point, itemsz doesn't
	 * include the ItemId.
	 */
	if (itemsz > BTMaxItemSize(page))
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("index row size %lu exceeds btree maximum, %lu",
						(unsigned long) itemsz,
						(unsigned long) BTMaxItemSize(page)),
				 errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
						 "Consider a function index of an MD5 hash of the value, "
						 "or use full text indexing.")));

	/*----------
	 * If we will need to split the page to put the item on this page,
	 * check whether we can put the tuple somewhere to the right,
	 * instead.  Keep scanning right until we
	 *		(a) find a page with enough free space,
	 *		(b) reach the last page where the tuple can legally go, or
	 *		(c) get tired of searching.
	 * (c) is not flippant; it is important because if there are many
	 * pages' worth of equal keys, it's better to split one of the early
	 * pages than to scan all the way to the end of the run of equal keys
	 * on every insert.  We implement "get tired" as a random choice,
	 * since stopping after scanning a fixed number of pages wouldn't work
	 * well (we'd never reach the right-hand side of previously split
	 * pages).  Currently the probability of moving right is set at 0.99,
	 * which may seem too high to change the behavior much, but it does an
	 * excellent job of preventing O(N^2) behavior with many equal keys.
	 *----------
	 */
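
	/*
	 * (Concretely, the test "random() <= (MAX_RANDOM_VALUE / 100)" below
	 * stops the scan with probability about 1/100 per page visited, so even
	 * when every page in a long run of equal keys is full we expect to give
	 * up after roughly 100 pages rather than walking the whole run.)
	 */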
	movedright = false;
	vacuumed = false;
	while (PageGetFreeSpace(page) < itemsz)
	{
		Buffer		rbuf;

		/*
		 * before considering moving right, see if we can obtain enough space
		 * by erasing LP_DEAD items
		 */
		if (P_ISLEAF(lpageop) && P_HAS_GARBAGE(lpageop))
		{
			_bt_vacuum_one_page(rel, buf);

			/*
			 * remember that we vacuumed this page, because that makes the
			 * hint supplied by the caller invalid
			 */
			vacuumed = true;

			if (PageGetFreeSpace(page) >= itemsz)
				break;			/* OK, now we have enough space */
		}

		/*
		 * nope, so check conditions (b) and (c) enumerated above
		 */
		if (P_RIGHTMOST(lpageop) ||
			_bt_compare(rel, keysz, scankey, page, P_HIKEY) != 0 ||
			random() <= (MAX_RANDOM_VALUE / 100))
			break;

		/*
		 * step right to next non-dead page
		 *
		 * must write-lock that page before releasing write lock on current
		 * page; else someone else's _bt_check_unique scan could fail to see
		 * our insertion.  write locks on intermediate dead pages won't do
		 * because we don't know when they will get de-linked from the tree.
		 */
		rbuf = InvalidBuffer;

		for (;;)
		{
			BlockNumber rblkno = lpageop->btpo_next;

			rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
			page = BufferGetPage(rbuf);
			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
			if (!P_IGNORE(lpageop))
				break;
			if (P_RIGHTMOST(lpageop))
				elog(ERROR, "fell off the end of index \"%s\"",
					 RelationGetRelationName(rel));
		}
		_bt_relbuf(rel, buf);
		buf = rbuf;
		movedright = true;
		vacuumed = false;
	}

	/*
	 * Now we are on the right page, so find the insert position. If we moved
	 * right at all, we know we should insert at the start of the page. If we
	 * didn't move right, we can use the firstlegaloff hint if the caller
	 * supplied one, unless we vacuumed the page which might have moved
	 * tuples around making the hint invalid. If we didn't move right or
	 * can't use the hint, find the position by searching.
	 */
	if (movedright)
		newitemoff = P_FIRSTDATAKEY(lpageop);
	else if (firstlegaloff != InvalidOffsetNumber && !vacuumed)
		newitemoff = firstlegaloff;
	else
		newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);

	*bufptr = buf;
	*offsetptr = newitemoff;
}

/*----------
 *	_bt_insertonpg() -- Insert a tuple on a particular page in the index.
 *
 *		This recursive procedure does the following things:
 *
 *			+  if necessary, splits the target page (making sure that the
 *			   split is equitable as far as post-insert free space goes).
 *			+  inserts the tuple.
 *			+  if the page was split, pops the parent stack, and finds the
 *			   right place to insert the new child pointer (by walking
 *			   right using information stored in the parent stack).
 *			+  invokes itself with the appropriate tuple for the right
 *			   child page on the parent.
 *			+  updates the metapage if a true root or fast root is split.
 *
 *		On entry, we must have the right buffer in which to do the
 *		insertion, and the buffer must be pinned and write-locked.  On
 *		return, we will have dropped both the pin and the lock on the buffer.
 *
 *		The locking interactions in this code are critical.  You should
 *		grok Lehman and Yao's paper before making any changes.  In addition,
 *		you need to understand how we disambiguate duplicate keys in this
 *		implementation, in order to be able to find our location using
 *		L&Y "move right" operations.  Since we may insert duplicate user
 *		keys, and since these dups may propagate up the tree, we use the
 *		'afteritem' parameter to position ourselves correctly for the
 *		insertion on internal pages.
 *----------
 */
static void
_bt_insertonpg(Relation rel,
			   Buffer buf,
			   BTStack stack,
			   IndexTuple itup,
			   OffsetNumber newitemoff,
			   bool split_only_page)
{
	Page		page;
	BTPageOpaque lpageop;
	OffsetNumber firstright = InvalidOffsetNumber;
	Size		itemsz;

	page = BufferGetPage(buf);
	lpageop = (BTPageOpaque) PageGetSpecialPointer(page);

	itemsz = IndexTupleDSize(*itup);
	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
								 * need to be consistent */

	/*
	 * Do we need to split the page to fit the item on it?
	 *
	 * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
	 * so this comparison is correct even though we appear to be accounting
	 * only for the item and not for its line pointer.
	 */
	if (PageGetFreeSpace(page) < itemsz)
	{
		bool		is_root = P_ISROOT(lpageop);
		bool		is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop);
		bool		newitemonleft;
		Buffer		rbuf;

		/* Choose the split point */
		firstright = _bt_findsplitloc(rel, page,
									  newitemoff, itemsz,
									  &newitemonleft);

		/* split the buffer into left and right halves */
		rbuf = _bt_split(rel, buf, firstright,
						 newitemoff, itemsz, itup, newitemonleft);

		/*----------
		 * By here,
		 *
		 *		+  our target page has been split;
		 *		+  the original tuple has been inserted;
		 *		+  we have write locks on both the old (left half)
		 *		   and new (right half) buffers, after the split; and
		 *		+  we know the key we want to insert into the parent
		 *		   (it's the "high key" on the left child page).
		 *
		 * We're ready to do the parent insertion.  We need to hold onto the
		 * locks for the child pages until we locate the parent, but we can
		 * release them before doing the actual insertion (see Lehman and Yao
		 * for the reasoning).
		 *----------
		 */
		_bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
	}
	else
	{
		Buffer		metabuf = InvalidBuffer;
		Page		metapg = NULL;
		BTMetaPageData *metad = NULL;
		OffsetNumber itup_off;
		BlockNumber itup_blkno;

		itup_off = newitemoff;
		itup_blkno = BufferGetBlockNumber(buf);

		/*
		 * If we are doing this insert because we split a page that was the
		 * only one on its tree level, but was not the root, it may have been
		 * the "fast root".  We need to ensure that the fast root link points
		 * at or above the current page.  We can safely acquire a lock on the
		 * metapage here --- see comments for _bt_newroot().
		 */
		if (split_only_page)
		{
			Assert(!P_ISLEAF(lpageop));

			metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
			metapg = BufferGetPage(metabuf);
			metad = BTPageGetMeta(metapg);

			if (metad->btm_fastlevel >= lpageop->btpo.level)
			{
				/* no update wanted */
				_bt_relbuf(rel, metabuf);
				metabuf = InvalidBuffer;
			}
		}

		/* Do the update.  No ereport(ERROR) until changes are logged */
		START_CRIT_SECTION();

		_bt_pgaddtup(rel, page, itemsz, itup, newitemoff, "page");

		MarkBufferDirty(buf);

		if (BufferIsValid(metabuf))
		{
			metad->btm_fastroot = itup_blkno;
			metad->btm_fastlevel = lpageop->btpo.level;
			MarkBufferDirty(metabuf);
		}

		/* XLOG stuff */
		if (!rel->rd_istemp)
		{
			xl_btree_insert xlrec;
			BlockNumber xldownlink;
			xl_btree_metadata xlmeta;
			uint8		xlinfo;
			XLogRecPtr	recptr;
			XLogRecData rdata[4];
			XLogRecData *nextrdata;
			IndexTupleData trunctuple;

			xlrec.target.node = rel->rd_node;
			ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);

			rdata[0].data = (char *) &xlrec;
			rdata[0].len = SizeOfBtreeInsert;
			rdata[0].buffer = InvalidBuffer;
			rdata[0].next = nextrdata = &(rdata[1]);

			if (P_ISLEAF(lpageop))
				xlinfo = XLOG_BTREE_INSERT_LEAF;
			else
			{
				xldownlink = ItemPointerGetBlockNumber(&(itup->t_tid));
				Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);

				nextrdata->data = (char *) &xldownlink;
				nextrdata->len = sizeof(BlockNumber);
				nextrdata->buffer = InvalidBuffer;
				nextrdata->next = nextrdata + 1;
				nextrdata++;

				xlinfo = XLOG_BTREE_INSERT_UPPER;
			}

			if (BufferIsValid(metabuf))
			{
				xlmeta.root = metad->btm_root;
				xlmeta.level = metad->btm_level;
				xlmeta.fastroot = metad->btm_fastroot;
				xlmeta.fastlevel = metad->btm_fastlevel;

				nextrdata->data = (char *) &xlmeta;
				nextrdata->len = sizeof(xl_btree_metadata);
				nextrdata->buffer = InvalidBuffer;
				nextrdata->next = nextrdata + 1;
				nextrdata++;

				xlinfo = XLOG_BTREE_INSERT_META;
			}

			/* Read comments in _bt_pgaddtup */
			if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
			{
				trunctuple = *itup;
				trunctuple.t_info = sizeof(IndexTupleData);
				nextrdata->data = (char *) &trunctuple;
				nextrdata->len = sizeof(IndexTupleData);
			}
			else
			{
				nextrdata->data = (char *) itup;
				nextrdata->len = IndexTupleDSize(*itup);
			}
			nextrdata->buffer = buf;
			nextrdata->buffer_std = true;
			nextrdata->next = NULL;

			recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);

			if (BufferIsValid(metabuf))
			{
				PageSetLSN(metapg, recptr);
				PageSetTLI(metapg, ThisTimeLineID);
			}

			PageSetLSN(page, recptr);
			PageSetTLI(page, ThisTimeLineID);
		}

		END_CRIT_SECTION();

		/* release buffers; send out relcache inval if metapage changed */
		if (BufferIsValid(metabuf))
		{
			if (!InRecovery)
				CacheInvalidateRelcache(rel);
			_bt_relbuf(rel, metabuf);
		}

		_bt_relbuf(rel, buf);
	}
}

/*
 *	_bt_split() -- split a page in the btree.
 *
 *		On entry, buf is the page to split, and is pinned and write-locked.
 *		firstright is the item index of the first item to be moved to the
 *		new right page.  newitemoff etc. tell us about the new item that
 *		must be inserted along with the data from the old page.
 *
 *		Returns the new right sibling of buf, pinned and write-locked.
 *		The pin and lock on buf are maintained.
 */
static Buffer
_bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
		  OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem,
		  bool newitemonleft)
{
	Buffer		rbuf;
	Page		origpage;
	Page		leftpage,
				rightpage;
	BTPageOpaque ropaque,
				lopaque,
				oopaque;
	Buffer		sbuf = InvalidBuffer;
	Page		spage = NULL;
	BTPageOpaque sopaque = NULL;
	Size		itemsz;
	ItemId		itemid;
	IndexTuple	item;
	OffsetNumber leftoff,
				rightoff;
	OffsetNumber maxoff;
	OffsetNumber i;
	bool		isroot;

	rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
	origpage = BufferGetPage(buf);
	leftpage = PageGetTempPage(origpage);
	rightpage = BufferGetPage(rbuf);

	_bt_pageinit(leftpage, BufferGetPageSize(buf));
	/* rightpage was already initialized by _bt_getbuf */

	/*
	 * Copy the original page's LSN and TLI into leftpage, which will become
	 * the updated version of the page.  We need this because XLogInsert will
	 * examine these fields and possibly dump them in a page image.
	 */
	PageSetLSN(leftpage, PageGetLSN(origpage));
	PageSetTLI(leftpage, PageGetTLI(origpage));

	/* init btree private data */
	oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
	lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
	ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);

	isroot = P_ISROOT(oopaque);

	/* if we're splitting this page, it won't be the root when we're done */
	/* also, clear the SPLIT_END and HAS_GARBAGE flags in both pages */
	lopaque->btpo_flags = oopaque->btpo_flags;
	lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
	ropaque->btpo_flags = lopaque->btpo_flags;
	lopaque->btpo_prev = oopaque->btpo_prev;
	lopaque->btpo_next = BufferGetBlockNumber(rbuf);
	ropaque->btpo_prev = BufferGetBlockNumber(buf);
	ropaque->btpo_next = oopaque->btpo_next;
	lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
	/* Since we already have write-lock on both pages, ok to read cycleid */
	lopaque->btpo_cycleid = _bt_vacuum_cycleid(rel);
	ropaque->btpo_cycleid = lopaque->btpo_cycleid;

	/*
	 * If the page we're splitting is not the rightmost page at its level in
	 * the tree, then the first entry on the page is the high key for the
	 * page.  We need to copy that to the right half.  Otherwise (meaning the
	 * rightmost page case), all the items on the right half will be user
	 * data.
	 */
	rightoff = P_HIKEY;

	if (!P_RIGHTMOST(oopaque))
	{
		itemid = PageGetItemId(origpage, P_HIKEY);
		itemsz = ItemIdGetLength(itemid);
		item = (IndexTuple) PageGetItem(origpage, itemid);
		if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
						false, false) == InvalidOffsetNumber)
			elog(PANIC, "failed to add hikey to the right sibling"
				 " while splitting block %u of index \"%s\"",
				 BufferGetBlockNumber(buf), RelationGetRelationName(rel));
		rightoff = OffsetNumberNext(rightoff);
	}

	/*
	 * The "high key" for the new left page will be the first key that's
	 * going to go into the new right page.  This might be either the
	 * existing data item at position firstright, or the incoming tuple.
	 */
	leftoff = P_HIKEY;
	if (!newitemonleft && newitemoff == firstright)
	{
		/* incoming tuple will become first on right page */
		itemsz = newitemsz;
		item = newitem;
	}
	else
	{
		/* existing item at firstright will become first on right page */
		itemid = PageGetItemId(origpage, firstright);
		itemsz = ItemIdGetLength(itemid);
		item = (IndexTuple) PageGetItem(origpage, itemid);
	}
	if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
					false, false) == InvalidOffsetNumber)
		elog(PANIC, "failed to add hikey to the left sibling"
			 " while splitting block %u of index \"%s\"",
			 BufferGetBlockNumber(buf), RelationGetRelationName(rel));
	leftoff = OffsetNumberNext(leftoff);

	/*
	 * Now transfer all the data items to the appropriate page.
	 *
	 * Note: we *must* insert at least the right page's items in item-number
	 * order, for the benefit of _bt_restore_page().
	 */
	maxoff = PageGetMaxOffsetNumber(origpage);

	for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
	{
		itemid = PageGetItemId(origpage, i);
		itemsz = ItemIdGetLength(itemid);
		item = (IndexTuple) PageGetItem(origpage, itemid);

		/* does new item belong before this one? */
		if (i == newitemoff)
		{
			if (newitemonleft)
			{
				_bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
							 "left sibling");
				leftoff = OffsetNumberNext(leftoff);
			}
			else
			{
				_bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
							 "right sibling");
				rightoff = OffsetNumberNext(rightoff);
			}
		}

		/* decide which page to put it on */
		if (i < firstright)
		{
			_bt_pgaddtup(rel, leftpage, itemsz, item, leftoff,
						 "left sibling");
			leftoff = OffsetNumberNext(leftoff);
		}
		else
		{
			_bt_pgaddtup(rel, rightpage, itemsz, item, rightoff,
						 "right sibling");
			rightoff = OffsetNumberNext(rightoff);
		}
	}

	/* cope with possibility that newitem goes at the end */
	if (i <= newitemoff)
	{
		/*
		 * Can't have newitemonleft here; that would imply we were told to
		 * put *everything* on the left page, which cannot fit (if it could,
		 * we'd not be splitting the page).
		 */
		Assert(!newitemonleft);
		_bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
					 "right sibling");
		rightoff = OffsetNumberNext(rightoff);
	}

	/*
	 * We have to grab the right sibling (if any) and fix the prev pointer
	 * there.  We are guaranteed that this is deadlock-free since no other
	 * writer will be holding a lock on that page and trying to move left,
	 * and all readers release locks on a page before trying to fetch its
	 * neighbors.
	 */
	if (!P_RIGHTMOST(ropaque))
	{
		sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
		spage = BufferGetPage(sbuf);
		sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
		if (sopaque->btpo_prev != ropaque->btpo_prev)
			elog(PANIC, "right sibling's left-link doesn't match: "
				 "block %u links to %u instead of expected %u in index \"%s\"",
				 ropaque->btpo_next, sopaque->btpo_prev, ropaque->btpo_prev,
				 RelationGetRelationName(rel));

		/*
		 * Check to see if we can set the SPLIT_END flag in the right-hand
		 * split page; this can save some I/O for vacuum since it need not
		 * proceed to the right sibling.  We can set the flag if the right
		 * sibling has a different cycleid: that means it could not be part
		 * of a group of pages that were all split off from the same ancestor
		 * page.  If you're confused, imagine that page A splits to A B and
		 * then again, yielding A C B, while vacuum is in progress.  Tuples
		 * originally in A could now be in either B or C, hence vacuum must
		 * examine both pages.  But if D, our right sibling, has a different
		 * cycleid then it could not contain any tuples that were in A when
		 * the vacuum started.
		 */
		if (sopaque->btpo_cycleid != ropaque->btpo_cycleid)
			ropaque->btpo_flags |= BTP_SPLIT_END;
	}

	/*
	 * Right sibling is locked, new siblings are prepared, but original page
	 * is not updated yet.
	 *
	 * NO EREPORT(ERROR) till right sibling is updated.  We can get away with
	 * not starting the critical section till here because we haven't been
	 * scribbling on the original page yet, and we don't care about the new
	 * sibling until it's linked into the btree.
	 */
	START_CRIT_SECTION();

	/*
	 * By here, the original data page has been split into two new halves,
	 * and these are correct.  The algorithm requires that the left page
	 * never move during a split, so we copy the new left page back on top
	 * of the original.  Note that this is not a waste of time, since we
	 * also require (in the page management code) that the center of a page
	 * always be clean, and the most efficient way to guarantee this is just
	 * to compact the data by reinserting it into a new left page.  (XXX the
	 * latter comment is probably obsolete.)
	 *
	 * We need to do this before writing the WAL record, so that XLogInsert
	 * can WAL log an image of the page if necessary.
	 */
	PageRestoreTempPage(leftpage, origpage);

	MarkBufferDirty(buf);
	MarkBufferDirty(rbuf);

	if (!P_RIGHTMOST(ropaque))
	{
		sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
		MarkBufferDirty(sbuf);
	}

	/* XLOG stuff */
	if (!rel->rd_istemp)
	{
		xl_btree_split xlrec;
		uint8		xlinfo;
		XLogRecPtr	recptr;
		XLogRecData rdata[7];
		XLogRecData *lastrdata;

		xlrec.node = rel->rd_node;
		xlrec.leftsib = BufferGetBlockNumber(buf);
		xlrec.rightsib = BufferGetBlockNumber(rbuf);
		xlrec.rnext = ropaque->btpo_next;
		xlrec.level = ropaque->btpo.level;
		xlrec.firstright = firstright;

		rdata[0].data = (char *) &xlrec;
		rdata[0].len = SizeOfBtreeSplit;
		rdata[0].buffer = InvalidBuffer;

		lastrdata = &rdata[0];

		if (ropaque->btpo.level > 0)
		{
			/* Log downlink on non-leaf pages */
			lastrdata->next = lastrdata + 1;
			lastrdata++;

			lastrdata->data = (char *) &newitem->t_tid.ip_blkid;
			lastrdata->len = sizeof(BlockIdData);
			lastrdata->buffer = InvalidBuffer;

			/*
			 * We must also log the left page's high key, because the right
			 * page's leftmost key is suppressed on non-leaf levels.  Show it
			 * as belonging to the left page buffer, so that it is not stored
			 * if XLogInsert decides it needs a full-page image of the left
			 * page.
			 */
			lastrdata->next = lastrdata + 1;
			lastrdata++;

			itemid = PageGetItemId(origpage, P_HIKEY);
			item = (IndexTuple) PageGetItem(origpage, itemid);
			lastrdata->data = (char *) item;
			lastrdata->len = MAXALIGN(IndexTupleSize(item));
			lastrdata->buffer = buf;	/* backup block 1 */
			lastrdata->buffer_std = true;
		}

		/*
		 * Log the new item and its offset, if it was inserted on the left
		 * page.  (If it was put on the right page, we don't need to
		 * explicitly WAL log it because it's included with all the other
		 * items on the right page.)  Show the new item as belonging to the
		 * left page buffer, so that it is not stored if XLogInsert decides
		 * it needs a full-page image of the left page.  We store the offset
		 * anyway, though, to support archive compression of these records.
		 */
		if (newitemonleft)
		{
			lastrdata->next = lastrdata + 1;
			lastrdata++;

			lastrdata->data = (char *) &newitemoff;
			lastrdata->len = sizeof(OffsetNumber);
			lastrdata->buffer = InvalidBuffer;

			lastrdata->next = lastrdata + 1;
			lastrdata++;

			lastrdata->data = (char *) newitem;
			lastrdata->len = MAXALIGN(newitemsz);
			lastrdata->buffer = buf;	/* backup block 1 */
			lastrdata->buffer_std = true;
		}
		else if (ropaque->btpo.level == 0)
		{
			/*
			 * Although we don't need to WAL-log the new item, we still need
			 * XLogInsert to consider storing a full-page image of the left
			 * page, so make an empty entry referencing that buffer.  This
			 * also ensures that the left page is always backup block 1.
			 */
			lastrdata->next = lastrdata + 1;
			lastrdata++;

			lastrdata->data = NULL;
			lastrdata->len = 0;
			lastrdata->buffer = buf;	/* backup block 1 */
			lastrdata->buffer_std = true;
		}

		/*
		 * Log the contents of the right page in the format understood by
		 * _bt_restore_page().  We set lastrdata->buffer to InvalidBuffer,
		 * because we're going to recreate the whole page anyway, so it
		 * should never be stored by XLogInsert.
		 *
		 * Direct access to page is not good but faster - we should implement
		 * some new func in page API.  Note we only store the tuples
		 * themselves, knowing that they were inserted in item-number order
		 * and so the item pointers can be reconstructed.  See comments for
		 * _bt_restore_page().
		 */
		lastrdata->next = lastrdata + 1;
		lastrdata++;

		lastrdata->data = (char *) rightpage +
			((PageHeader) rightpage)->pd_upper;
		lastrdata->len = ((PageHeader) rightpage)->pd_special -
			((PageHeader) rightpage)->pd_upper;
		lastrdata->buffer = InvalidBuffer;

		/* Log the right sibling, because we've changed its prev-pointer. */
		if (!P_RIGHTMOST(ropaque))
		{
			lastrdata->next = lastrdata + 1;
			lastrdata++;

			lastrdata->data = NULL;
			lastrdata->len = 0;
			lastrdata->buffer = sbuf;	/* backup block 2 */
			lastrdata->buffer_std = true;
		}

		lastrdata->next = NULL;

		if (isroot)
			xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
		else
			xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;

		recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);

		PageSetLSN(origpage, recptr);
		PageSetTLI(origpage, ThisTimeLineID);
		PageSetLSN(rightpage, recptr);
		PageSetTLI(rightpage, ThisTimeLineID);
		if (!P_RIGHTMOST(ropaque))
		{
			PageSetLSN(spage, recptr);
			PageSetTLI(spage, ThisTimeLineID);
		}
	}

	END_CRIT_SECTION();

	/* release the old right sibling */
	if (!P_RIGHTMOST(ropaque))
		_bt_relbuf(rel, sbuf);

	/* split's done */
	return rbuf;
}

/*
 *	_bt_findsplitloc() -- find an appropriate place to split a page.
 *
 * The idea here is to equalize the free space that will be on each split
 * page, *after accounting for the inserted tuple*.  (If we fail to account
 * for it, we might find ourselves with too little room on the page that
 * it needs to go into!)
 *
 * If the page is the rightmost page on its level, we instead try to arrange
 * to leave the left split page fillfactor% full.  In this way, when we are
 * inserting successively increasing keys (consider sequences, timestamps,
 * etc) we will end up with a tree whose pages are about fillfactor% full,
 * instead of the 50% full result that we'd get without this special case.
 * This is the same as nbtsort.c produces for a newly-created tree.  Note
 * that leaf and nonleaf pages use different fillfactors.
 *
 * We are passed the intended insert position of the new tuple, expressed as
 * the offsetnumber of the tuple it must go in front of.  (This could be
 * maxoff+1 if the tuple is to go at the end.)
 *
 * We return the index of the first existing tuple that should go on the
 * righthand page, plus a boolean indicating whether the new tuple goes on
 * the left or right page.  The bool is necessary to disambiguate the case
 * where firstright == newitemoff.
 */
static OffsetNumber
_bt_findsplitloc(Relation rel,
				 Page page,
				 OffsetNumber newitemoff,
				 Size newitemsz,
				 bool *newitemonleft)
{
	BTPageOpaque opaque;
	OffsetNumber offnum;
	OffsetNumber maxoff;
	ItemId		itemid;
	FindSplitData state;
	int			leftspace,
				rightspace,
				goodenough,
				olddataitemstotal,
				olddataitemstoleft;
	bool		goodenoughfound;

	opaque = (BTPageOpaque) PageGetSpecialPointer(page);

	/* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
	newitemsz += sizeof(ItemIdData);

	/* Total free space available on a btree page, after fixed overhead */
	leftspace = rightspace =
		PageGetPageSize(page) - SizeOfPageHeaderData -
		MAXALIGN(sizeof(BTPageOpaqueData));

	/* The right page will have the same high key as the old page */
	if (!P_RIGHTMOST(opaque))
	{
		itemid = PageGetItemId(page, P_HIKEY);
		rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
							 sizeof(ItemIdData));
	}

	/* Count up total space in data items without actually scanning 'em */
	olddataitemstotal = rightspace - (int) PageGetExactFreeSpace(page);

	state.newitemsz = newitemsz;
	state.is_leaf = P_ISLEAF(opaque);
	state.is_rightmost = P_RIGHTMOST(opaque);
	state.have_split = false;
	if (state.is_leaf)
		state.fillfactor = RelationGetFillFactor(rel,
												 BTREE_DEFAULT_FILLFACTOR);
	else
		state.fillfactor = BTREE_NONLEAF_FILLFACTOR;
	state.newitemonleft = false;	/* these just to keep compiler quiet */
	state.firstright = 0;
	state.best_delta = 0;
	state.leftspace = leftspace;
	state.rightspace = rightspace;
	state.olddataitemstotal = olddataitemstotal;
	state.newitemoff = newitemoff;

	/*
	 * Finding the best possible split would require checking all the
	 * possible split points, because of the high-key and left-key special
	 * cases.  That's probably more work than it's worth; instead, stop as
	 * soon as we find a "good-enough" split, where good-enough is defined
	 * as an imbalance in free space of no more than pagesize/16
	 * (arbitrary...)  This should let us stop near the middle on most
	 * pages, instead of plowing to the end.
	 */
	goodenough = leftspace / 16;
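
	/*
	 * (With the default 8 kB page size, leftspace is a bit over 8000 bytes,
	 * so "good enough" means the two halves' free space differs by no more
	 * than roughly 500 bytes.)
	 */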

	/*
	 * Scan through the data items and calculate space usage for a split at
	 * each possible position.
	 */
	olddataitemstoleft = 0;
	goodenoughfound = false;
	maxoff = PageGetMaxOffsetNumber(page);

	for (offnum = P_FIRSTDATAKEY(opaque);
		 offnum <= maxoff;
		 offnum = OffsetNumberNext(offnum))
	{
		Size		itemsz;

		itemid = PageGetItemId(page, offnum);
		itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);

		/*
		 * Will the new item go to left or right of split?
		 */
		if (offnum > newitemoff)
			_bt_checksplitloc(&state, offnum, true,
							  olddataitemstoleft, itemsz);
		else if (offnum < newitemoff)
			_bt_checksplitloc(&state, offnum, false,
							  olddataitemstoleft, itemsz);
		else
		{
			/* need to try it both ways! */
			_bt_checksplitloc(&state, offnum, true,
							  olddataitemstoleft, itemsz);

			_bt_checksplitloc(&state, offnum, false,
							  olddataitemstoleft, itemsz);
		}

		/* Abort scan once we find a good-enough choice */
		if (state.have_split && state.best_delta <= goodenough)
		{
			goodenoughfound = true;
			break;
		}

		olddataitemstoleft += itemsz;
	}

	/*
	 * If the new item goes as the last item, check for splitting so that all
	 * the old items go to the left page and the new item goes to the right
	 * page.
	 */
	if (newitemoff > maxoff && !goodenoughfound)
		_bt_checksplitloc(&state, newitemoff, false, olddataitemstotal, 0);

	/*
	 * I believe it is not possible to fail to find a feasible split, but
	 * just in case ...
	 */
	if (!state.have_split)
		elog(ERROR, "could not find a feasible split point for index \"%s\"",
			 RelationGetRelationName(rel));

	*newitemonleft = state.newitemonleft;
	return state.firstright;
}

/*
 * Subroutine to analyze a particular possible split choice (ie, firstright
 * and newitemonleft settings), and record the best split so far in *state.
 *
 * firstoldonright is the offset of the first item on the original page
 * that goes to the right page, and firstoldonrightsz is the size of that
 * tuple.  firstoldonright can be > max offset, which means that all the old
 * items go to the left page and only the new item goes to the right page.
 * In that case, firstoldonrightsz is not used.
 *
 * olddataitemstoleft is the total size of all old items to the left of
 * firstoldonright.
 */
static void
_bt_checksplitloc(FindSplitData *state,
				  OffsetNumber firstoldonright,
				  bool newitemonleft,
				  int olddataitemstoleft,
				  Size firstoldonrightsz)
{
	int			leftfree,
				rightfree;
	Size		firstrightitemsz;
	bool		newitemisfirstonright;

	/* Is the new item going to be the first item on the right page? */
	newitemisfirstonright = (firstoldonright == state->newitemoff
							 && !newitemonleft);

	if (newitemisfirstonright)
		firstrightitemsz = state->newitemsz;
	else
		firstrightitemsz = firstoldonrightsz;

	/* Account for all the old tuples */
	leftfree = state->leftspace - olddataitemstoleft;
	rightfree = state->rightspace -
		(state->olddataitemstotal - olddataitemstoleft);

	/*
	 * The first item on the right page becomes the high key of the left
	 * page; therefore it counts against left space as well as right space.
	 */
	leftfree -= firstrightitemsz;

	/* account for the new item */
	if (newitemonleft)
		leftfree -= (int) state->newitemsz;
	else
		rightfree -= (int) state->newitemsz;

	/*
	 * If we are not on the leaf level, we will be able to discard the key
	 * data from the first item that winds up on the right page.
	 */
	if (!state->is_leaf)
		rightfree += (int) firstrightitemsz -
			(int) (MAXALIGN(sizeof(IndexTupleData)) + sizeof(ItemIdData));

	/*
	 * If feasible split point, remember best delta.
	 */
	if (leftfree >= 0 && rightfree >= 0)
	{
		int			delta;

		if (state->is_rightmost)
		{
			/*
			 * If splitting a rightmost page, try to put (100-fillfactor)% of
			 * free space on left page.  See comments for _bt_findsplitloc.
			 */
			delta = (state->fillfactor * leftfree)
				- ((100 - state->fillfactor) * rightfree);
		}
		else
		{
			/* Otherwise, aim for equal free space on both sides */
			delta = leftfree - rightfree;
		}

		if (delta < 0)
			delta = -delta;
		if (!state->have_split || delta < state->best_delta)
		{
			state->have_split = true;
			state->newitemonleft = newitemonleft;
			state->firstright = firstoldonright;
			state->best_delta = delta;
		}
	}
}
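
/*
 * Worked example (illustrative, assuming the default 8 kB page size): with
 * the default leaf fillfactor of 90, the rightmost-page delta above is zero
 * when 90 * leftfree == 10 * rightfree, i.e. when the left page keeps one
 * ninth as much free space as the right page.  If roughly 8000 usable bytes
 * are being divided, that is about 800 bytes free on the left (90% full)
 * and 7200 free on the right, which is the packing _bt_findsplitloc's
 * header comment promises for successively increasing keys.
 */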

/*
 * _bt_insert_parent() -- Insert downlink into parent after a page split.
 *
 * On entry, buf and rbuf are the left and right split pages, which we
 * still hold write locks on per the L&Y algorithm.  We release the
 * write locks once we have write lock on the parent page.  (Any sooner,
 * and it'd be possible for some other process to try to split or delete
 * one of these pages, and get confused because it cannot find the downlink.)
 *
 * stack - stack showing how we got here.  May be NULL in cases that don't
 *			have to be efficient (concurrent ROOT split, WAL recovery)
 * is_root - we split the true root
 * is_only - we split a page alone on its level (might have been fast root)
 *
 * This is exported so it can be called by nbtxlog.c.
 */
void
_bt_insert_parent(Relation rel,
				  Buffer buf,
				  Buffer rbuf,
				  BTStack stack,
				  bool is_root,
				  bool is_only)
{
	/*
	 * Here we have to do something Lehman and Yao don't talk about: deal
	 * with a root split and construction of a new root.  If our stack is
	 * empty then we have just split a node on what had been the root level
	 * when we descended the tree.  If it was still the root then we perform
	 * a new-root construction.  If it *wasn't* the root anymore, search to
	 * find the next higher level that someone constructed meanwhile, and
	 * find the right place to insert as for the normal case.
	 *
	 * If we have to search for the parent level, we do so by re-descending
	 * from the root.  This is not super-efficient, but it's rare enough not
	 * to matter.  (This path is also taken when called from WAL recovery ---
	 * we have no stack in that case.)
	 */
	if (is_root)
	{
		Buffer		rootbuf;

		Assert(stack == NULL);
		Assert(is_only);
		/* create a new root node and update the metapage */
		rootbuf = _bt_newroot(rel, buf, rbuf);
		/* release the split buffers */
		_bt_relbuf(rel, rootbuf);
		_bt_relbuf(rel, rbuf);
		_bt_relbuf(rel, buf);
	}
	else
	{
		BlockNumber bknum = BufferGetBlockNumber(buf);
		BlockNumber rbknum = BufferGetBlockNumber(rbuf);
		Page		page = BufferGetPage(buf);
		IndexTuple	new_item;
		BTStackData fakestack;
		IndexTuple	ritem;
		Buffer		pbuf;

		if (stack == NULL)
		{
			BTPageOpaque lpageop;

			if (!InRecovery)
				elog(DEBUG2, "concurrent ROOT page split");
			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
			/* Find the leftmost page at the next level up */
			pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
			/* Set up a phony stack entry pointing there */
			stack = &fakestack;
			stack->bts_blkno = BufferGetBlockNumber(pbuf);
			stack->bts_offset = InvalidOffsetNumber;
			/* bts_btentry will be initialized below */
			stack->bts_parent = NULL;
			_bt_relbuf(rel, pbuf);
		}

		/* get high key from left page == lowest key on new right page */
		ritem = (IndexTuple) PageGetItem(page,
										 PageGetItemId(page, P_HIKEY));

		/* form an index tuple that points at the new right page */
		new_item = CopyIndexTuple(ritem);
		ItemPointerSet(&(new_item->t_tid), rbknum, P_HIKEY);

		/*
		 * Find the parent buffer and get the parent page.
		 *
		 * Oops - if we were moved right then we need to change stack item!
		 * We want to find parent pointing to where we are, right ?  - vadim
		 * 05/27/97
		 */
		ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY);

		pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);

		/* Now we can unlock the children */
		_bt_relbuf(rel, rbuf);
		_bt_relbuf(rel, buf);

		/* Check for error only after writing children */
		if (pbuf == InvalidBuffer)
			elog(ERROR, "failed to re-find parent key in index \"%s\" for split pages %u/%u",
				 RelationGetRelationName(rel), bknum, rbknum);

		/* Recursively update the parent */
		_bt_insertonpg(rel, pbuf, stack->bts_parent,
					   new_item, stack->bts_offset + 1,
					   is_only);

		/* be tidy */
		pfree(new_item);
	}
}

/*
 *	_bt_getstackbuf() -- Walk back up the tree one step, and find the item
 *						 we last looked at in the parent.
 *
 *		This is possible because we save the downlink from the parent item,
 *		which is enough to uniquely identify it.  Insertions into the parent
 *		level could cause the item to move right; deletions could cause it
 *		to move left, but not left of the page we previously found it in.
 *
 *		Adjusts bts_blkno & bts_offset if changed.
 *
 *		Returns InvalidBuffer if item not found (should not happen).
 */
Buffer
_bt_getstackbuf(Relation rel, BTStack stack, int access)
{
	BlockNumber blkno;
	OffsetNumber start;

	blkno = stack->bts_blkno;
	start = stack->bts_offset;

	for (;;)
	{
		Buffer		buf;
		Page		page;
		BTPageOpaque opaque;

		buf = _bt_getbuf(rel, blkno, access);
		page = BufferGetPage(buf);
		opaque = (BTPageOpaque) PageGetSpecialPointer(page);

		if (!P_IGNORE(opaque))
		{
			OffsetNumber offnum,
						minoff,
						maxoff;
			ItemId		itemid;
			IndexTuple	item;

			minoff = P_FIRSTDATAKEY(opaque);
			maxoff = PageGetMaxOffsetNumber(page);

			/*
			 * start = InvalidOffsetNumber means "search the whole page".
			 * We need this test anyway due to possibility that page has a
			 * high key now when it didn't before.
			 */
			if (start < minoff)
				start = minoff;

			/*
			 * Need this check too, to guard against possibility that page
			 * split since we visited it originally.
			 */
			if (start > maxoff)
				start = OffsetNumberNext(maxoff);

			/*
			 * These loops will check every item on the page --- but in an
			 * order that's attuned to the probability of where it actually
			 * is.  Scan to the right first, then to the left.
			 */
			for (offnum = start;
				 offnum <= maxoff;
				 offnum = OffsetNumberNext(offnum))
			{
				itemid = PageGetItemId(page, offnum);
				item = (IndexTuple) PageGetItem(page, itemid);
				if (BTEntrySame(item, &stack->bts_btentry))
				{
					/* Return accurate pointer to where link is now */
					stack->bts_blkno = blkno;
					stack->bts_offset = offnum;
					return buf;
				}
			}

			for (offnum = OffsetNumberPrev(start);
				 offnum >= minoff;
				 offnum = OffsetNumberPrev(offnum))
			{
				itemid = PageGetItemId(page, offnum);
				item = (IndexTuple) PageGetItem(page, itemid);
				if (BTEntrySame(item, &stack->bts_btentry))
				{
					/* Return accurate pointer to where link is now */
					stack->bts_blkno = blkno;
					stack->bts_offset = offnum;
					return buf;
				}
			}
		}

		/*
		 * The item we're looking for moved right at least one page.
		 */
		if (P_RIGHTMOST(opaque))
		{
			_bt_relbuf(rel, buf);
			return InvalidBuffer;
		}
		blkno = opaque->btpo_next;
		start = InvalidOffsetNumber;
		_bt_relbuf(rel, buf);
	}
}

/*
 *	_bt_newroot() -- Create a new root page for the index.
 *
 *		We've just split the old root page and need to create a new one.
 *		In order to do this, we add a new root page to the file, then lock
 *		the metadata page and update it.  This is guaranteed to be deadlock-
 *		free, because all readers release their locks on the metadata page
 *		before trying to lock the root, and all writers lock the root before
 *		trying to lock the metadata page.  We have a write lock on the old
 *		root page, so we have not introduced any cycles into the waits-for
 *		graph.
 *
 *		On entry, lbuf (the old root) and rbuf (its new peer) are write-
 *		locked.  On exit, a new root page exists with entries for the
 *		two new children, metapage is updated and unlocked/unpinned.
 *		The new root buffer is returned to caller which has to unlock/unpin
 *		lbuf, rbuf & rootbuf.
 */
static Buffer
_bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
{
	Buffer		rootbuf;
	Page		lpage,
				rootpage;
	BlockNumber lbkno,
				rbkno;
	BlockNumber rootblknum;
	BTPageOpaque rootopaque;
	ItemId		itemid;
	IndexTuple	item;
	Size		itemsz;
	IndexTuple	new_item;
	Buffer		metabuf;
	Page		metapg;
	BTMetaPageData *metad;

	lbkno = BufferGetBlockNumber(lbuf);
	rbkno = BufferGetBlockNumber(rbuf);
	lpage = BufferGetPage(lbuf);

	/* get a new root page */
	rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
	rootpage = BufferGetPage(rootbuf);
	rootblknum = BufferGetBlockNumber(rootbuf);

	/* acquire lock on the metapage */
	metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
	metapg = BufferGetPage(metabuf);
	metad = BTPageGetMeta(metapg);

	/* NO EREPORT(ERROR) from here till newroot op is logged */
	START_CRIT_SECTION();

	/* set btree special data */
	rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
	rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
	rootopaque->btpo_flags = BTP_ROOT;
	rootopaque->btpo.level =
		((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1;
	rootopaque->btpo_cycleid = 0;

	/* update metapage data */
	metad->btm_root = rootblknum;
	metad->btm_level = rootopaque->btpo.level;
	metad->btm_fastroot = rootblknum;
	metad->btm_fastlevel = rootopaque->btpo.level;

	/*
	 * Create downlink item for left page (old root).  Since this will be the
	 * first item in a non-leaf page, it implicitly has minus-infinity key
	 * value, so we need not store any actual key in it.
	 */
	itemsz = sizeof(IndexTupleData);
	new_item = (IndexTuple) palloc(itemsz);
	new_item->t_info = itemsz;
	ItemPointerSet(&(new_item->t_tid), lbkno, P_HIKEY);

	/*
	 * Insert the left page pointer into the new root page.  The root page is
	 * the rightmost page on its level so there is no "high key" in it; the
	 * two items will go into positions P_HIKEY and P_FIRSTKEY.
	 *
	 * Note: we *must* insert the two items in item-number order, for the
	 * benefit of _bt_restore_page().
	 */
	if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY,
					false, false) == InvalidOffsetNumber)
		elog(PANIC, "failed to add leftkey to new root page"
			 " while splitting block %u of index \"%s\"",
			 BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
	pfree(new_item);

	/*
	 * Create downlink item for right page.  The key for it is obtained from
	 * the "high key" position in the left page.
	 */
	itemid = PageGetItemId(lpage, P_HIKEY);
	itemsz = ItemIdGetLength(itemid);
	item = (IndexTuple) PageGetItem(lpage, itemid);
	new_item = CopyIndexTuple(item);
	ItemPointerSet(&(new_item->t_tid), rbkno, P_HIKEY);

	/*
	 * insert the right page pointer into the new root page.
	 */
	if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY,
					false, false) == InvalidOffsetNumber)
		elog(PANIC, "failed to add rightkey to new root page"
			 " while splitting block %u of index \"%s\"",
			 BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
	pfree(new_item);

	MarkBufferDirty(rootbuf);
	MarkBufferDirty(metabuf);

	/* XLOG stuff */
	if (!rel->rd_istemp)
	{
		xl_btree_newroot xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];

		xlrec.node = rel->rd_node;
		xlrec.rootblk = rootblknum;
		xlrec.level = metad->btm_level;

		rdata[0].data = (char *) &xlrec;
		rdata[0].len = SizeOfBtreeNewroot;
		rdata[0].buffer = InvalidBuffer;
		rdata[0].next = &(rdata[1]);

		/*
		 * Direct access to page is not good but faster - we should implement
		 * some new func in page API.
		 */
		rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper;
		rdata[1].len = ((PageHeader) rootpage)->pd_special -
			((PageHeader) rootpage)->pd_upper;
		rdata[1].buffer = InvalidBuffer;
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata);

		PageSetLSN(rootpage, recptr);
		PageSetTLI(rootpage, ThisTimeLineID);
		PageSetLSN(metapg, recptr);
		PageSetTLI(metapg, ThisTimeLineID);
	}

	END_CRIT_SECTION();

	/* send out relcache inval for metapage change */
	if (!InRecovery)
		CacheInvalidateRelcache(rel);

	/* done with metapage */
	_bt_relbuf(rel, metabuf);

	return rootbuf;
}

/*
 *	_bt_pgaddtup() -- add a tuple to a particular page in the index.
 *
 *		This routine adds the tuple to the page as requested.  It does
 *		not affect pin/lock status, but you'd better have a write lock
 *		and pin on the target buffer!  Don't forget to write and release
 *		the buffer afterwards, either.
 *
 *		The main difference between this routine and a bare PageAddItem call
 *		is that this code knows that the leftmost index tuple on a non-leaf
 *		btree page doesn't need to have a key.  Therefore, it strips such
 *		tuples down to just the tuple header.  CAUTION: this works ONLY if
 *		we insert the tuples in order, so that the given itup_off does
 *		represent the final position of the tuple!
 */
static void
_bt_pgaddtup(Relation rel,
			 Page page,
			 Size itemsize,
			 IndexTuple itup,
			 OffsetNumber itup_off,
			 const char *where)
{
	BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
	IndexTupleData trunctuple;

	if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
	{
		trunctuple = *itup;
		trunctuple.t_info = sizeof(IndexTupleData);
		itup = &trunctuple;
		itemsize = sizeof(IndexTupleData);
	}

	if (PageAddItem(page, (Item) itup, itemsize, itup_off,
					false, false) == InvalidOffsetNumber)
		elog(PANIC, "failed to add item to the %s in index \"%s\"",
			 where, RelationGetRelationName(rel));
}

/*
 * _bt_isequal - used in _bt_doinsert in check for duplicates.
 *
 * This is very similar to _bt_compare, except for NULL handling.
 * The rule is simple: a non-NULL value is never equal to NULL, and
 * NULL is not equal to NULL either.
 */
static bool
_bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
			int keysz, ScanKey scankey)
{
	IndexTuple	itup;
	int			i;

	/* Better be comparing to a leaf item */
	Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));

	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));

	for (i = 1; i <= keysz; i++)
	{
		AttrNumber	attno;
		Datum		datum;
		bool		isNull;
		int32		result;

		attno = scankey->sk_attno;
		Assert(attno == i);
		datum = index_getattr(itup, attno, itupdesc, &isNull);

		/* NULLs are never equal to anything */
		if (isNull || (scankey->sk_flags & SK_ISNULL))
			return false;

		result = DatumGetInt32(FunctionCall2(&scankey->sk_func,
											 datum,
											 scankey->sk_argument));

		if (result != 0)
			return false;

		scankey++;
	}

	/* if we get here, the keys are equal */
	return true;
}
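
/*
 * (A practical consequence of the NULL rule above: since _bt_check_unique
 * relies on _bt_isequal, index entries whose key columns include a NULL
 * never compare equal, which is why a unique index can hold any number of
 * rows that are NULL in a key column.)
 */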

/*
 * _bt_vacuum_one_page - vacuum just one index page.
 *
 * Try to remove LP_DEAD items from the given page.  The passed buffer
 * must be exclusive-locked, but unlike a real VACUUM, we don't need a
 * super-exclusive "cleanup" lock (see nbtree/README).
 */
static void
_bt_vacuum_one_page(Relation rel, Buffer buffer)
{
	OffsetNumber deletable[MaxOffsetNumber];
	int			ndeletable = 0;
	OffsetNumber offnum,
				minoff,
				maxoff;
	Page		page = BufferGetPage(buffer);
	BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);

	/*
	 * Scan over all items to see which ones need to be deleted according to
	 * LP_DEAD flags.
	 */
	minoff = P_FIRSTDATAKEY(opaque);
	maxoff = PageGetMaxOffsetNumber(page);
	for (offnum = minoff;
		 offnum <= maxoff;
		 offnum = OffsetNumberNext(offnum))
	{
		ItemId		itemId = PageGetItemId(page, offnum);

		if (ItemIdIsDead(itemId))
			deletable[ndeletable++] = offnum;
	}

	if (ndeletable > 0)
		_bt_delitems(rel, buffer, deletable, ndeletable);

	/*
	 * Note: if we didn't find any LP_DEAD items, then the page's
	 * BTP_HAS_GARBAGE hint bit is falsely set.  We do not bother expending a
	 * separate write to clear it, however.  We will clear it when we split
	 * the page.
	 */
}