Force a checkpoint in CREATE DATABASE before starting to copy the files,
[PostgreSQL.git] / src / backend / access / gin / ginxlog.c
blob0d40bfbc68ef03e6cce9842e6c97045e2f1e341c
/*-------------------------------------------------------------------------
 *
 * ginxlog.c
 *	  WAL replay logic for inverted index.
 *
 *
 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *			 $PostgreSQL$
 *-------------------------------------------------------------------------
 */
14 #include "postgres.h"
16 #include "access/gin.h"
17 #include "access/xlogutils.h"
18 #include "storage/bufmgr.h"
19 #include "utils/memutils.h"
21 static MemoryContext opCtx; /* working memory for operations */
22 static MemoryContext topCtx;
24 typedef struct ginIncompleteSplit
26 RelFileNode node;
27 BlockNumber leftBlkno;
28 BlockNumber rightBlkno;
29 BlockNumber rootBlkno;
30 } ginIncompleteSplit;
32 static List *incomplete_splits;
34 static void
35 pushIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber rightBlkno, BlockNumber rootBlkno)
37 ginIncompleteSplit *split;
39 MemoryContextSwitchTo(topCtx);
41 split = palloc(sizeof(ginIncompleteSplit));
43 split->node = node;
44 split->leftBlkno = leftBlkno;
45 split->rightBlkno = rightBlkno;
46 split->rootBlkno = rootBlkno;
48 incomplete_splits = lappend(incomplete_splits, split);
50 MemoryContextSwitchTo(opCtx);
53 static void
54 forgetIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber updateBlkno)
56 ListCell *l;
58 foreach(l, incomplete_splits)
60 ginIncompleteSplit *split = (ginIncompleteSplit *) lfirst(l);
62 if (RelFileNodeEquals(node, split->node) && leftBlkno == split->leftBlkno && updateBlkno == split->rightBlkno)
64 incomplete_splits = list_delete_ptr(incomplete_splits, split);
65 break;
70 static void
71 ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
73 RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
74 Buffer buffer;
75 Page page;
77 buffer = XLogReadBuffer(*node, GIN_ROOT_BLKNO, true);
78 Assert(BufferIsValid(buffer));
79 page = (Page) BufferGetPage(buffer);
81 GinInitBuffer(buffer, GIN_LEAF);
83 PageSetLSN(page, lsn);
84 PageSetTLI(page, ThisTimeLineID);
86 MarkBufferDirty(buffer);
87 UnlockReleaseBuffer(buffer);
90 static void
91 ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
93 ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree *) XLogRecGetData(record);
94 ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree));
95 Buffer buffer;
96 Page page;
98 buffer = XLogReadBuffer(data->node, data->blkno, true);
99 Assert(BufferIsValid(buffer));
100 page = (Page) BufferGetPage(buffer);
102 GinInitBuffer(buffer, GIN_DATA | GIN_LEAF);
103 memcpy(GinDataPageGetData(page), items, sizeof(ItemPointerData) * data->nitem);
104 GinPageGetOpaque(page)->maxoff = data->nitem;
106 PageSetLSN(page, lsn);
107 PageSetTLI(page, ThisTimeLineID);
109 MarkBufferDirty(buffer);
110 UnlockReleaseBuffer(buffer);
113 static void
114 ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
116 ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record);
117 Buffer buffer;
118 Page page;
120 /* nothing else to do if page was backed up */
121 if (record->xl_info & XLR_BKP_BLOCK_1)
122 return;
124 buffer = XLogReadBuffer(data->node, data->blkno, false);
125 Assert(BufferIsValid(buffer));
126 page = (Page) BufferGetPage(buffer);
128 if (data->isData)
130 Assert(data->isDelete == FALSE);
131 Assert(GinPageIsData(page));
133 if (!XLByteLE(lsn, PageGetLSN(page)))
135 if (data->isLeaf)
137 OffsetNumber i;
138 ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
140 Assert(GinPageIsLeaf(page));
141 Assert(data->updateBlkno == InvalidBlockNumber);
143 for (i = 0; i < data->nitem; i++)
144 GinDataPageAddItem(page, items + i, data->offset + i);
146 else
148 PostingItem *pitem;
150 Assert(!GinPageIsLeaf(page));
152 if (data->updateBlkno != InvalidBlockNumber)
154 /* update link to right page after split */
155 pitem = (PostingItem *) GinDataPageGetItem(page, data->offset);
156 PostingItemSetBlockNumber(pitem, data->updateBlkno);
159 pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
161 GinDataPageAddItem(page, pitem, data->offset);
165 if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
167 PostingItem *pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
169 forgetIncompleteSplit(data->node, PostingItemGetBlockNumber(pitem), data->updateBlkno);
173 else
175 IndexTuple itup;
177 Assert(!GinPageIsData(page));
179 if (!XLByteLE(lsn, PageGetLSN(page)))
181 if (data->updateBlkno != InvalidBlockNumber)
183 /* update link to right page after split */
184 Assert(!GinPageIsLeaf(page));
185 Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
186 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, data->offset));
187 ItemPointerSet(&itup->t_tid, data->updateBlkno, InvalidOffsetNumber);
190 if (data->isDelete)
192 Assert(GinPageIsLeaf(page));
193 Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
194 PageIndexTupleDelete(page, data->offset);
197 itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
199 if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), data->offset, false, false) == InvalidOffsetNumber)
200 elog(ERROR, "failed to add item to index page in %u/%u/%u",
201 data->node.spcNode, data->node.dbNode, data->node.relNode);
204 if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
206 itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
207 forgetIncompleteSplit(data->node, GinItemPointerGetBlockNumber(&itup->t_tid), data->updateBlkno);
211 if (!XLByteLE(lsn, PageGetLSN(page)))
213 PageSetLSN(page, lsn);
214 PageSetTLI(page, ThisTimeLineID);
216 MarkBufferDirty(buffer);
218 UnlockReleaseBuffer(buffer);
221 static void
222 ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
224 ginxlogSplit *data = (ginxlogSplit *) XLogRecGetData(record);
225 Buffer lbuffer,
226 rbuffer;
227 Page lpage,
228 rpage;
229 uint32 flags = 0;
231 if (data->isLeaf)
232 flags |= GIN_LEAF;
233 if (data->isData)
234 flags |= GIN_DATA;
236 lbuffer = XLogReadBuffer(data->node, data->lblkno, data->isRootSplit);
237 Assert(BufferIsValid(lbuffer));
238 lpage = (Page) BufferGetPage(lbuffer);
239 GinInitBuffer(lbuffer, flags);
241 rbuffer = XLogReadBuffer(data->node, data->rblkno, true);
242 Assert(BufferIsValid(rbuffer));
243 rpage = (Page) BufferGetPage(rbuffer);
244 GinInitBuffer(rbuffer, flags);
246 GinPageGetOpaque(lpage)->rightlink = BufferGetBlockNumber(rbuffer);
247 GinPageGetOpaque(rpage)->rightlink = data->rrlink;
249 if (data->isData)
251 char *ptr = XLogRecGetData(record) + sizeof(ginxlogSplit);
252 Size sizeofitem = GinSizeOfItem(lpage);
253 OffsetNumber i;
254 ItemPointer bound;
256 for (i = 0; i < data->separator; i++)
258 GinDataPageAddItem(lpage, ptr, InvalidOffsetNumber);
259 ptr += sizeofitem;
262 for (i = data->separator; i < data->nitem; i++)
264 GinDataPageAddItem(rpage, ptr, InvalidOffsetNumber);
265 ptr += sizeofitem;
268 /* set up right key */
269 bound = GinDataPageGetRightBound(lpage);
270 if (data->isLeaf)
271 *bound = *(ItemPointerData *) GinDataPageGetItem(lpage, GinPageGetOpaque(lpage)->maxoff);
272 else
273 *bound = ((PostingItem *) GinDataPageGetItem(lpage, GinPageGetOpaque(lpage)->maxoff))->key;
275 bound = GinDataPageGetRightBound(rpage);
276 *bound = data->rightbound;
278 else
280 IndexTuple itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogSplit));
281 OffsetNumber i;
283 for (i = 0; i < data->separator; i++)
285 if (PageAddItem(lpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
286 elog(ERROR, "failed to add item to index page in %u/%u/%u",
287 data->node.spcNode, data->node.dbNode, data->node.relNode);
288 itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
291 for (i = data->separator; i < data->nitem; i++)
293 if (PageAddItem(rpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
294 elog(ERROR, "failed to add item to index page in %u/%u/%u",
295 data->node.spcNode, data->node.dbNode, data->node.relNode);
296 itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
300 PageSetLSN(rpage, lsn);
301 PageSetTLI(rpage, ThisTimeLineID);
302 MarkBufferDirty(rbuffer);
304 PageSetLSN(lpage, lsn);
305 PageSetTLI(lpage, ThisTimeLineID);
306 MarkBufferDirty(lbuffer);
308 if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
309 forgetIncompleteSplit(data->node, data->leftChildBlkno, data->updateBlkno);
311 if (data->isRootSplit)
313 Buffer rootBuf = XLogReadBuffer(data->node, data->rootBlkno, false);
314 Page rootPage = BufferGetPage(rootBuf);
316 GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
318 if (data->isData)
320 Assert(data->rootBlkno != GIN_ROOT_BLKNO);
321 dataFillRoot(NULL, rootBuf, lbuffer, rbuffer);
323 else
325 Assert(data->rootBlkno == GIN_ROOT_BLKNO);
326 entryFillRoot(NULL, rootBuf, lbuffer, rbuffer);
329 PageSetLSN(rootPage, lsn);
330 PageSetTLI(rootPage, ThisTimeLineID);
332 MarkBufferDirty(rootBuf);
333 UnlockReleaseBuffer(rootBuf);
335 else
336 pushIncompleteSplit(data->node, data->lblkno, data->rblkno, data->rootBlkno);
338 UnlockReleaseBuffer(rbuffer);
339 UnlockReleaseBuffer(lbuffer);
342 static void
343 ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
345 ginxlogVacuumPage *data = (ginxlogVacuumPage *) XLogRecGetData(record);
346 Buffer buffer;
347 Page page;
349 /* nothing else to do if page was backed up (and no info to do it with) */
350 if (record->xl_info & XLR_BKP_BLOCK_1)
351 return;
353 buffer = XLogReadBuffer(data->node, data->blkno, false);
354 Assert(BufferIsValid(buffer));
355 page = (Page) BufferGetPage(buffer);
357 if (GinPageIsData(page))
359 memcpy(GinDataPageGetData(page), XLogRecGetData(record) + sizeof(ginxlogVacuumPage),
360 GinSizeOfItem(page) *data->nitem);
361 GinPageGetOpaque(page)->maxoff = data->nitem;
363 else
365 OffsetNumber i,
366 *tod;
367 IndexTuple itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage));
369 tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page));
370 for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++)
371 tod[i - 1] = i;
373 PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page));
375 for (i = 0; i < data->nitem; i++)
377 if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
378 elog(ERROR, "failed to add item to index page in %u/%u/%u",
379 data->node.spcNode, data->node.dbNode, data->node.relNode);
380 itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
384 PageSetLSN(page, lsn);
385 PageSetTLI(page, ThisTimeLineID);
387 MarkBufferDirty(buffer);
388 UnlockReleaseBuffer(buffer);
391 static void
392 ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
394 ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record);
395 Buffer buffer;
396 Page page;
398 if (!(record->xl_info & XLR_BKP_BLOCK_1))
400 buffer = XLogReadBuffer(data->node, data->blkno, false);
401 page = BufferGetPage(buffer);
402 Assert(GinPageIsData(page));
403 GinPageGetOpaque(page)->flags = GIN_DELETED;
404 PageSetLSN(page, lsn);
405 PageSetTLI(page, ThisTimeLineID);
406 MarkBufferDirty(buffer);
407 UnlockReleaseBuffer(buffer);
410 if (!(record->xl_info & XLR_BKP_BLOCK_2))
412 buffer = XLogReadBuffer(data->node, data->parentBlkno, false);
413 page = BufferGetPage(buffer);
414 Assert(GinPageIsData(page));
415 Assert(!GinPageIsLeaf(page));
416 PageDeletePostingItem(page, data->parentOffset);
417 PageSetLSN(page, lsn);
418 PageSetTLI(page, ThisTimeLineID);
419 MarkBufferDirty(buffer);
420 UnlockReleaseBuffer(buffer);
423 if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber)
425 buffer = XLogReadBuffer(data->node, data->leftBlkno, false);
426 page = BufferGetPage(buffer);
427 Assert(GinPageIsData(page));
428 GinPageGetOpaque(page)->rightlink = data->rightLink;
429 PageSetLSN(page, lsn);
430 PageSetTLI(page, ThisTimeLineID);
431 MarkBufferDirty(buffer);
432 UnlockReleaseBuffer(buffer);
436 void
437 gin_redo(XLogRecPtr lsn, XLogRecord *record)
439 uint8 info = record->xl_info & ~XLR_INFO_MASK;
441 topCtx = MemoryContextSwitchTo(opCtx);
442 switch (info)
444 case XLOG_GIN_CREATE_INDEX:
445 ginRedoCreateIndex(lsn, record);
446 break;
447 case XLOG_GIN_CREATE_PTREE:
448 ginRedoCreatePTree(lsn, record);
449 break;
450 case XLOG_GIN_INSERT:
451 ginRedoInsert(lsn, record);
452 break;
453 case XLOG_GIN_SPLIT:
454 ginRedoSplit(lsn, record);
455 break;
456 case XLOG_GIN_VACUUM_PAGE:
457 ginRedoVacuumPage(lsn, record);
458 break;
459 case XLOG_GIN_DELETE_PAGE:
460 ginRedoDeletePage(lsn, record);
461 break;
462 default:
463 elog(PANIC, "gin_redo: unknown op code %u", info);
465 MemoryContextSwitchTo(topCtx);
466 MemoryContextReset(opCtx);
469 static void
470 desc_node(StringInfo buf, RelFileNode node, BlockNumber blkno)
472 appendStringInfo(buf, "node: %u/%u/%u blkno: %u",
473 node.spcNode, node.dbNode, node.relNode, blkno);
476 void
477 gin_desc(StringInfo buf, uint8 xl_info, char *rec)
479 uint8 info = xl_info & ~XLR_INFO_MASK;
481 switch (info)
483 case XLOG_GIN_CREATE_INDEX:
484 appendStringInfo(buf, "Create index, ");
485 desc_node(buf, *(RelFileNode *) rec, GIN_ROOT_BLKNO);
486 break;
487 case XLOG_GIN_CREATE_PTREE:
488 appendStringInfo(buf, "Create posting tree, ");
489 desc_node(buf, ((ginxlogCreatePostingTree *) rec)->node, ((ginxlogCreatePostingTree *) rec)->blkno);
490 break;
491 case XLOG_GIN_INSERT:
492 appendStringInfo(buf, "Insert item, ");
493 desc_node(buf, ((ginxlogInsert *) rec)->node, ((ginxlogInsert *) rec)->blkno);
494 appendStringInfo(buf, " offset: %u nitem: %u isdata: %c isleaf %c isdelete %c updateBlkno:%u",
495 ((ginxlogInsert *) rec)->offset,
496 ((ginxlogInsert *) rec)->nitem,
497 (((ginxlogInsert *) rec)->isData) ? 'T' : 'F',
498 (((ginxlogInsert *) rec)->isLeaf) ? 'T' : 'F',
499 (((ginxlogInsert *) rec)->isDelete) ? 'T' : 'F',
500 ((ginxlogInsert *) rec)->updateBlkno
503 break;
504 case XLOG_GIN_SPLIT:
505 appendStringInfo(buf, "Page split, ");
506 desc_node(buf, ((ginxlogSplit *) rec)->node, ((ginxlogSplit *) rec)->lblkno);
507 appendStringInfo(buf, " isrootsplit: %c", (((ginxlogSplit *) rec)->isRootSplit) ? 'T' : 'F');
508 break;
509 case XLOG_GIN_VACUUM_PAGE:
510 appendStringInfo(buf, "Vacuum page, ");
511 desc_node(buf, ((ginxlogVacuumPage *) rec)->node, ((ginxlogVacuumPage *) rec)->blkno);
512 break;
513 case XLOG_GIN_DELETE_PAGE:
514 appendStringInfo(buf, "Delete page, ");
515 desc_node(buf, ((ginxlogDeletePage *) rec)->node, ((ginxlogDeletePage *) rec)->blkno);
516 break;
517 default:
518 elog(PANIC, "gin_desc: unknown op code %u", info);
522 void
523 gin_xlog_startup(void)
525 incomplete_splits = NIL;
527 opCtx = AllocSetContextCreate(CurrentMemoryContext,
528 "GIN recovery temporary context",
529 ALLOCSET_DEFAULT_MINSIZE,
530 ALLOCSET_DEFAULT_INITSIZE,
531 ALLOCSET_DEFAULT_MAXSIZE);
534 static void
535 ginContinueSplit(ginIncompleteSplit *split)
537 GinBtreeData btree;
538 Relation reln;
539 Buffer buffer;
540 GinBtreeStack stack;
543 * elog(NOTICE,"ginContinueSplit root:%u l:%u r:%u", split->rootBlkno,
544 * split->leftBlkno, split->rightBlkno);
546 buffer = XLogReadBuffer(split->node, split->leftBlkno, false);
548 reln = CreateFakeRelcacheEntry(split->node);
550 if (split->rootBlkno == GIN_ROOT_BLKNO)
552 prepareEntryScan(&btree, reln, InvalidOffsetNumber, (Datum) 0, NULL);
553 btree.entry = ginPageGetLinkItup(buffer);
555 else
557 Page page = BufferGetPage(buffer);
559 prepareDataScan(&btree, reln);
561 PostingItemSetBlockNumber(&(btree.pitem), split->leftBlkno);
562 if (GinPageIsLeaf(page))
563 btree.pitem.key = *(ItemPointerData *) GinDataPageGetItem(page,
564 GinPageGetOpaque(page)->maxoff);
565 else
566 btree.pitem.key = ((PostingItem *) GinDataPageGetItem(page,
567 GinPageGetOpaque(page)->maxoff))->key;
570 FreeFakeRelcacheEntry(reln);
572 btree.rightblkno = split->rightBlkno;
574 stack.blkno = split->leftBlkno;
575 stack.buffer = buffer;
576 stack.off = InvalidOffsetNumber;
577 stack.parent = NULL;
579 findParents(&btree, &stack, split->rootBlkno);
580 ginInsertValue(&btree, stack.parent);
582 UnlockReleaseBuffer(buffer);
585 void
586 gin_xlog_cleanup(void)
588 ListCell *l;
589 MemoryContext topCtx;
591 topCtx = MemoryContextSwitchTo(opCtx);
593 foreach(l, incomplete_splits)
595 ginIncompleteSplit *split = (ginIncompleteSplit *) lfirst(l);
597 ginContinueSplit(split);
598 MemoryContextReset(opCtx);
601 MemoryContextSwitchTo(topCtx);
602 MemoryContextDelete(opCtx);
603 incomplete_splits = NIL;
606 bool
607 gin_safe_restartpoint(void)
609 if (incomplete_splits)
610 return false;
611 return true;