/*-------------------------------------------------------------------------
 *
 * WAL replay logic for inverted index.
 *
 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/gin.h"
#include "access/xlogutils.h"
#include "storage/bufmgr.h"
#include "utils/memutils.h"
21 static MemoryContext opCtx
; /* working memory for operations */
22 static MemoryContext topCtx
;
24 typedef struct ginIncompleteSplit
27 BlockNumber leftBlkno
;
28 BlockNumber rightBlkno
;
29 BlockNumber rootBlkno
;
32 static List
*incomplete_splits
;
35 pushIncompleteSplit(RelFileNode node
, BlockNumber leftBlkno
, BlockNumber rightBlkno
, BlockNumber rootBlkno
)
37 ginIncompleteSplit
*split
;
39 MemoryContextSwitchTo(topCtx
);
41 split
= palloc(sizeof(ginIncompleteSplit
));
44 split
->leftBlkno
= leftBlkno
;
45 split
->rightBlkno
= rightBlkno
;
46 split
->rootBlkno
= rootBlkno
;
48 incomplete_splits
= lappend(incomplete_splits
, split
);
50 MemoryContextSwitchTo(opCtx
);
54 forgetIncompleteSplit(RelFileNode node
, BlockNumber leftBlkno
, BlockNumber updateBlkno
)
58 foreach(l
, incomplete_splits
)
60 ginIncompleteSplit
*split
= (ginIncompleteSplit
*) lfirst(l
);
62 if (RelFileNodeEquals(node
, split
->node
) && leftBlkno
== split
->leftBlkno
&& updateBlkno
== split
->rightBlkno
)
64 incomplete_splits
= list_delete_ptr(incomplete_splits
, split
);
71 ginRedoCreateIndex(XLogRecPtr lsn
, XLogRecord
*record
)
73 RelFileNode
*node
= (RelFileNode
*) XLogRecGetData(record
);
77 buffer
= XLogReadBuffer(*node
, GIN_ROOT_BLKNO
, true);
78 Assert(BufferIsValid(buffer
));
79 page
= (Page
) BufferGetPage(buffer
);
81 GinInitBuffer(buffer
, GIN_LEAF
);
83 PageSetLSN(page
, lsn
);
84 PageSetTLI(page
, ThisTimeLineID
);
86 MarkBufferDirty(buffer
);
87 UnlockReleaseBuffer(buffer
);
91 ginRedoCreatePTree(XLogRecPtr lsn
, XLogRecord
*record
)
93 ginxlogCreatePostingTree
*data
= (ginxlogCreatePostingTree
*) XLogRecGetData(record
);
94 ItemPointerData
*items
= (ItemPointerData
*) (XLogRecGetData(record
) + sizeof(ginxlogCreatePostingTree
));
98 buffer
= XLogReadBuffer(data
->node
, data
->blkno
, true);
99 Assert(BufferIsValid(buffer
));
100 page
= (Page
) BufferGetPage(buffer
);
102 GinInitBuffer(buffer
, GIN_DATA
| GIN_LEAF
);
103 memcpy(GinDataPageGetData(page
), items
, sizeof(ItemPointerData
) * data
->nitem
);
104 GinPageGetOpaque(page
)->maxoff
= data
->nitem
;
106 PageSetLSN(page
, lsn
);
107 PageSetTLI(page
, ThisTimeLineID
);
109 MarkBufferDirty(buffer
);
110 UnlockReleaseBuffer(buffer
);
114 ginRedoInsert(XLogRecPtr lsn
, XLogRecord
*record
)
116 ginxlogInsert
*data
= (ginxlogInsert
*) XLogRecGetData(record
);
120 /* nothing else to do if page was backed up */
121 if (record
->xl_info
& XLR_BKP_BLOCK_1
)
124 buffer
= XLogReadBuffer(data
->node
, data
->blkno
, false);
125 Assert(BufferIsValid(buffer
));
126 page
= (Page
) BufferGetPage(buffer
);
130 Assert(data
->isDelete
== FALSE
);
131 Assert(GinPageIsData(page
));
133 if (!XLByteLE(lsn
, PageGetLSN(page
)))
138 ItemPointerData
*items
= (ItemPointerData
*) (XLogRecGetData(record
) + sizeof(ginxlogInsert
));
140 Assert(GinPageIsLeaf(page
));
141 Assert(data
->updateBlkno
== InvalidBlockNumber
);
143 for (i
= 0; i
< data
->nitem
; i
++)
144 GinDataPageAddItem(page
, items
+ i
, data
->offset
+ i
);
150 Assert(!GinPageIsLeaf(page
));
152 if (data
->updateBlkno
!= InvalidBlockNumber
)
154 /* update link to right page after split */
155 pitem
= (PostingItem
*) GinDataPageGetItem(page
, data
->offset
);
156 PostingItemSetBlockNumber(pitem
, data
->updateBlkno
);
159 pitem
= (PostingItem
*) (XLogRecGetData(record
) + sizeof(ginxlogInsert
));
161 GinDataPageAddItem(page
, pitem
, data
->offset
);
165 if (!data
->isLeaf
&& data
->updateBlkno
!= InvalidBlockNumber
)
167 PostingItem
*pitem
= (PostingItem
*) (XLogRecGetData(record
) + sizeof(ginxlogInsert
));
169 forgetIncompleteSplit(data
->node
, PostingItemGetBlockNumber(pitem
), data
->updateBlkno
);
177 Assert(!GinPageIsData(page
));
179 if (!XLByteLE(lsn
, PageGetLSN(page
)))
181 if (data
->updateBlkno
!= InvalidBlockNumber
)
183 /* update link to right page after split */
184 Assert(!GinPageIsLeaf(page
));
185 Assert(data
->offset
>= FirstOffsetNumber
&& data
->offset
<= PageGetMaxOffsetNumber(page
));
186 itup
= (IndexTuple
) PageGetItem(page
, PageGetItemId(page
, data
->offset
));
187 ItemPointerSet(&itup
->t_tid
, data
->updateBlkno
, InvalidOffsetNumber
);
192 Assert(GinPageIsLeaf(page
));
193 Assert(data
->offset
>= FirstOffsetNumber
&& data
->offset
<= PageGetMaxOffsetNumber(page
));
194 PageIndexTupleDelete(page
, data
->offset
);
197 itup
= (IndexTuple
) (XLogRecGetData(record
) + sizeof(ginxlogInsert
));
199 if (PageAddItem(page
, (Item
) itup
, IndexTupleSize(itup
), data
->offset
, false, false) == InvalidOffsetNumber
)
200 elog(ERROR
, "failed to add item to index page in %u/%u/%u",
201 data
->node
.spcNode
, data
->node
.dbNode
, data
->node
.relNode
);
204 if (!data
->isLeaf
&& data
->updateBlkno
!= InvalidBlockNumber
)
206 itup
= (IndexTuple
) (XLogRecGetData(record
) + sizeof(ginxlogInsert
));
207 forgetIncompleteSplit(data
->node
, GinItemPointerGetBlockNumber(&itup
->t_tid
), data
->updateBlkno
);
211 if (!XLByteLE(lsn
, PageGetLSN(page
)))
213 PageSetLSN(page
, lsn
);
214 PageSetTLI(page
, ThisTimeLineID
);
216 MarkBufferDirty(buffer
);
218 UnlockReleaseBuffer(buffer
);
222 ginRedoSplit(XLogRecPtr lsn
, XLogRecord
*record
)
224 ginxlogSplit
*data
= (ginxlogSplit
*) XLogRecGetData(record
);
236 lbuffer
= XLogReadBuffer(data
->node
, data
->lblkno
, data
->isRootSplit
);
237 Assert(BufferIsValid(lbuffer
));
238 lpage
= (Page
) BufferGetPage(lbuffer
);
239 GinInitBuffer(lbuffer
, flags
);
241 rbuffer
= XLogReadBuffer(data
->node
, data
->rblkno
, true);
242 Assert(BufferIsValid(rbuffer
));
243 rpage
= (Page
) BufferGetPage(rbuffer
);
244 GinInitBuffer(rbuffer
, flags
);
246 GinPageGetOpaque(lpage
)->rightlink
= BufferGetBlockNumber(rbuffer
);
247 GinPageGetOpaque(rpage
)->rightlink
= data
->rrlink
;
251 char *ptr
= XLogRecGetData(record
) + sizeof(ginxlogSplit
);
252 Size sizeofitem
= GinSizeOfItem(lpage
);
256 for (i
= 0; i
< data
->separator
; i
++)
258 GinDataPageAddItem(lpage
, ptr
, InvalidOffsetNumber
);
262 for (i
= data
->separator
; i
< data
->nitem
; i
++)
264 GinDataPageAddItem(rpage
, ptr
, InvalidOffsetNumber
);
268 /* set up right key */
269 bound
= GinDataPageGetRightBound(lpage
);
271 *bound
= *(ItemPointerData
*) GinDataPageGetItem(lpage
, GinPageGetOpaque(lpage
)->maxoff
);
273 *bound
= ((PostingItem
*) GinDataPageGetItem(lpage
, GinPageGetOpaque(lpage
)->maxoff
))->key
;
275 bound
= GinDataPageGetRightBound(rpage
);
276 *bound
= data
->rightbound
;
280 IndexTuple itup
= (IndexTuple
) (XLogRecGetData(record
) + sizeof(ginxlogSplit
));
283 for (i
= 0; i
< data
->separator
; i
++)
285 if (PageAddItem(lpage
, (Item
) itup
, IndexTupleSize(itup
), InvalidOffsetNumber
, false, false) == InvalidOffsetNumber
)
286 elog(ERROR
, "failed to add item to index page in %u/%u/%u",
287 data
->node
.spcNode
, data
->node
.dbNode
, data
->node
.relNode
);
288 itup
= (IndexTuple
) (((char *) itup
) + MAXALIGN(IndexTupleSize(itup
)));
291 for (i
= data
->separator
; i
< data
->nitem
; i
++)
293 if (PageAddItem(rpage
, (Item
) itup
, IndexTupleSize(itup
), InvalidOffsetNumber
, false, false) == InvalidOffsetNumber
)
294 elog(ERROR
, "failed to add item to index page in %u/%u/%u",
295 data
->node
.spcNode
, data
->node
.dbNode
, data
->node
.relNode
);
296 itup
= (IndexTuple
) (((char *) itup
) + MAXALIGN(IndexTupleSize(itup
)));
300 PageSetLSN(rpage
, lsn
);
301 PageSetTLI(rpage
, ThisTimeLineID
);
302 MarkBufferDirty(rbuffer
);
304 PageSetLSN(lpage
, lsn
);
305 PageSetTLI(lpage
, ThisTimeLineID
);
306 MarkBufferDirty(lbuffer
);
308 if (!data
->isLeaf
&& data
->updateBlkno
!= InvalidBlockNumber
)
309 forgetIncompleteSplit(data
->node
, data
->leftChildBlkno
, data
->updateBlkno
);
311 if (data
->isRootSplit
)
313 Buffer rootBuf
= XLogReadBuffer(data
->node
, data
->rootBlkno
, false);
314 Page rootPage
= BufferGetPage(rootBuf
);
316 GinInitBuffer(rootBuf
, flags
& ~GIN_LEAF
);
320 Assert(data
->rootBlkno
!= GIN_ROOT_BLKNO
);
321 dataFillRoot(NULL
, rootBuf
, lbuffer
, rbuffer
);
325 Assert(data
->rootBlkno
== GIN_ROOT_BLKNO
);
326 entryFillRoot(NULL
, rootBuf
, lbuffer
, rbuffer
);
329 PageSetLSN(rootPage
, lsn
);
330 PageSetTLI(rootPage
, ThisTimeLineID
);
332 MarkBufferDirty(rootBuf
);
333 UnlockReleaseBuffer(rootBuf
);
336 pushIncompleteSplit(data
->node
, data
->lblkno
, data
->rblkno
, data
->rootBlkno
);
338 UnlockReleaseBuffer(rbuffer
);
339 UnlockReleaseBuffer(lbuffer
);
343 ginRedoVacuumPage(XLogRecPtr lsn
, XLogRecord
*record
)
345 ginxlogVacuumPage
*data
= (ginxlogVacuumPage
*) XLogRecGetData(record
);
349 /* nothing else to do if page was backed up (and no info to do it with) */
350 if (record
->xl_info
& XLR_BKP_BLOCK_1
)
353 buffer
= XLogReadBuffer(data
->node
, data
->blkno
, false);
354 Assert(BufferIsValid(buffer
));
355 page
= (Page
) BufferGetPage(buffer
);
357 if (GinPageIsData(page
))
359 memcpy(GinDataPageGetData(page
), XLogRecGetData(record
) + sizeof(ginxlogVacuumPage
),
360 GinSizeOfItem(page
) *data
->nitem
);
361 GinPageGetOpaque(page
)->maxoff
= data
->nitem
;
367 IndexTuple itup
= (IndexTuple
) (XLogRecGetData(record
) + sizeof(ginxlogVacuumPage
));
369 tod
= (OffsetNumber
*) palloc(sizeof(OffsetNumber
) * PageGetMaxOffsetNumber(page
));
370 for (i
= FirstOffsetNumber
; i
<= PageGetMaxOffsetNumber(page
); i
++)
373 PageIndexMultiDelete(page
, tod
, PageGetMaxOffsetNumber(page
));
375 for (i
= 0; i
< data
->nitem
; i
++)
377 if (PageAddItem(page
, (Item
) itup
, IndexTupleSize(itup
), InvalidOffsetNumber
, false, false) == InvalidOffsetNumber
)
378 elog(ERROR
, "failed to add item to index page in %u/%u/%u",
379 data
->node
.spcNode
, data
->node
.dbNode
, data
->node
.relNode
);
380 itup
= (IndexTuple
) (((char *) itup
) + MAXALIGN(IndexTupleSize(itup
)));
384 PageSetLSN(page
, lsn
);
385 PageSetTLI(page
, ThisTimeLineID
);
387 MarkBufferDirty(buffer
);
388 UnlockReleaseBuffer(buffer
);
392 ginRedoDeletePage(XLogRecPtr lsn
, XLogRecord
*record
)
394 ginxlogDeletePage
*data
= (ginxlogDeletePage
*) XLogRecGetData(record
);
398 if (!(record
->xl_info
& XLR_BKP_BLOCK_1
))
400 buffer
= XLogReadBuffer(data
->node
, data
->blkno
, false);
401 page
= BufferGetPage(buffer
);
402 Assert(GinPageIsData(page
));
403 GinPageGetOpaque(page
)->flags
= GIN_DELETED
;
404 PageSetLSN(page
, lsn
);
405 PageSetTLI(page
, ThisTimeLineID
);
406 MarkBufferDirty(buffer
);
407 UnlockReleaseBuffer(buffer
);
410 if (!(record
->xl_info
& XLR_BKP_BLOCK_2
))
412 buffer
= XLogReadBuffer(data
->node
, data
->parentBlkno
, false);
413 page
= BufferGetPage(buffer
);
414 Assert(GinPageIsData(page
));
415 Assert(!GinPageIsLeaf(page
));
416 PageDeletePostingItem(page
, data
->parentOffset
);
417 PageSetLSN(page
, lsn
);
418 PageSetTLI(page
, ThisTimeLineID
);
419 MarkBufferDirty(buffer
);
420 UnlockReleaseBuffer(buffer
);
423 if (!(record
->xl_info
& XLR_BKP_BLOCK_3
) && data
->leftBlkno
!= InvalidBlockNumber
)
425 buffer
= XLogReadBuffer(data
->node
, data
->leftBlkno
, false);
426 page
= BufferGetPage(buffer
);
427 Assert(GinPageIsData(page
));
428 GinPageGetOpaque(page
)->rightlink
= data
->rightLink
;
429 PageSetLSN(page
, lsn
);
430 PageSetTLI(page
, ThisTimeLineID
);
431 MarkBufferDirty(buffer
);
432 UnlockReleaseBuffer(buffer
);
437 gin_redo(XLogRecPtr lsn
, XLogRecord
*record
)
439 uint8 info
= record
->xl_info
& ~XLR_INFO_MASK
;
441 topCtx
= MemoryContextSwitchTo(opCtx
);
444 case XLOG_GIN_CREATE_INDEX
:
445 ginRedoCreateIndex(lsn
, record
);
447 case XLOG_GIN_CREATE_PTREE
:
448 ginRedoCreatePTree(lsn
, record
);
450 case XLOG_GIN_INSERT
:
451 ginRedoInsert(lsn
, record
);
454 ginRedoSplit(lsn
, record
);
456 case XLOG_GIN_VACUUM_PAGE
:
457 ginRedoVacuumPage(lsn
, record
);
459 case XLOG_GIN_DELETE_PAGE
:
460 ginRedoDeletePage(lsn
, record
);
463 elog(PANIC
, "gin_redo: unknown op code %u", info
);
465 MemoryContextSwitchTo(topCtx
);
466 MemoryContextReset(opCtx
);
470 desc_node(StringInfo buf
, RelFileNode node
, BlockNumber blkno
)
472 appendStringInfo(buf
, "node: %u/%u/%u blkno: %u",
473 node
.spcNode
, node
.dbNode
, node
.relNode
, blkno
);
477 gin_desc(StringInfo buf
, uint8 xl_info
, char *rec
)
479 uint8 info
= xl_info
& ~XLR_INFO_MASK
;
483 case XLOG_GIN_CREATE_INDEX
:
484 appendStringInfo(buf
, "Create index, ");
485 desc_node(buf
, *(RelFileNode
*) rec
, GIN_ROOT_BLKNO
);
487 case XLOG_GIN_CREATE_PTREE
:
488 appendStringInfo(buf
, "Create posting tree, ");
489 desc_node(buf
, ((ginxlogCreatePostingTree
*) rec
)->node
, ((ginxlogCreatePostingTree
*) rec
)->blkno
);
491 case XLOG_GIN_INSERT
:
492 appendStringInfo(buf
, "Insert item, ");
493 desc_node(buf
, ((ginxlogInsert
*) rec
)->node
, ((ginxlogInsert
*) rec
)->blkno
);
494 appendStringInfo(buf
, " offset: %u nitem: %u isdata: %c isleaf %c isdelete %c updateBlkno:%u",
495 ((ginxlogInsert
*) rec
)->offset
,
496 ((ginxlogInsert
*) rec
)->nitem
,
497 (((ginxlogInsert
*) rec
)->isData
) ? 'T' : 'F',
498 (((ginxlogInsert
*) rec
)->isLeaf
) ? 'T' : 'F',
499 (((ginxlogInsert
*) rec
)->isDelete
) ? 'T' : 'F',
500 ((ginxlogInsert
*) rec
)->updateBlkno
505 appendStringInfo(buf
, "Page split, ");
506 desc_node(buf
, ((ginxlogSplit
*) rec
)->node
, ((ginxlogSplit
*) rec
)->lblkno
);
507 appendStringInfo(buf
, " isrootsplit: %c", (((ginxlogSplit
*) rec
)->isRootSplit
) ? 'T' : 'F');
509 case XLOG_GIN_VACUUM_PAGE
:
510 appendStringInfo(buf
, "Vacuum page, ");
511 desc_node(buf
, ((ginxlogVacuumPage
*) rec
)->node
, ((ginxlogVacuumPage
*) rec
)->blkno
);
513 case XLOG_GIN_DELETE_PAGE
:
514 appendStringInfo(buf
, "Delete page, ");
515 desc_node(buf
, ((ginxlogDeletePage
*) rec
)->node
, ((ginxlogDeletePage
*) rec
)->blkno
);
518 elog(PANIC
, "gin_desc: unknown op code %u", info
);
523 gin_xlog_startup(void)
525 incomplete_splits
= NIL
;
527 opCtx
= AllocSetContextCreate(CurrentMemoryContext
,
528 "GIN recovery temporary context",
529 ALLOCSET_DEFAULT_MINSIZE
,
530 ALLOCSET_DEFAULT_INITSIZE
,
531 ALLOCSET_DEFAULT_MAXSIZE
);
535 ginContinueSplit(ginIncompleteSplit
*split
)
543 * elog(NOTICE,"ginContinueSplit root:%u l:%u r:%u", split->rootBlkno,
544 * split->leftBlkno, split->rightBlkno);
546 buffer
= XLogReadBuffer(split
->node
, split
->leftBlkno
, false);
548 reln
= CreateFakeRelcacheEntry(split
->node
);
550 if (split
->rootBlkno
== GIN_ROOT_BLKNO
)
552 prepareEntryScan(&btree
, reln
, InvalidOffsetNumber
, (Datum
) 0, NULL
);
553 btree
.entry
= ginPageGetLinkItup(buffer
);
557 Page page
= BufferGetPage(buffer
);
559 prepareDataScan(&btree
, reln
);
561 PostingItemSetBlockNumber(&(btree
.pitem
), split
->leftBlkno
);
562 if (GinPageIsLeaf(page
))
563 btree
.pitem
.key
= *(ItemPointerData
*) GinDataPageGetItem(page
,
564 GinPageGetOpaque(page
)->maxoff
);
566 btree
.pitem
.key
= ((PostingItem
*) GinDataPageGetItem(page
,
567 GinPageGetOpaque(page
)->maxoff
))->key
;
570 FreeFakeRelcacheEntry(reln
);
572 btree
.rightblkno
= split
->rightBlkno
;
574 stack
.blkno
= split
->leftBlkno
;
575 stack
.buffer
= buffer
;
576 stack
.off
= InvalidOffsetNumber
;
579 findParents(&btree
, &stack
, split
->rootBlkno
);
580 ginInsertValue(&btree
, stack
.parent
);
582 UnlockReleaseBuffer(buffer
);
586 gin_xlog_cleanup(void)
589 MemoryContext topCtx
;
591 topCtx
= MemoryContextSwitchTo(opCtx
);
593 foreach(l
, incomplete_splits
)
595 ginIncompleteSplit
*split
= (ginIncompleteSplit
*) lfirst(l
);
597 ginContinueSplit(split
);
598 MemoryContextReset(opCtx
);
601 MemoryContextSwitchTo(topCtx
);
602 MemoryContextDelete(opCtx
);
603 incomplete_splits
= NIL
;
607 gin_safe_restartpoint(void)
609 if (incomplete_splits
)