1 /*-------------------------------------------------------------------------
4 * vacuuming routines for the postgres GiST index access method.
7 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
13 *-------------------------------------------------------------------------
17 #include "access/genam.h"
18 #include "access/gist_private.h"
19 #include "commands/vacuum.h"
20 #include "miscadmin.h"
21 #include "storage/bufmgr.h"
22 #include "storage/freespace.h"
23 #include "storage/indexfsm.h"
24 #include "storage/lmgr.h"
25 #include "utils/memutils.h"
28 typedef struct GistBulkDeleteResult
30 IndexBulkDeleteResult std
; /* common state */
32 } GistBulkDeleteResult
;
39 GistBulkDeleteResult
*result
;
40 BufferAccessStrategy strategy
;
51 * Make union of keys on page
54 PageMakeUnionKey(GistVacuum
*gv
, Buffer buffer
)
56 Page page
= BufferGetPage(buffer
);
61 MemoryContext oldCtx
= MemoryContextSwitchTo(gv
->opCtx
);
63 vec
= gistextractpage(page
, &veclen
);
66 * we call gistunion() in a temporary context because user-defined
67 * functions called in gistunion() may not free all of their memory
69 tmp
= gistunion(gv
->index
, vec
, veclen
, &(gv
->giststate
));
70 MemoryContextSwitchTo(oldCtx
);
72 res
= (IndexTuple
) palloc(IndexTupleSize(tmp
));
73 memcpy(res
, tmp
, IndexTupleSize(tmp
));
75 ItemPointerSetBlockNumber(&(res
->t_tid
), BufferGetBlockNumber(buffer
));
76 GistTupleSetValid(res
);
78 MemoryContextReset(gv
->opCtx
);
84 gistDeleteSubtree(GistVacuum
*gv
, BlockNumber blkno
)
89 buffer
= ReadBufferWithStrategy(gv
->index
, blkno
, gv
->strategy
);
90 LockBuffer(buffer
, GIST_EXCLUSIVE
);
91 page
= (Page
) BufferGetPage(buffer
);
93 if (!GistPageIsLeaf(page
))
97 for (i
= FirstOffsetNumber
; i
<= PageGetMaxOffsetNumber(page
); i
= OffsetNumberNext(i
))
99 ItemId iid
= PageGetItemId(page
, i
);
100 IndexTuple idxtuple
= (IndexTuple
) PageGetItem(page
, iid
);
102 gistDeleteSubtree(gv
, ItemPointerGetBlockNumber(&(idxtuple
->t_tid
)));
106 START_CRIT_SECTION();
108 MarkBufferDirty(buffer
);
110 page
= (Page
) BufferGetPage(buffer
);
111 GistPageSetDeleted(page
);
112 gv
->result
->std
.pages_deleted
++;
114 if (!gv
->index
->rd_istemp
)
116 XLogRecData rdata
[2];
118 gistxlogPageDelete xlrec
;
120 xlrec
.node
= gv
->index
->rd_node
;
123 rdata
[0].buffer
= buffer
;
124 rdata
[0].buffer_std
= true;
125 rdata
[0].data
= NULL
;
127 rdata
[0].next
= &(rdata
[1]);
129 rdata
[1].buffer
= InvalidBuffer
;
130 rdata
[1].data
= (char *) &xlrec
;
131 rdata
[1].len
= sizeof(gistxlogPageDelete
);
132 rdata
[1].next
= NULL
;
134 recptr
= XLogInsert(RM_GIST_ID
, XLOG_GIST_PAGE_DELETE
, rdata
);
135 PageSetLSN(page
, recptr
);
136 PageSetTLI(page
, ThisTimeLineID
);
139 PageSetLSN(page
, XLogRecPtrForTemp
);
143 UnlockReleaseBuffer(buffer
);
147 GistPageGetCopyPage(Page page
)
149 Size pageSize
= PageGetPageSize(page
);
152 tmppage
= (Page
) palloc(pageSize
);
153 memcpy(tmppage
, page
, pageSize
);
159 vacuumSplitPage(GistVacuum
*gv
, Page tempPage
, Buffer buffer
, IndexTuple
*addon
, int curlenaddon
)
161 ArrayTuple res
= {NULL
, 0, false};
163 SplitedPageLayout
*dist
= NULL
,
167 BlockNumber blkno
= BufferGetBlockNumber(buffer
);
168 MemoryContext oldCtx
= MemoryContextSwitchTo(gv
->opCtx
);
170 vec
= gistextractpage(tempPage
, &veclen
);
171 vec
= gistjoinvector(vec
, &veclen
, addon
, curlenaddon
);
172 dist
= gistSplit(gv
->index
, tempPage
, vec
, veclen
, &(gv
->giststate
));
174 MemoryContextSwitchTo(oldCtx
);
176 if (blkno
!= GIST_ROOT_BLKNO
)
178 /* if non-root split then we should not allocate new buffer */
179 dist
->buffer
= buffer
;
180 dist
->page
= tempPage
;
181 /* during vacuum we never split leaf page */
182 GistPageGetOpaque(dist
->page
)->flags
= 0;
187 res
.itup
= (IndexTuple
*) palloc(sizeof(IndexTuple
) * veclen
);
190 /* make new pages and fill them */
191 for (ptr
= dist
; ptr
; ptr
= ptr
->next
)
195 if (ptr
->buffer
== InvalidBuffer
)
197 ptr
->buffer
= gistNewBuffer(gv
->index
);
198 GISTInitBuffer(ptr
->buffer
, 0);
199 ptr
->page
= BufferGetPage(ptr
->buffer
);
201 ptr
->block
.blkno
= BufferGetBlockNumber(ptr
->buffer
);
203 data
= (char *) (ptr
->list
);
204 for (i
= 0; i
< ptr
->block
.num
; i
++)
206 if (PageAddItem(ptr
->page
, (Item
) data
, IndexTupleSize((IndexTuple
) data
), i
+ FirstOffsetNumber
, false, false) == InvalidOffsetNumber
)
207 elog(ERROR
, "failed to add item to index page in \"%s\"", RelationGetRelationName(gv
->index
));
208 data
+= IndexTupleSize((IndexTuple
) data
);
211 ItemPointerSetBlockNumber(&(ptr
->itup
->t_tid
), ptr
->block
.blkno
);
212 res
.itup
[res
.ituplen
] = (IndexTuple
) palloc(IndexTupleSize(ptr
->itup
));
213 memcpy(res
.itup
[res
.ituplen
], ptr
->itup
, IndexTupleSize(ptr
->itup
));
217 START_CRIT_SECTION();
219 for (ptr
= dist
; ptr
; ptr
= ptr
->next
)
221 MarkBufferDirty(ptr
->buffer
);
222 GistPageGetOpaque(ptr
->page
)->rightlink
= InvalidBlockNumber
;
225 /* restore the split non-root page */
226 if (blkno
!= GIST_ROOT_BLKNO
)
228 PageRestoreTempPage(dist
->page
, BufferGetPage(dist
->buffer
));
229 dist
->page
= BufferGetPage(dist
->buffer
);
232 if (!gv
->index
->rd_istemp
)
236 ItemPointerData key
; /* set key for incomplete insert */
239 ItemPointerSet(&key
, blkno
, TUPLE_IS_VALID
);
241 rdata
= formSplitRdata(gv
->index
->rd_node
, blkno
,
243 xlinfo
= rdata
->data
;
245 recptr
= XLogInsert(RM_GIST_ID
, XLOG_GIST_PAGE_SPLIT
, rdata
);
246 for (ptr
= dist
; ptr
; ptr
= ptr
->next
)
248 PageSetLSN(BufferGetPage(ptr
->buffer
), recptr
);
249 PageSetTLI(BufferGetPage(ptr
->buffer
), ThisTimeLineID
);
257 for (ptr
= dist
; ptr
; ptr
= ptr
->next
)
258 PageSetLSN(BufferGetPage(ptr
->buffer
), XLogRecPtrForTemp
);
261 for (ptr
= dist
; ptr
; ptr
= ptr
->next
)
263 /* we must keep the buffer pin on the head page */
264 if (BufferGetBlockNumber(ptr
->buffer
) != blkno
)
265 UnlockReleaseBuffer(ptr
->buffer
);
268 if (blkno
== GIST_ROOT_BLKNO
)
270 ItemPointerData key
; /* set key for incomplete insert */
272 ItemPointerSet(&key
, blkno
, TUPLE_IS_VALID
);
274 gistnewroot(gv
->index
, buffer
, res
.itup
, res
.ituplen
, &key
);
279 MemoryContextReset(gv
->opCtx
);
285 gistVacuumUpdate(GistVacuum
*gv
, BlockNumber blkno
, bool needunion
)
287 ArrayTuple res
= {NULL
, 0, false};
300 bool needwrite
= false;
301 OffsetNumber offToDelete
[MaxOffsetNumber
];
302 BlockNumber blkToDelete
[MaxOffsetNumber
];
303 ItemPointerData
*completed
= NULL
;
307 vacuum_delay_point();
309 buffer
= ReadBufferWithStrategy(gv
->index
, blkno
, gv
->strategy
);
310 LockBuffer(buffer
, GIST_EXCLUSIVE
);
311 gistcheckpage(gv
->index
, buffer
);
312 page
= (Page
) BufferGetPage(buffer
);
313 maxoff
= PageGetMaxOffsetNumber(page
);
315 if (GistPageIsLeaf(page
))
317 if (GistTuplesDeleted(page
))
318 needunion
= needwrite
= true;
322 completed
= (ItemPointerData
*) palloc(sizeof(ItemPointerData
) * lencompleted
);
323 addon
= (IndexTuple
*) palloc(sizeof(IndexTuple
) * lenaddon
);
325 /* get copy of page to work */
326 tempPage
= GistPageGetCopyPage(page
);
328 for (i
= FirstOffsetNumber
; i
<= maxoff
; i
= OffsetNumberNext(i
))
330 ArrayTuple chldtuple
;
333 iid
= PageGetItemId(tempPage
, i
);
334 idxtuple
= (IndexTuple
) PageGetItem(tempPage
, iid
);
335 needchildunion
= (GistTupleIsInvalid(idxtuple
)) ? true : false;
338 elog(DEBUG2
, "gistVacuumUpdate: need union for block %u",
339 ItemPointerGetBlockNumber(&(idxtuple
->t_tid
)));
341 chldtuple
= gistVacuumUpdate(gv
, ItemPointerGetBlockNumber(&(idxtuple
->t_tid
)),
343 if (chldtuple
.ituplen
|| chldtuple
.emptypage
)
345 /* update tuple or/and inserts new */
346 if (chldtuple
.emptypage
)
347 blkToDelete
[nBlkToDelete
++] = ItemPointerGetBlockNumber(&(idxtuple
->t_tid
));
348 offToDelete
[nOffToDelete
++] = i
;
349 PageIndexTupleDelete(tempPage
, i
);
352 needwrite
= needunion
= true;
354 if (chldtuple
.ituplen
)
357 Assert(chldtuple
.emptypage
== false);
358 while (curlenaddon
+ chldtuple
.ituplen
>= lenaddon
)
361 addon
= (IndexTuple
*) repalloc(addon
, sizeof(IndexTuple
) * lenaddon
);
364 memcpy(addon
+ curlenaddon
, chldtuple
.itup
, chldtuple
.ituplen
* sizeof(IndexTuple
));
366 curlenaddon
+= chldtuple
.ituplen
;
368 if (chldtuple
.ituplen
> 1)
371 * child was split, so we need to mark completion
376 while (ncompleted
+ chldtuple
.ituplen
> lencompleted
)
379 completed
= (ItemPointerData
*) repalloc(completed
, sizeof(ItemPointerData
) * lencompleted
);
381 for (j
= 0; j
< chldtuple
.ituplen
; j
++)
383 ItemPointerCopy(&(chldtuple
.itup
[j
]->t_tid
), completed
+ ncompleted
);
387 pfree(chldtuple
.itup
);
392 Assert(maxoff
== PageGetMaxOffsetNumber(tempPage
));
396 /* insert updated tuples */
397 if (gistnospace(tempPage
, addon
, curlenaddon
, InvalidOffsetNumber
, 0))
399 /* there is no space on page to insert tuples */
400 res
= vacuumSplitPage(gv
, tempPage
, buffer
, addon
, curlenaddon
);
401 tempPage
= NULL
; /* vacuumSplitPage() free tempPage */
402 needwrite
= needunion
= false; /* gistSplit already forms
403 * unions and writes pages */
406 /* enough free space */
407 gistfillbuffer(tempPage
, addon
, curlenaddon
, InvalidOffsetNumber
);
412 * If page is empty, we should remove pointer to it before deleting page
416 if (blkno
!= GIST_ROOT_BLKNO
&& (PageIsEmpty(page
) || (tempPage
&& PageIsEmpty(tempPage
))))
419 * New version of page is empty, so leave it unchanged, upper call
420 * will mark our page as deleted. In case of page split we never will
423 * If page was empty it can't become non-empty during processing
425 res
.emptypage
= true;
426 UnlockReleaseBuffer(buffer
);
430 /* write the page and remove its children if needed */
432 START_CRIT_SECTION();
434 if (tempPage
&& needwrite
)
436 PageRestoreTempPage(tempPage
, page
);
441 if (PageIsEmpty(page
) && blkno
== GIST_ROOT_BLKNO
)
444 GistPageSetLeaf(page
);
450 MarkBufferDirty(buffer
);
451 GistClearTuplesDeleted(page
);
453 if (!gv
->index
->rd_istemp
)
459 rdata
= formUpdateRdata(gv
->index
->rd_node
, buffer
,
460 offToDelete
, nOffToDelete
,
461 addon
, curlenaddon
, NULL
);
462 xlinfo
= rdata
->next
->data
;
464 recptr
= XLogInsert(RM_GIST_ID
, XLOG_GIST_PAGE_UPDATE
, rdata
);
465 PageSetLSN(page
, recptr
);
466 PageSetTLI(page
, ThisTimeLineID
);
472 PageSetLSN(page
, XLogRecPtrForTemp
);
477 if (needunion
&& !PageIsEmpty(page
))
479 res
.itup
= (IndexTuple
*) palloc(sizeof(IndexTuple
));
481 res
.itup
[0] = PageMakeUnionKey(gv
, buffer
);
484 UnlockReleaseBuffer(buffer
);
486 /* delete empty children; we no longer have any links pointing to their subtrees */
487 for (i
= 0; i
< nBlkToDelete
; i
++)
488 gistDeleteSubtree(gv
, blkToDelete
[i
]);
490 if (ncompleted
&& !gv
->index
->rd_istemp
)
491 gistxlogInsertCompletion(gv
->index
->rd_node
, completed
, ncompleted
);
495 for (i
= 0; i
< curlenaddon
; i
++)
508 * For a plain vacuum, just update the FSM. For a full vacuum, also
509 * re-form parent tuples if any of their children were deleted or changed,
510 * update invalid tuples (these can exist only after crash recovery),
511 * and try to shrink the index.
515 gistvacuumcleanup(PG_FUNCTION_ARGS
)
517 IndexVacuumInfo
*info
= (IndexVacuumInfo
*) PG_GETARG_POINTER(0);
518 GistBulkDeleteResult
*stats
= (GistBulkDeleteResult
*) PG_GETARG_POINTER(1);
519 Relation rel
= info
->index
;
522 BlockNumber totFreePages
;
523 BlockNumber lastBlock
= GIST_ROOT_BLKNO
,
524 lastFilledBlock
= GIST_ROOT_BLKNO
;
527 /* Set up all-zero stats if gistbulkdelete wasn't called */
530 stats
= (GistBulkDeleteResult
*) palloc0(sizeof(GistBulkDeleteResult
));
531 /* use heap's tuple count */
532 Assert(info
->num_heap_tuples
>= 0);
533 stats
->std
.num_index_tuples
= info
->num_heap_tuples
;
536 * XXX the above is wrong if index is partial. Would it be OK to just
537 * return NULL, or is there work we must do below?
541 /* gistVacuumUpdate may cause hard work */
542 if (info
->vacuum_full
)
547 /* note: vacuum.c already acquired AccessExclusiveLock on index */
550 initGISTstate(&(gv
.giststate
), rel
);
551 gv
.opCtx
= createTempGistContext();
553 gv
.strategy
= info
->strategy
;
555 /* walk through the entire index for update tuples */
556 res
= gistVacuumUpdate(&gv
, GIST_ROOT_BLKNO
, false);
562 for (i
= 0; i
< res
.ituplen
; i
++)
566 freeGISTstate(&(gv
.giststate
));
567 MemoryContextDelete(gv
.opCtx
);
569 else if (stats
->needFullVacuum
)
571 (errmsg("index \"%s\" needs VACUUM FULL or REINDEX to finish crash recovery",
572 RelationGetRelationName(rel
))));
575 * If vacuum full, we already have exclusive lock on the index. Otherwise,
576 * need lock unless it's local to this backend.
578 if (info
->vacuum_full
)
581 needLock
= !RELATION_IS_LOCAL(rel
);
583 /* try to find deleted pages */
585 LockRelationForExtension(rel
, ExclusiveLock
);
586 npages
= RelationGetNumberOfBlocks(rel
);
588 UnlockRelationForExtension(rel
, ExclusiveLock
);
591 for (blkno
= GIST_ROOT_BLKNO
+ 1; blkno
< npages
; blkno
++)
596 vacuum_delay_point();
598 buffer
= ReadBufferWithStrategy(rel
, blkno
, info
->strategy
);
599 LockBuffer(buffer
, GIST_SHARE
);
600 page
= (Page
) BufferGetPage(buffer
);
602 if (PageIsNew(page
) || GistPageIsDeleted(page
))
605 RecordFreeIndexPage(rel
, blkno
);
608 lastFilledBlock
= blkno
;
609 UnlockReleaseBuffer(buffer
);
611 lastBlock
= npages
- 1;
613 if (info
->vacuum_full
&& lastFilledBlock
< lastBlock
)
614 { /* try to truncate index */
615 FreeSpaceMapTruncateRel(rel
, lastFilledBlock
+ 1);
616 RelationTruncate(rel
, lastFilledBlock
+ 1);
618 stats
->std
.pages_removed
= lastBlock
- lastFilledBlock
;
619 totFreePages
= totFreePages
- stats
->std
.pages_removed
;
622 /* Finally, vacuum the FSM */
623 IndexFreeSpaceMapVacuum(info
->index
);
625 /* return statistics */
626 stats
->std
.pages_free
= totFreePages
;
628 LockRelationForExtension(rel
, ExclusiveLock
);
629 stats
->std
.num_pages
= RelationGetNumberOfBlocks(rel
);
631 UnlockRelationForExtension(rel
, ExclusiveLock
);
633 PG_RETURN_POINTER(stats
);
636 typedef struct GistBDItem
640 struct GistBDItem
*next
;
644 pushStackIfSplited(Page page
, GistBDItem
*stack
)
646 GISTPageOpaque opaque
= GistPageGetOpaque(page
);
648 if (stack
->blkno
!= GIST_ROOT_BLKNO
&& !XLogRecPtrIsInvalid(stack
->parentlsn
) &&
649 XLByteLT(stack
->parentlsn
, opaque
->nsn
) &&
650 opaque
->rightlink
!= InvalidBlockNumber
/* sanity check */ )
652 /* split page detected, install right link to the stack */
654 GistBDItem
*ptr
= (GistBDItem
*) palloc(sizeof(GistBDItem
));
656 ptr
->blkno
= opaque
->rightlink
;
657 ptr
->parentlsn
= stack
->parentlsn
;
658 ptr
->next
= stack
->next
;
665 * Bulk deletion of all index entries pointing to a set of heap tuples and
666 * check invalid tuples after crash recovery.
667 * The set of target tuples is specified via a callback routine that tells
668 * whether any given heap tuple (identified by ItemPointer) is being deleted.
670 * Result: a palloc'd struct containing statistical info for VACUUM displays.
673 gistbulkdelete(PG_FUNCTION_ARGS
)
675 IndexVacuumInfo
*info
= (IndexVacuumInfo
*) PG_GETARG_POINTER(0);
676 GistBulkDeleteResult
*stats
= (GistBulkDeleteResult
*) PG_GETARG_POINTER(1);
677 IndexBulkDeleteCallback callback
= (IndexBulkDeleteCallback
) PG_GETARG_POINTER(2);
678 void *callback_state
= (void *) PG_GETARG_POINTER(3);
679 Relation rel
= info
->index
;
683 /* first time through? */
685 stats
= (GistBulkDeleteResult
*) palloc0(sizeof(GistBulkDeleteResult
));
686 /* we'll re-count the tuples each time */
687 stats
->std
.num_index_tuples
= 0;
689 stack
= (GistBDItem
*) palloc0(sizeof(GistBDItem
));
690 stack
->blkno
= GIST_ROOT_BLKNO
;
694 Buffer buffer
= ReadBufferWithStrategy(rel
, stack
->blkno
, info
->strategy
);
701 LockBuffer(buffer
, GIST_SHARE
);
702 gistcheckpage(rel
, buffer
);
703 page
= (Page
) BufferGetPage(buffer
);
705 if (GistPageIsLeaf(page
))
707 OffsetNumber todelete
[MaxOffsetNumber
];
710 LockBuffer(buffer
, GIST_UNLOCK
);
711 LockBuffer(buffer
, GIST_EXCLUSIVE
);
713 page
= (Page
) BufferGetPage(buffer
);
714 if (stack
->blkno
== GIST_ROOT_BLKNO
&& !GistPageIsLeaf(page
))
716 /* only the root can become non-leaf during relock */
717 UnlockReleaseBuffer(buffer
);
723 * check for split proceeded after look at parent, we should check
726 pushStackIfSplited(page
, stack
);
729 * Remove deletable tuples from page
732 maxoff
= PageGetMaxOffsetNumber(page
);
734 for (i
= FirstOffsetNumber
; i
<= maxoff
; i
= OffsetNumberNext(i
))
736 iid
= PageGetItemId(page
, i
);
737 idxtuple
= (IndexTuple
) PageGetItem(page
, iid
);
739 if (callback(&(idxtuple
->t_tid
), callback_state
))
741 todelete
[ntodelete
] = i
- ntodelete
;
743 stats
->std
.tuples_removed
+= 1;
746 stats
->std
.num_index_tuples
+= 1;
751 START_CRIT_SECTION();
753 MarkBufferDirty(buffer
);
755 for (i
= 0; i
< ntodelete
; i
++)
756 PageIndexTupleDelete(page
, todelete
[i
]);
757 GistMarkTuplesDeleted(page
);
763 gistxlogPageUpdate
*xlinfo
;
765 rdata
= formUpdateRdata(rel
->rd_node
, buffer
,
769 xlinfo
= (gistxlogPageUpdate
*) rdata
->next
->data
;
771 recptr
= XLogInsert(RM_GIST_ID
, XLOG_GIST_PAGE_UPDATE
, rdata
);
772 PageSetLSN(page
, recptr
);
773 PageSetTLI(page
, ThisTimeLineID
);
779 PageSetLSN(page
, XLogRecPtrForTemp
);
787 /* check for a split that happened after we looked at the parent */
788 pushStackIfSplited(page
, stack
);
790 maxoff
= PageGetMaxOffsetNumber(page
);
792 for (i
= FirstOffsetNumber
; i
<= maxoff
; i
= OffsetNumberNext(i
))
794 iid
= PageGetItemId(page
, i
);
795 idxtuple
= (IndexTuple
) PageGetItem(page
, iid
);
797 ptr
= (GistBDItem
*) palloc(sizeof(GistBDItem
));
798 ptr
->blkno
= ItemPointerGetBlockNumber(&(idxtuple
->t_tid
));
799 ptr
->parentlsn
= PageGetLSN(page
);
800 ptr
->next
= stack
->next
;
803 if (GistTupleIsInvalid(idxtuple
))
804 stats
->needFullVacuum
= true;
808 UnlockReleaseBuffer(buffer
);
814 vacuum_delay_point();
817 PG_RETURN_POINTER(stats
);