Fix xslt_process() to ensure that it inserts a NULL terminator after the
[PostgreSQL.git] / src / backend / access / nbtree / nbtxlog.c
blob895d6411323a268ee76bc0723cb201e59c948187
1 /*-------------------------------------------------------------------------
3 * nbtxlog.c
4 * WAL replay logic for btrees.
7 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
10 * IDENTIFICATION
11 * $PostgreSQL$
13 *-------------------------------------------------------------------------
15 #include "postgres.h"
17 #include "access/nbtree.h"
18 #include "access/transam.h"
19 #include "storage/bufmgr.h"
22 * We must keep track of expected insertions due to page splits, and apply
23 * them manually if they are not seen in the WAL log during replay. This
24 * makes it safe for page insertion to be a multiple-WAL-action process.
26 * Similarly, deletion of an only child page and deletion of its parent page
27 * form multiple WAL log entries, and we have to be prepared to follow through
28 * with the deletion if the log ends between.
30 * The data structure is a simple linked list --- this should be good enough,
31 * since we don't expect a page split or multi deletion to remain incomplete
32 * for long. In any case we need to respect the order of operations.
34 typedef struct bt_incomplete_action
36 RelFileNode node; /* the index */
37 bool is_split; /* T = pending split, F = pending delete */
38 /* these fields are for a split: */
39 bool is_root; /* we split the root */
40 BlockNumber leftblk; /* left half of split */
41 BlockNumber rightblk; /* right half of split */
42 /* these fields are for a delete: */
43 BlockNumber delblk; /* parent block to be deleted */
44 } bt_incomplete_action;
46 static List *incomplete_actions;
49 static void
50 log_incomplete_split(RelFileNode node, BlockNumber leftblk,
51 BlockNumber rightblk, bool is_root)
53 bt_incomplete_action *action = palloc(sizeof(bt_incomplete_action));
55 action->node = node;
56 action->is_split = true;
57 action->is_root = is_root;
58 action->leftblk = leftblk;
59 action->rightblk = rightblk;
60 incomplete_actions = lappend(incomplete_actions, action);
63 static void
64 forget_matching_split(RelFileNode node, BlockNumber downlink, bool is_root)
66 ListCell *l;
68 foreach(l, incomplete_actions)
70 bt_incomplete_action *action = (bt_incomplete_action *) lfirst(l);
72 if (RelFileNodeEquals(node, action->node) &&
73 action->is_split &&
74 downlink == action->rightblk)
76 if (is_root != action->is_root)
77 elog(LOG, "forget_matching_split: fishy is_root data (expected %d, got %d)",
78 action->is_root, is_root);
79 incomplete_actions = list_delete_ptr(incomplete_actions, action);
80 pfree(action);
81 break; /* need not look further */
86 static void
87 log_incomplete_deletion(RelFileNode node, BlockNumber delblk)
89 bt_incomplete_action *action = palloc(sizeof(bt_incomplete_action));
91 action->node = node;
92 action->is_split = false;
93 action->delblk = delblk;
94 incomplete_actions = lappend(incomplete_actions, action);
97 static void
98 forget_matching_deletion(RelFileNode node, BlockNumber delblk)
100 ListCell *l;
102 foreach(l, incomplete_actions)
104 bt_incomplete_action *action = (bt_incomplete_action *) lfirst(l);
106 if (RelFileNodeEquals(node, action->node) &&
107 !action->is_split &&
108 delblk == action->delblk)
110 incomplete_actions = list_delete_ptr(incomplete_actions, action);
111 pfree(action);
112 break; /* need not look further */
118 * _bt_restore_page -- re-enter all the index tuples on a page
120 * The page is freshly init'd, and *from (length len) is a copy of what
121 * had been its upper part (pd_upper to pd_special). We assume that the
122 * tuples had been added to the page in item-number order, and therefore
123 * the one with highest item number appears first (lowest on the page).
125 * NOTE: the way this routine is coded, the rebuilt page will have the items
126 * in correct itemno sequence, but physically the opposite order from the
127 * original, because we insert them in the opposite of itemno order. This
128 * does not matter in any current btree code, but it's something to keep an
129 * eye on. Is it worth changing just on general principles? See also the
130 * notes in btree_xlog_split().
132 static void
133 _bt_restore_page(Page page, char *from, int len)
135 IndexTupleData itupdata;
136 Size itemsz;
137 char *end = from + len;
139 for (; from < end;)
141 /* Need to copy tuple header due to alignment considerations */
142 memcpy(&itupdata, from, sizeof(IndexTupleData));
143 itemsz = IndexTupleDSize(itupdata);
144 itemsz = MAXALIGN(itemsz);
145 if (PageAddItem(page, (Item) from, itemsz, FirstOffsetNumber,
146 false, false) == InvalidOffsetNumber)
147 elog(PANIC, "_bt_restore_page: cannot add item to page");
148 from += itemsz;
152 static void
153 _bt_restore_meta(RelFileNode rnode, XLogRecPtr lsn,
154 BlockNumber root, uint32 level,
155 BlockNumber fastroot, uint32 fastlevel)
157 Buffer metabuf;
158 Page metapg;
159 BTMetaPageData *md;
160 BTPageOpaque pageop;
162 metabuf = XLogReadBuffer(rnode, BTREE_METAPAGE, true);
163 Assert(BufferIsValid(metabuf));
164 metapg = BufferGetPage(metabuf);
166 _bt_pageinit(metapg, BufferGetPageSize(metabuf));
168 md = BTPageGetMeta(metapg);
169 md->btm_magic = BTREE_MAGIC;
170 md->btm_version = BTREE_VERSION;
171 md->btm_root = root;
172 md->btm_level = level;
173 md->btm_fastroot = fastroot;
174 md->btm_fastlevel = fastlevel;
176 pageop = (BTPageOpaque) PageGetSpecialPointer(metapg);
177 pageop->btpo_flags = BTP_META;
180 * Set pd_lower just past the end of the metadata. This is not essential
181 * but it makes the page look compressible to xlog.c.
183 ((PageHeader) metapg)->pd_lower =
184 ((char *) md + sizeof(BTMetaPageData)) - (char *) metapg;
186 PageSetLSN(metapg, lsn);
187 PageSetTLI(metapg, ThisTimeLineID);
188 MarkBufferDirty(metabuf);
189 UnlockReleaseBuffer(metabuf);
192 static void
193 btree_xlog_insert(bool isleaf, bool ismeta,
194 XLogRecPtr lsn, XLogRecord *record)
196 xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
197 Buffer buffer;
198 Page page;
199 char *datapos;
200 int datalen;
201 xl_btree_metadata md;
202 BlockNumber downlink = 0;
204 datapos = (char *) xlrec + SizeOfBtreeInsert;
205 datalen = record->xl_len - SizeOfBtreeInsert;
206 if (!isleaf)
208 memcpy(&downlink, datapos, sizeof(BlockNumber));
209 datapos += sizeof(BlockNumber);
210 datalen -= sizeof(BlockNumber);
212 if (ismeta)
214 memcpy(&md, datapos, sizeof(xl_btree_metadata));
215 datapos += sizeof(xl_btree_metadata);
216 datalen -= sizeof(xl_btree_metadata);
219 if ((record->xl_info & XLR_BKP_BLOCK_1) && !ismeta && isleaf)
220 return; /* nothing to do */
222 if (!(record->xl_info & XLR_BKP_BLOCK_1))
224 buffer = XLogReadBuffer(xlrec->target.node,
225 ItemPointerGetBlockNumber(&(xlrec->target.tid)),
226 false);
227 if (BufferIsValid(buffer))
229 page = (Page) BufferGetPage(buffer);
231 if (XLByteLE(lsn, PageGetLSN(page)))
233 UnlockReleaseBuffer(buffer);
235 else
237 if (PageAddItem(page, (Item) datapos, datalen,
238 ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
239 false, false) == InvalidOffsetNumber)
240 elog(PANIC, "btree_insert_redo: failed to add item");
242 PageSetLSN(page, lsn);
243 PageSetTLI(page, ThisTimeLineID);
244 MarkBufferDirty(buffer);
245 UnlockReleaseBuffer(buffer);
250 if (ismeta)
251 _bt_restore_meta(xlrec->target.node, lsn,
252 md.root, md.level,
253 md.fastroot, md.fastlevel);
255 /* Forget any split this insertion completes */
256 if (!isleaf)
257 forget_matching_split(xlrec->target.node, downlink, false);
260 static void
261 btree_xlog_split(bool onleft, bool isroot,
262 XLogRecPtr lsn, XLogRecord *record)
264 xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
265 Buffer rbuf;
266 Page rpage;
267 BTPageOpaque ropaque;
268 char *datapos;
269 int datalen;
270 OffsetNumber newitemoff = 0;
271 Item newitem = NULL;
272 Size newitemsz = 0;
273 Item left_hikey = NULL;
274 Size left_hikeysz = 0;
276 datapos = (char *) xlrec + SizeOfBtreeSplit;
277 datalen = record->xl_len - SizeOfBtreeSplit;
279 /* Forget any split this insertion completes */
280 if (xlrec->level > 0)
282 /* we assume SizeOfBtreeSplit is at least 16-bit aligned */
283 BlockNumber downlink = BlockIdGetBlockNumber((BlockId) datapos);
285 datapos += sizeof(BlockIdData);
286 datalen -= sizeof(BlockIdData);
288 forget_matching_split(xlrec->node, downlink, false);
290 /* Extract left hikey and its size (still assuming 16-bit alignment) */
291 if (!(record->xl_info & XLR_BKP_BLOCK_1))
293 /* We assume 16-bit alignment is enough for IndexTupleSize */
294 left_hikey = (Item) datapos;
295 left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
297 datapos += left_hikeysz;
298 datalen -= left_hikeysz;
302 /* Extract newitem and newitemoff, if present */
303 if (onleft)
305 /* Extract the offset (still assuming 16-bit alignment) */
306 memcpy(&newitemoff, datapos, sizeof(OffsetNumber));
307 datapos += sizeof(OffsetNumber);
308 datalen -= sizeof(OffsetNumber);
311 if (onleft && !(record->xl_info & XLR_BKP_BLOCK_1))
314 * We assume that 16-bit alignment is enough to apply IndexTupleSize
315 * (since it's fetching from a uint16 field) and also enough for
316 * PageAddItem to insert the tuple.
318 newitem = (Item) datapos;
319 newitemsz = MAXALIGN(IndexTupleSize(newitem));
320 datapos += newitemsz;
321 datalen -= newitemsz;
324 /* Reconstruct right (new) sibling from scratch */
325 rbuf = XLogReadBuffer(xlrec->node, xlrec->rightsib, true);
326 Assert(BufferIsValid(rbuf));
327 rpage = (Page) BufferGetPage(rbuf);
329 _bt_pageinit(rpage, BufferGetPageSize(rbuf));
330 ropaque = (BTPageOpaque) PageGetSpecialPointer(rpage);
332 ropaque->btpo_prev = xlrec->leftsib;
333 ropaque->btpo_next = xlrec->rnext;
334 ropaque->btpo.level = xlrec->level;
335 ropaque->btpo_flags = (xlrec->level == 0) ? BTP_LEAF : 0;
336 ropaque->btpo_cycleid = 0;
338 _bt_restore_page(rpage, datapos, datalen);
341 * On leaf level, the high key of the left page is equal to the first key
342 * on the right page.
344 if (xlrec->level == 0)
346 ItemId hiItemId = PageGetItemId(rpage, P_FIRSTDATAKEY(ropaque));
348 left_hikey = PageGetItem(rpage, hiItemId);
349 left_hikeysz = ItemIdGetLength(hiItemId);
352 PageSetLSN(rpage, lsn);
353 PageSetTLI(rpage, ThisTimeLineID);
354 MarkBufferDirty(rbuf);
356 /* don't release the buffer yet; we touch right page's first item below */
359 * Reconstruct left (original) sibling if needed. Note that this code
360 * ensures that the items remaining on the left page are in the correct
361 * item number order, but it does not reproduce the physical order they
362 * would have had. Is this worth changing? See also _bt_restore_page().
364 if (!(record->xl_info & XLR_BKP_BLOCK_1))
366 Buffer lbuf = XLogReadBuffer(xlrec->node, xlrec->leftsib, false);
368 if (BufferIsValid(lbuf))
370 Page lpage = (Page) BufferGetPage(lbuf);
371 BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
373 if (!XLByteLE(lsn, PageGetLSN(lpage)))
375 OffsetNumber off;
376 OffsetNumber maxoff = PageGetMaxOffsetNumber(lpage);
377 OffsetNumber deletable[MaxOffsetNumber];
378 int ndeletable = 0;
381 * Remove the items from the left page that were copied to the
382 * right page. Also remove the old high key, if any. (We must
383 * remove everything before trying to insert any items, else
384 * we risk not having enough space.)
386 if (!P_RIGHTMOST(lopaque))
388 deletable[ndeletable++] = P_HIKEY;
391 * newitemoff is given to us relative to the original
392 * page's item numbering, so adjust it for this deletion.
394 newitemoff--;
396 for (off = xlrec->firstright; off <= maxoff; off++)
397 deletable[ndeletable++] = off;
398 if (ndeletable > 0)
399 PageIndexMultiDelete(lpage, deletable, ndeletable);
402 * Add the new item if it was inserted on left page.
404 if (onleft)
406 if (PageAddItem(lpage, newitem, newitemsz, newitemoff,
407 false, false) == InvalidOffsetNumber)
408 elog(PANIC, "failed to add new item to left page after split");
411 /* Set high key */
412 if (PageAddItem(lpage, left_hikey, left_hikeysz,
413 P_HIKEY, false, false) == InvalidOffsetNumber)
414 elog(PANIC, "failed to add high key to left page after split");
416 /* Fix opaque fields */
417 lopaque->btpo_flags = (xlrec->level == 0) ? BTP_LEAF : 0;
418 lopaque->btpo_next = xlrec->rightsib;
419 lopaque->btpo_cycleid = 0;
421 PageSetLSN(lpage, lsn);
422 PageSetTLI(lpage, ThisTimeLineID);
423 MarkBufferDirty(lbuf);
426 UnlockReleaseBuffer(lbuf);
430 /* We no longer need the right buffer */
431 UnlockReleaseBuffer(rbuf);
433 /* Fix left-link of the page to the right of the new right sibling */
434 if (xlrec->rnext != P_NONE && !(record->xl_info & XLR_BKP_BLOCK_2))
436 Buffer buffer = XLogReadBuffer(xlrec->node, xlrec->rnext, false);
438 if (BufferIsValid(buffer))
440 Page page = (Page) BufferGetPage(buffer);
442 if (!XLByteLE(lsn, PageGetLSN(page)))
444 BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
446 pageop->btpo_prev = xlrec->rightsib;
448 PageSetLSN(page, lsn);
449 PageSetTLI(page, ThisTimeLineID);
450 MarkBufferDirty(buffer);
452 UnlockReleaseBuffer(buffer);
456 /* The job ain't done till the parent link is inserted... */
457 log_incomplete_split(xlrec->node,
458 xlrec->leftsib, xlrec->rightsib, isroot);
461 static void
462 btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
464 xl_btree_delete *xlrec;
465 Buffer buffer;
466 Page page;
467 BTPageOpaque opaque;
469 if (record->xl_info & XLR_BKP_BLOCK_1)
470 return;
472 xlrec = (xl_btree_delete *) XLogRecGetData(record);
473 buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
474 if (!BufferIsValid(buffer))
475 return;
476 page = (Page) BufferGetPage(buffer);
478 if (XLByteLE(lsn, PageGetLSN(page)))
480 UnlockReleaseBuffer(buffer);
481 return;
484 if (record->xl_len > SizeOfBtreeDelete)
486 OffsetNumber *unused;
487 OffsetNumber *unend;
489 unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeDelete);
490 unend = (OffsetNumber *) ((char *) xlrec + record->xl_len);
492 PageIndexMultiDelete(page, unused, unend - unused);
496 * Mark the page as not containing any LP_DEAD items --- see comments in
497 * _bt_delitems().
499 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
500 opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
502 PageSetLSN(page, lsn);
503 PageSetTLI(page, ThisTimeLineID);
504 MarkBufferDirty(buffer);
505 UnlockReleaseBuffer(buffer);
508 static void
509 btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
511 xl_btree_delete_page *xlrec = (xl_btree_delete_page *) XLogRecGetData(record);
512 BlockNumber parent;
513 BlockNumber target;
514 BlockNumber leftsib;
515 BlockNumber rightsib;
516 Buffer buffer;
517 Page page;
518 BTPageOpaque pageop;
520 parent = ItemPointerGetBlockNumber(&(xlrec->target.tid));
521 target = xlrec->deadblk;
522 leftsib = xlrec->leftblk;
523 rightsib = xlrec->rightblk;
525 /* parent page */
526 if (!(record->xl_info & XLR_BKP_BLOCK_1))
528 buffer = XLogReadBuffer(xlrec->target.node, parent, false);
529 if (BufferIsValid(buffer))
531 page = (Page) BufferGetPage(buffer);
532 pageop = (BTPageOpaque) PageGetSpecialPointer(page);
533 if (XLByteLE(lsn, PageGetLSN(page)))
535 UnlockReleaseBuffer(buffer);
537 else
539 OffsetNumber poffset;
541 poffset = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
542 if (poffset >= PageGetMaxOffsetNumber(page))
544 Assert(info == XLOG_BTREE_DELETE_PAGE_HALF);
545 Assert(poffset == P_FIRSTDATAKEY(pageop));
546 PageIndexTupleDelete(page, poffset);
547 pageop->btpo_flags |= BTP_HALF_DEAD;
549 else
551 ItemId itemid;
552 IndexTuple itup;
553 OffsetNumber nextoffset;
555 Assert(info != XLOG_BTREE_DELETE_PAGE_HALF);
556 itemid = PageGetItemId(page, poffset);
557 itup = (IndexTuple) PageGetItem(page, itemid);
558 ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY);
559 nextoffset = OffsetNumberNext(poffset);
560 PageIndexTupleDelete(page, nextoffset);
563 PageSetLSN(page, lsn);
564 PageSetTLI(page, ThisTimeLineID);
565 MarkBufferDirty(buffer);
566 UnlockReleaseBuffer(buffer);
571 /* Fix left-link of right sibling */
572 if (!(record->xl_info & XLR_BKP_BLOCK_2))
574 buffer = XLogReadBuffer(xlrec->target.node, rightsib, false);
575 if (BufferIsValid(buffer))
577 page = (Page) BufferGetPage(buffer);
578 if (XLByteLE(lsn, PageGetLSN(page)))
580 UnlockReleaseBuffer(buffer);
582 else
584 pageop = (BTPageOpaque) PageGetSpecialPointer(page);
585 pageop->btpo_prev = leftsib;
587 PageSetLSN(page, lsn);
588 PageSetTLI(page, ThisTimeLineID);
589 MarkBufferDirty(buffer);
590 UnlockReleaseBuffer(buffer);
595 /* Fix right-link of left sibling, if any */
596 if (!(record->xl_info & XLR_BKP_BLOCK_3))
598 if (leftsib != P_NONE)
600 buffer = XLogReadBuffer(xlrec->target.node, leftsib, false);
601 if (BufferIsValid(buffer))
603 page = (Page) BufferGetPage(buffer);
604 if (XLByteLE(lsn, PageGetLSN(page)))
606 UnlockReleaseBuffer(buffer);
608 else
610 pageop = (BTPageOpaque) PageGetSpecialPointer(page);
611 pageop->btpo_next = rightsib;
613 PageSetLSN(page, lsn);
614 PageSetTLI(page, ThisTimeLineID);
615 MarkBufferDirty(buffer);
616 UnlockReleaseBuffer(buffer);
622 /* Rewrite target page as empty deleted page */
623 buffer = XLogReadBuffer(xlrec->target.node, target, true);
624 Assert(BufferIsValid(buffer));
625 page = (Page) BufferGetPage(buffer);
627 _bt_pageinit(page, BufferGetPageSize(buffer));
628 pageop = (BTPageOpaque) PageGetSpecialPointer(page);
630 pageop->btpo_prev = leftsib;
631 pageop->btpo_next = rightsib;
632 pageop->btpo.xact = FrozenTransactionId;
633 pageop->btpo_flags = BTP_DELETED;
634 pageop->btpo_cycleid = 0;
636 PageSetLSN(page, lsn);
637 PageSetTLI(page, ThisTimeLineID);
638 MarkBufferDirty(buffer);
639 UnlockReleaseBuffer(buffer);
641 /* Update metapage if needed */
642 if (info == XLOG_BTREE_DELETE_PAGE_META)
644 xl_btree_metadata md;
646 memcpy(&md, (char *) xlrec + SizeOfBtreeDeletePage,
647 sizeof(xl_btree_metadata));
648 _bt_restore_meta(xlrec->target.node, lsn,
649 md.root, md.level,
650 md.fastroot, md.fastlevel);
653 /* Forget any completed deletion */
654 forget_matching_deletion(xlrec->target.node, target);
656 /* If parent became half-dead, remember it for deletion */
657 if (info == XLOG_BTREE_DELETE_PAGE_HALF)
658 log_incomplete_deletion(xlrec->target.node, parent);
661 static void
662 btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
664 xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
665 Buffer buffer;
666 Page page;
667 BTPageOpaque pageop;
668 BlockNumber downlink = 0;
670 buffer = XLogReadBuffer(xlrec->node, xlrec->rootblk, true);
671 Assert(BufferIsValid(buffer));
672 page = (Page) BufferGetPage(buffer);
674 _bt_pageinit(page, BufferGetPageSize(buffer));
675 pageop = (BTPageOpaque) PageGetSpecialPointer(page);
677 pageop->btpo_flags = BTP_ROOT;
678 pageop->btpo_prev = pageop->btpo_next = P_NONE;
679 pageop->btpo.level = xlrec->level;
680 if (xlrec->level == 0)
681 pageop->btpo_flags |= BTP_LEAF;
682 pageop->btpo_cycleid = 0;
684 if (record->xl_len > SizeOfBtreeNewroot)
686 IndexTuple itup;
688 _bt_restore_page(page,
689 (char *) xlrec + SizeOfBtreeNewroot,
690 record->xl_len - SizeOfBtreeNewroot);
691 /* extract downlink to the right-hand split page */
692 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY));
693 downlink = ItemPointerGetBlockNumber(&(itup->t_tid));
694 Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
697 PageSetLSN(page, lsn);
698 PageSetTLI(page, ThisTimeLineID);
699 MarkBufferDirty(buffer);
700 UnlockReleaseBuffer(buffer);
702 _bt_restore_meta(xlrec->node, lsn,
703 xlrec->rootblk, xlrec->level,
704 xlrec->rootblk, xlrec->level);
706 /* Check to see if this satisfies any incomplete insertions */
707 if (record->xl_len > SizeOfBtreeNewroot)
708 forget_matching_split(xlrec->node, downlink, true);
712 void
713 btree_redo(XLogRecPtr lsn, XLogRecord *record)
715 uint8 info = record->xl_info & ~XLR_INFO_MASK;
717 RestoreBkpBlocks(lsn, record, false);
719 switch (info)
721 case XLOG_BTREE_INSERT_LEAF:
722 btree_xlog_insert(true, false, lsn, record);
723 break;
724 case XLOG_BTREE_INSERT_UPPER:
725 btree_xlog_insert(false, false, lsn, record);
726 break;
727 case XLOG_BTREE_INSERT_META:
728 btree_xlog_insert(false, true, lsn, record);
729 break;
730 case XLOG_BTREE_SPLIT_L:
731 btree_xlog_split(true, false, lsn, record);
732 break;
733 case XLOG_BTREE_SPLIT_R:
734 btree_xlog_split(false, false, lsn, record);
735 break;
736 case XLOG_BTREE_SPLIT_L_ROOT:
737 btree_xlog_split(true, true, lsn, record);
738 break;
739 case XLOG_BTREE_SPLIT_R_ROOT:
740 btree_xlog_split(false, true, lsn, record);
741 break;
742 case XLOG_BTREE_DELETE:
743 btree_xlog_delete(lsn, record);
744 break;
745 case XLOG_BTREE_DELETE_PAGE:
746 case XLOG_BTREE_DELETE_PAGE_META:
747 case XLOG_BTREE_DELETE_PAGE_HALF:
748 btree_xlog_delete_page(info, lsn, record);
749 break;
750 case XLOG_BTREE_NEWROOT:
751 btree_xlog_newroot(lsn, record);
752 break;
753 default:
754 elog(PANIC, "btree_redo: unknown op code %u", info);
758 static void
759 out_target(StringInfo buf, xl_btreetid *target)
761 appendStringInfo(buf, "rel %u/%u/%u; tid %u/%u",
762 target->node.spcNode, target->node.dbNode, target->node.relNode,
763 ItemPointerGetBlockNumber(&(target->tid)),
764 ItemPointerGetOffsetNumber(&(target->tid)));
767 void
768 btree_desc(StringInfo buf, uint8 xl_info, char *rec)
770 uint8 info = xl_info & ~XLR_INFO_MASK;
772 switch (info)
774 case XLOG_BTREE_INSERT_LEAF:
776 xl_btree_insert *xlrec = (xl_btree_insert *) rec;
778 appendStringInfo(buf, "insert: ");
779 out_target(buf, &(xlrec->target));
780 break;
782 case XLOG_BTREE_INSERT_UPPER:
784 xl_btree_insert *xlrec = (xl_btree_insert *) rec;
786 appendStringInfo(buf, "insert_upper: ");
787 out_target(buf, &(xlrec->target));
788 break;
790 case XLOG_BTREE_INSERT_META:
792 xl_btree_insert *xlrec = (xl_btree_insert *) rec;
794 appendStringInfo(buf, "insert_meta: ");
795 out_target(buf, &(xlrec->target));
796 break;
798 case XLOG_BTREE_SPLIT_L:
800 xl_btree_split *xlrec = (xl_btree_split *) rec;
802 appendStringInfo(buf, "split_l: rel %u/%u/%u ",
803 xlrec->node.spcNode, xlrec->node.dbNode,
804 xlrec->node.relNode);
805 appendStringInfo(buf, "left %u, right %u, next %u, level %u, firstright %d",
806 xlrec->leftsib, xlrec->rightsib, xlrec->rnext,
807 xlrec->level, xlrec->firstright);
808 break;
810 case XLOG_BTREE_SPLIT_R:
812 xl_btree_split *xlrec = (xl_btree_split *) rec;
814 appendStringInfo(buf, "split_r: rel %u/%u/%u ",
815 xlrec->node.spcNode, xlrec->node.dbNode,
816 xlrec->node.relNode);
817 appendStringInfo(buf, "left %u, right %u, next %u, level %u, firstright %d",
818 xlrec->leftsib, xlrec->rightsib, xlrec->rnext,
819 xlrec->level, xlrec->firstright);
820 break;
822 case XLOG_BTREE_SPLIT_L_ROOT:
824 xl_btree_split *xlrec = (xl_btree_split *) rec;
826 appendStringInfo(buf, "split_l_root: rel %u/%u/%u ",
827 xlrec->node.spcNode, xlrec->node.dbNode,
828 xlrec->node.relNode);
829 appendStringInfo(buf, "left %u, right %u, next %u, level %u, firstright %d",
830 xlrec->leftsib, xlrec->rightsib, xlrec->rnext,
831 xlrec->level, xlrec->firstright);
832 break;
834 case XLOG_BTREE_SPLIT_R_ROOT:
836 xl_btree_split *xlrec = (xl_btree_split *) rec;
838 appendStringInfo(buf, "split_r_root: rel %u/%u/%u ",
839 xlrec->node.spcNode, xlrec->node.dbNode,
840 xlrec->node.relNode);
841 appendStringInfo(buf, "left %u, right %u, next %u, level %u, firstright %d",
842 xlrec->leftsib, xlrec->rightsib, xlrec->rnext,
843 xlrec->level, xlrec->firstright);
844 break;
846 case XLOG_BTREE_DELETE:
848 xl_btree_delete *xlrec = (xl_btree_delete *) rec;
850 appendStringInfo(buf, "delete: rel %u/%u/%u; blk %u",
851 xlrec->node.spcNode, xlrec->node.dbNode,
852 xlrec->node.relNode, xlrec->block);
853 break;
855 case XLOG_BTREE_DELETE_PAGE:
856 case XLOG_BTREE_DELETE_PAGE_META:
857 case XLOG_BTREE_DELETE_PAGE_HALF:
859 xl_btree_delete_page *xlrec = (xl_btree_delete_page *) rec;
861 appendStringInfo(buf, "delete_page: ");
862 out_target(buf, &(xlrec->target));
863 appendStringInfo(buf, "; dead %u; left %u; right %u",
864 xlrec->deadblk, xlrec->leftblk, xlrec->rightblk);
865 break;
867 case XLOG_BTREE_NEWROOT:
869 xl_btree_newroot *xlrec = (xl_btree_newroot *) rec;
871 appendStringInfo(buf, "newroot: rel %u/%u/%u; root %u lev %u",
872 xlrec->node.spcNode, xlrec->node.dbNode,
873 xlrec->node.relNode,
874 xlrec->rootblk, xlrec->level);
875 break;
877 default:
878 appendStringInfo(buf, "UNKNOWN");
879 break;
883 void
884 btree_xlog_startup(void)
886 incomplete_actions = NIL;
889 void
890 btree_xlog_cleanup(void)
892 ListCell *l;
894 foreach(l, incomplete_actions)
896 bt_incomplete_action *action = (bt_incomplete_action *) lfirst(l);
898 if (action->is_split)
900 /* finish an incomplete split */
901 Buffer lbuf,
902 rbuf;
903 Page lpage,
904 rpage;
905 BTPageOpaque lpageop,
906 rpageop;
907 bool is_only;
908 Relation reln;
910 lbuf = XLogReadBuffer(action->node, action->leftblk, false);
911 /* failure is impossible because we wrote this page earlier */
912 if (!BufferIsValid(lbuf))
913 elog(PANIC, "btree_xlog_cleanup: left block unfound");
914 lpage = (Page) BufferGetPage(lbuf);
915 lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
916 rbuf = XLogReadBuffer(action->node, action->rightblk, false);
917 /* failure is impossible because we wrote this page earlier */
918 if (!BufferIsValid(rbuf))
919 elog(PANIC, "btree_xlog_cleanup: right block unfound");
920 rpage = (Page) BufferGetPage(rbuf);
921 rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
923 /* if the pages are all of their level, it's a only-page split */
924 is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop);
926 reln = CreateFakeRelcacheEntry(action->node);
927 _bt_insert_parent(reln, lbuf, rbuf, NULL,
928 action->is_root, is_only);
929 FreeFakeRelcacheEntry(reln);
931 else
933 /* finish an incomplete deletion (of a half-dead page) */
934 Buffer buf;
936 buf = XLogReadBuffer(action->node, action->delblk, false);
937 if (BufferIsValid(buf))
939 Relation reln;
941 reln = CreateFakeRelcacheEntry(action->node);
942 if (_bt_pagedel(reln, buf, NULL, true) == 0)
943 elog(PANIC, "btree_xlog_cleanup: _bt_pagdel failed");
944 FreeFakeRelcacheEntry(reln);
948 incomplete_actions = NIL;
951 bool
952 btree_safe_restartpoint(void)
954 if (incomplete_actions)
955 return false;
956 return true;