Force a checkpoint in CREATE DATABASE before starting to copy the files,
[PostgreSQL.git] / src / backend / access / gin / ginvacuum.c
bloba0fa4cafe778b0a0d7e6e73921d4e3a945c119c3
1 /*-------------------------------------------------------------------------
3 * ginvacuum.c
4 * delete & vacuum routines for the postgres GIN
7 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
10 * IDENTIFICATION
11 * $PostgreSQL$
12 *-------------------------------------------------------------------------
15 #include "postgres.h"
17 #include "access/genam.h"
18 #include "access/gin.h"
19 #include "commands/vacuum.h"
20 #include "miscadmin.h"
21 #include "storage/bufmgr.h"
22 #include "storage/freespace.h"
23 #include "storage/indexfsm.h"
24 #include "storage/lmgr.h"
26 typedef struct
28 Relation index;
29 IndexBulkDeleteResult *result;
30 IndexBulkDeleteCallback callback;
31 void *callback_state;
32 GinState ginstate;
33 BufferAccessStrategy strategy;
34 } GinVacuumState;
38 * Cleans array of ItemPointer (removes dead pointers)
39 * Results are always stored in *cleaned, which will be allocated
40 * if it's needed. In case of *cleaned!=NULL caller is responsible to
41 * have allocated enough space. *cleaned and items may point to the same
42 * memory address.
45 static uint32
46 ginVacuumPostingList(GinVacuumState *gvs, ItemPointerData *items, uint32 nitem, ItemPointerData **cleaned)
48 uint32 i,
49 j = 0;
52 * just scan over ItemPointer array
55 for (i = 0; i < nitem; i++)
57 if (gvs->callback(items + i, gvs->callback_state))
59 gvs->result->tuples_removed += 1;
60 if (!*cleaned)
62 *cleaned = (ItemPointerData *) palloc(sizeof(ItemPointerData) * nitem);
63 if (i != 0)
64 memcpy(*cleaned, items, sizeof(ItemPointerData) * i);
67 else
69 gvs->result->num_index_tuples += 1;
70 if (i != j)
71 (*cleaned)[j] = items[i];
72 j++;
76 return j;
80 * fills WAL record for vacuum leaf page
82 static void
83 xlogVacuumPage(Relation index, Buffer buffer)
85 Page page = BufferGetPage(buffer);
86 XLogRecPtr recptr;
87 XLogRecData rdata[3];
88 ginxlogVacuumPage data;
89 char *backup;
90 char itups[BLCKSZ];
91 uint32 len = 0;
93 Assert(GinPageIsLeaf(page));
95 if (index->rd_istemp)
96 return;
98 data.node = index->rd_node;
99 data.blkno = BufferGetBlockNumber(buffer);
101 if (GinPageIsData(page))
103 backup = GinDataPageGetData(page);
104 data.nitem = GinPageGetOpaque(page)->maxoff;
105 if (data.nitem)
106 len = MAXALIGN(sizeof(ItemPointerData) * data.nitem);
108 else
110 char *ptr;
111 OffsetNumber i;
113 ptr = backup = itups;
114 for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++)
116 IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
118 memcpy(ptr, itup, IndexTupleSize(itup));
119 ptr += MAXALIGN(IndexTupleSize(itup));
122 data.nitem = PageGetMaxOffsetNumber(page);
123 len = ptr - backup;
126 rdata[0].buffer = buffer;
127 rdata[0].buffer_std = (GinPageIsData(page)) ? FALSE : TRUE;
128 rdata[0].len = 0;
129 rdata[0].data = NULL;
130 rdata[0].next = rdata + 1;
132 rdata[1].buffer = InvalidBuffer;
133 rdata[1].len = sizeof(ginxlogVacuumPage);
134 rdata[1].data = (char *) &data;
136 if (len == 0)
138 rdata[1].next = NULL;
140 else
142 rdata[1].next = rdata + 2;
144 rdata[2].buffer = InvalidBuffer;
145 rdata[2].len = len;
146 rdata[2].data = backup;
147 rdata[2].next = NULL;
150 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_VACUUM_PAGE, rdata);
151 PageSetLSN(page, recptr);
152 PageSetTLI(page, ThisTimeLineID);
155 static bool
156 ginVacuumPostingTreeLeaves(GinVacuumState *gvs, BlockNumber blkno, bool isRoot, Buffer *rootBuffer)
158 Buffer buffer = ReadBufferWithStrategy(gvs->index, blkno, gvs->strategy);
159 Page page = BufferGetPage(buffer);
160 bool hasVoidPage = FALSE;
163 * We should be sure that we don't concurrent with inserts, insert process
164 * never release root page until end (but it can unlock it and lock
165 * again). New scan can't start but previously started ones work
166 * concurrently.
169 if (isRoot)
170 LockBufferForCleanup(buffer);
171 else
172 LockBuffer(buffer, GIN_EXCLUSIVE);
174 Assert(GinPageIsData(page));
176 if (GinPageIsLeaf(page))
178 OffsetNumber newMaxOff,
179 oldMaxOff = GinPageGetOpaque(page)->maxoff;
180 ItemPointerData *cleaned = NULL;
182 newMaxOff = ginVacuumPostingList(gvs,
183 (ItemPointer) GinDataPageGetData(page), oldMaxOff, &cleaned);
185 /* saves changes about deleted tuple ... */
186 if (oldMaxOff != newMaxOff)
189 START_CRIT_SECTION();
191 if (newMaxOff > 0)
192 memcpy(GinDataPageGetData(page), cleaned, sizeof(ItemPointerData) * newMaxOff);
193 pfree(cleaned);
194 GinPageGetOpaque(page)->maxoff = newMaxOff;
196 MarkBufferDirty(buffer);
197 xlogVacuumPage(gvs->index, buffer);
199 END_CRIT_SECTION();
201 /* if root is a leaf page, we don't desire further processing */
202 if (!isRoot && GinPageGetOpaque(page)->maxoff < FirstOffsetNumber)
203 hasVoidPage = TRUE;
206 else
208 OffsetNumber i;
209 bool isChildHasVoid = FALSE;
211 for (i = FirstOffsetNumber; i <= GinPageGetOpaque(page)->maxoff; i++)
213 PostingItem *pitem = (PostingItem *) GinDataPageGetItem(page, i);
215 if (ginVacuumPostingTreeLeaves(gvs, PostingItemGetBlockNumber(pitem), FALSE, NULL))
216 isChildHasVoid = TRUE;
219 if (isChildHasVoid)
220 hasVoidPage = TRUE;
224 * if we have root and theres void pages in tree, then we don't release
225 * lock to go further processing and guarantee that tree is unused
227 if (!(isRoot && hasVoidPage))
229 UnlockReleaseBuffer(buffer);
231 else
233 Assert(rootBuffer);
234 *rootBuffer = buffer;
237 return hasVoidPage;
240 static void
241 ginDeletePage(GinVacuumState *gvs, BlockNumber deleteBlkno, BlockNumber leftBlkno,
242 BlockNumber parentBlkno, OffsetNumber myoff, bool isParentRoot)
244 Buffer dBuffer = ReadBufferWithStrategy(gvs->index, deleteBlkno, gvs->strategy);
245 Buffer lBuffer = (leftBlkno == InvalidBlockNumber) ?
246 InvalidBuffer : ReadBufferWithStrategy(gvs->index, leftBlkno, gvs->strategy);
247 Buffer pBuffer = ReadBufferWithStrategy(gvs->index, parentBlkno, gvs->strategy);
248 Page page,
249 parentPage;
251 LockBuffer(dBuffer, GIN_EXCLUSIVE);
252 if (!isParentRoot) /* parent is already locked by
253 * LockBufferForCleanup() */
254 LockBuffer(pBuffer, GIN_EXCLUSIVE);
255 if (leftBlkno != InvalidBlockNumber)
256 LockBuffer(lBuffer, GIN_EXCLUSIVE);
258 START_CRIT_SECTION();
260 if (leftBlkno != InvalidBlockNumber)
262 BlockNumber rightlink;
264 page = BufferGetPage(dBuffer);
265 rightlink = GinPageGetOpaque(page)->rightlink;
267 page = BufferGetPage(lBuffer);
268 GinPageGetOpaque(page)->rightlink = rightlink;
271 parentPage = BufferGetPage(pBuffer);
272 #ifdef USE_ASSERT_CHECKING
275 PostingItem *tod = (PostingItem *) GinDataPageGetItem(parentPage, myoff);
277 Assert(PostingItemGetBlockNumber(tod) == deleteBlkno);
278 } while (0);
279 #endif
280 PageDeletePostingItem(parentPage, myoff);
282 page = BufferGetPage(dBuffer);
285 * we shouldn't change rightlink field to save workability of running
286 * search scan
288 GinPageGetOpaque(page)->flags = GIN_DELETED;
290 MarkBufferDirty(pBuffer);
291 if (leftBlkno != InvalidBlockNumber)
292 MarkBufferDirty(lBuffer);
293 MarkBufferDirty(dBuffer);
295 if (!gvs->index->rd_istemp)
297 XLogRecPtr recptr;
298 XLogRecData rdata[4];
299 ginxlogDeletePage data;
300 int n;
302 data.node = gvs->index->rd_node;
303 data.blkno = deleteBlkno;
304 data.parentBlkno = parentBlkno;
305 data.parentOffset = myoff;
306 data.leftBlkno = leftBlkno;
307 data.rightLink = GinPageGetOpaque(page)->rightlink;
309 rdata[0].buffer = dBuffer;
310 rdata[0].buffer_std = FALSE;
311 rdata[0].data = NULL;
312 rdata[0].len = 0;
313 rdata[0].next = rdata + 1;
315 rdata[1].buffer = pBuffer;
316 rdata[1].buffer_std = FALSE;
317 rdata[1].data = NULL;
318 rdata[1].len = 0;
319 rdata[1].next = rdata + 2;
321 if (leftBlkno != InvalidBlockNumber)
323 rdata[2].buffer = lBuffer;
324 rdata[2].buffer_std = FALSE;
325 rdata[2].data = NULL;
326 rdata[2].len = 0;
327 rdata[2].next = rdata + 3;
328 n = 3;
330 else
331 n = 2;
333 rdata[n].buffer = InvalidBuffer;
334 rdata[n].buffer_std = FALSE;
335 rdata[n].len = sizeof(ginxlogDeletePage);
336 rdata[n].data = (char *) &data;
337 rdata[n].next = NULL;
339 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_PAGE, rdata);
340 PageSetLSN(page, recptr);
341 PageSetTLI(page, ThisTimeLineID);
342 PageSetLSN(parentPage, recptr);
343 PageSetTLI(parentPage, ThisTimeLineID);
344 if (leftBlkno != InvalidBlockNumber)
346 page = BufferGetPage(lBuffer);
347 PageSetLSN(page, recptr);
348 PageSetTLI(page, ThisTimeLineID);
352 if (!isParentRoot)
353 LockBuffer(pBuffer, GIN_UNLOCK);
354 ReleaseBuffer(pBuffer);
356 if (leftBlkno != InvalidBlockNumber)
357 UnlockReleaseBuffer(lBuffer);
359 UnlockReleaseBuffer(dBuffer);
361 END_CRIT_SECTION();
363 gvs->result->pages_deleted++;
366 typedef struct DataPageDeleteStack
368 struct DataPageDeleteStack *child;
369 struct DataPageDeleteStack *parent;
371 BlockNumber blkno; /* current block number */
372 BlockNumber leftBlkno; /* rightest non-deleted page on left */
373 bool isRoot;
374 } DataPageDeleteStack;
377 * scans posting tree and deletes empty pages
379 static bool
380 ginScanToDelete(GinVacuumState *gvs, BlockNumber blkno, bool isRoot, DataPageDeleteStack *parent, OffsetNumber myoff)
382 DataPageDeleteStack *me;
383 Buffer buffer;
384 Page page;
385 bool meDelete = FALSE;
387 if (isRoot)
389 me = parent;
391 else
393 if (!parent->child)
395 me = (DataPageDeleteStack *) palloc0(sizeof(DataPageDeleteStack));
396 me->parent = parent;
397 parent->child = me;
398 me->leftBlkno = InvalidBlockNumber;
400 else
401 me = parent->child;
404 buffer = ReadBufferWithStrategy(gvs->index, blkno, gvs->strategy);
405 page = BufferGetPage(buffer);
407 Assert(GinPageIsData(page));
409 if (!GinPageIsLeaf(page))
411 OffsetNumber i;
413 me->blkno = blkno;
414 for (i = FirstOffsetNumber; i <= GinPageGetOpaque(page)->maxoff; i++)
416 PostingItem *pitem = (PostingItem *) GinDataPageGetItem(page, i);
418 if (ginScanToDelete(gvs, PostingItemGetBlockNumber(pitem), FALSE, me, i))
419 i--;
423 if (GinPageGetOpaque(page)->maxoff < FirstOffsetNumber)
425 if (!(me->leftBlkno == InvalidBlockNumber && GinPageRightMost(page)))
427 /* we never delete right most branch */
428 Assert(!isRoot);
429 if (GinPageGetOpaque(page)->maxoff < FirstOffsetNumber)
431 ginDeletePage(gvs, blkno, me->leftBlkno, me->parent->blkno, myoff, me->parent->isRoot);
432 meDelete = TRUE;
437 ReleaseBuffer(buffer);
439 if (!meDelete)
440 me->leftBlkno = blkno;
442 return meDelete;
445 static void
446 ginVacuumPostingTree(GinVacuumState *gvs, BlockNumber rootBlkno)
448 Buffer rootBuffer = InvalidBuffer;
449 DataPageDeleteStack root,
450 *ptr,
451 *tmp;
453 if (ginVacuumPostingTreeLeaves(gvs, rootBlkno, TRUE, &rootBuffer) == FALSE)
455 Assert(rootBuffer == InvalidBuffer);
456 return;
459 memset(&root, 0, sizeof(DataPageDeleteStack));
460 root.leftBlkno = InvalidBlockNumber;
461 root.isRoot = TRUE;
463 vacuum_delay_point();
465 ginScanToDelete(gvs, rootBlkno, TRUE, &root, InvalidOffsetNumber);
467 ptr = root.child;
468 while (ptr)
470 tmp = ptr->child;
471 pfree(ptr);
472 ptr = tmp;
475 UnlockReleaseBuffer(rootBuffer);
479 * returns modified page or NULL if page isn't modified.
480 * Function works with original page until first change is occurred,
481 * then page is copied into temporary one.
483 static Page
484 ginVacuumEntryPage(GinVacuumState *gvs, Buffer buffer, BlockNumber *roots, uint32 *nroot)
486 Page origpage = BufferGetPage(buffer),
487 tmppage;
488 OffsetNumber i,
489 maxoff = PageGetMaxOffsetNumber(origpage);
491 tmppage = origpage;
493 *nroot = 0;
495 for (i = FirstOffsetNumber; i <= maxoff; i++)
497 IndexTuple itup = (IndexTuple) PageGetItem(tmppage, PageGetItemId(tmppage, i));
499 if (GinIsPostingTree(itup))
502 * store posting tree's roots for further processing, we can't
503 * vacuum it just now due to risk of deadlocks with scans/inserts
505 roots[*nroot] = GinItemPointerGetBlockNumber(&itup->t_tid);
506 (*nroot)++;
508 else if (GinGetNPosting(itup) > 0)
511 * if we already create temporary page, we will make changes in
512 * place
514 ItemPointerData *cleaned = (tmppage == origpage) ? NULL : GinGetPosting(itup);
515 uint32 newN = ginVacuumPostingList(gvs, GinGetPosting(itup), GinGetNPosting(itup), &cleaned);
517 if (GinGetNPosting(itup) != newN)
519 Datum value;
520 OffsetNumber attnum;
523 * Some ItemPointers was deleted, so we should remake our
524 * tuple
527 if (tmppage == origpage)
530 * On first difference we create temporary page in memory
531 * and copies content in to it.
533 tmppage = GinPageGetCopyPage(origpage);
535 if (newN > 0)
537 Size pos = ((char *) GinGetPosting(itup)) - ((char *) origpage);
539 memcpy(tmppage + pos, cleaned, sizeof(ItemPointerData) * newN);
542 pfree(cleaned);
544 /* set itup pointer to new page */
545 itup = (IndexTuple) PageGetItem(tmppage, PageGetItemId(tmppage, i));
548 value = gin_index_getattr(&gvs->ginstate, itup);
549 attnum = gintuple_get_attrnum(&gvs->ginstate, itup);
550 itup = GinFormTuple(&gvs->ginstate, attnum, value, GinGetPosting(itup), newN);
551 PageIndexTupleDelete(tmppage, i);
553 if (PageAddItem(tmppage, (Item) itup, IndexTupleSize(itup), i, false, false) != i)
554 elog(ERROR, "failed to add item to index page in \"%s\"",
555 RelationGetRelationName(gvs->index));
557 pfree(itup);
562 return (tmppage == origpage) ? NULL : tmppage;
565 Datum
566 ginbulkdelete(PG_FUNCTION_ARGS)
568 IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
569 IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
570 IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
571 void *callback_state = (void *) PG_GETARG_POINTER(3);
572 Relation index = info->index;
573 BlockNumber blkno = GIN_ROOT_BLKNO;
574 GinVacuumState gvs;
575 Buffer buffer;
576 BlockNumber rootOfPostingTree[BLCKSZ / (sizeof(IndexTupleData) + sizeof(ItemId))];
577 uint32 nRoot;
579 /* first time through? */
580 if (stats == NULL)
581 stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
582 /* we'll re-count the tuples each time */
583 stats->num_index_tuples = 0;
585 gvs.index = index;
586 gvs.result = stats;
587 gvs.callback = callback;
588 gvs.callback_state = callback_state;
589 gvs.strategy = info->strategy;
590 initGinState(&gvs.ginstate, index);
592 buffer = ReadBufferWithStrategy(index, blkno, info->strategy);
594 /* find leaf page */
595 for (;;)
597 Page page = BufferGetPage(buffer);
598 IndexTuple itup;
600 LockBuffer(buffer, GIN_SHARE);
602 Assert(!GinPageIsData(page));
604 if (GinPageIsLeaf(page))
606 LockBuffer(buffer, GIN_UNLOCK);
607 LockBuffer(buffer, GIN_EXCLUSIVE);
609 if (blkno == GIN_ROOT_BLKNO && !GinPageIsLeaf(page))
611 LockBuffer(buffer, GIN_UNLOCK);
612 continue; /* check it one more */
614 break;
617 Assert(PageGetMaxOffsetNumber(page) >= FirstOffsetNumber);
619 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, FirstOffsetNumber));
620 blkno = GinItemPointerGetBlockNumber(&(itup)->t_tid);
621 Assert(blkno != InvalidBlockNumber);
623 UnlockReleaseBuffer(buffer);
624 buffer = ReadBufferWithStrategy(index, blkno, info->strategy);
627 /* right now we found leftmost page in entry's BTree */
629 for (;;)
631 Page page = BufferGetPage(buffer);
632 Page resPage;
633 uint32 i;
635 Assert(!GinPageIsData(page));
637 resPage = ginVacuumEntryPage(&gvs, buffer, rootOfPostingTree, &nRoot);
639 blkno = GinPageGetOpaque(page)->rightlink;
641 if (resPage)
643 START_CRIT_SECTION();
644 PageRestoreTempPage(resPage, page);
645 MarkBufferDirty(buffer);
646 xlogVacuumPage(gvs.index, buffer);
647 UnlockReleaseBuffer(buffer);
648 END_CRIT_SECTION();
650 else
652 UnlockReleaseBuffer(buffer);
655 vacuum_delay_point();
657 for (i = 0; i < nRoot; i++)
659 ginVacuumPostingTree(&gvs, rootOfPostingTree[i]);
660 vacuum_delay_point();
663 if (blkno == InvalidBlockNumber) /* rightmost page */
664 break;
666 buffer = ReadBufferWithStrategy(index, blkno, info->strategy);
667 LockBuffer(buffer, GIN_EXCLUSIVE);
670 PG_RETURN_POINTER(gvs.result);
673 Datum
674 ginvacuumcleanup(PG_FUNCTION_ARGS)
676 IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
677 IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
678 Relation index = info->index;
679 bool needLock;
680 BlockNumber npages,
681 blkno;
682 BlockNumber totFreePages;
683 BlockNumber lastBlock = GIN_ROOT_BLKNO,
684 lastFilledBlock = GIN_ROOT_BLKNO;
686 /* Set up all-zero stats if ginbulkdelete wasn't called */
687 if (stats == NULL)
688 stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
691 * XXX we always report the heap tuple count as the number of index
692 * entries. This is bogus if the index is partial, but it's real hard to
693 * tell how many distinct heap entries are referenced by a GIN index.
695 stats->num_index_tuples = info->num_heap_tuples;
698 * If vacuum full, we already have exclusive lock on the index. Otherwise,
699 * need lock unless it's local to this backend.
701 if (info->vacuum_full)
702 needLock = false;
703 else
704 needLock = !RELATION_IS_LOCAL(index);
706 if (needLock)
707 LockRelationForExtension(index, ExclusiveLock);
708 npages = RelationGetNumberOfBlocks(index);
709 if (needLock)
710 UnlockRelationForExtension(index, ExclusiveLock);
712 totFreePages = 0;
714 for (blkno = GIN_ROOT_BLKNO + 1; blkno < npages; blkno++)
716 Buffer buffer;
717 Page page;
719 vacuum_delay_point();
721 buffer = ReadBufferWithStrategy(index, blkno, info->strategy);
722 LockBuffer(buffer, GIN_SHARE);
723 page = (Page) BufferGetPage(buffer);
725 if (GinPageIsDeleted(page))
727 RecordFreeIndexPage(index, blkno);
728 totFreePages++;
730 else
731 lastFilledBlock = blkno;
733 UnlockReleaseBuffer(buffer);
735 lastBlock = npages - 1;
737 if (info->vacuum_full && lastBlock > lastFilledBlock)
739 /* try to truncate index */
740 FreeSpaceMapTruncateRel(index, lastFilledBlock + 1);
741 RelationTruncate(index, lastFilledBlock + 1);
743 stats->pages_removed = lastBlock - lastFilledBlock;
744 totFreePages = totFreePages - stats->pages_removed;
747 /* Finally, vacuum the FSM */
748 IndexFreeSpaceMapVacuum(info->index);
750 stats->pages_free = totFreePages;
752 if (needLock)
753 LockRelationForExtension(index, ExclusiveLock);
754 stats->num_pages = RelationGetNumberOfBlocks(index);
755 if (needLock)
756 UnlockRelationForExtension(index, ExclusiveLock);
758 PG_RETURN_POINTER(stats);