Force a checkpoint in CREATE DATABASE before starting to copy the files,
[PostgreSQL.git] / src / backend / access / gist / gist.c
blobf4f63b4cf4766d8ac4d0c4de2b63039536f6c8a2
1 /*-------------------------------------------------------------------------
3 * gist.c
4 * interface routines for the postgres GiST index access method.
7 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
10 * IDENTIFICATION
11 * $PostgreSQL$
13 *-------------------------------------------------------------------------
15 #include "postgres.h"
17 #include "access/genam.h"
18 #include "access/gist_private.h"
19 #include "catalog/index.h"
20 #include "miscadmin.h"
21 #include "storage/bufmgr.h"
22 #include "storage/indexfsm.h"
23 #include "utils/memutils.h"
25 const XLogRecPtr XLogRecPtrForTemp = {1, 1};
27 /* Working state for gistbuild and its callback */
28 typedef struct
30 GISTSTATE giststate;
31 int numindexattrs;
32 double indtuples;
33 MemoryContext tmpCtx;
34 } GISTBuildState;
37 /* non-export function prototypes */
38 static void gistbuildCallback(Relation index,
39 HeapTuple htup,
40 Datum *values,
41 bool *isnull,
42 bool tupleIsAlive,
43 void *state);
44 static void gistdoinsert(Relation r,
45 IndexTuple itup,
46 Size freespace,
47 GISTSTATE *GISTstate);
48 static void gistfindleaf(GISTInsertState *state,
49 GISTSTATE *giststate);
52 #define ROTATEDIST(d) do { \
53 SplitedPageLayout *tmp=(SplitedPageLayout*)palloc(sizeof(SplitedPageLayout)); \
54 memset(tmp,0,sizeof(SplitedPageLayout)); \
55 tmp->block.blkno = InvalidBlockNumber; \
56 tmp->buffer = InvalidBuffer; \
57 tmp->next = (d); \
58 (d)=tmp; \
59 } while(0)
63 * Create and return a temporary memory context for use by GiST. We
64 * _always_ invoke user-provided methods in a temporary memory
65 * context, so that memory leaks in those functions cannot cause
66 * problems. Also, we use some additional temporary contexts in the
67 * GiST code itself, to avoid the need to do some awkward manual
68 * memory management.
70 MemoryContext
71 createTempGistContext(void)
73 return AllocSetContextCreate(CurrentMemoryContext,
74 "GiST temporary context",
75 ALLOCSET_DEFAULT_MINSIZE,
76 ALLOCSET_DEFAULT_INITSIZE,
77 ALLOCSET_DEFAULT_MAXSIZE);
81 * Routine to build an index. Basically calls insert over and over.
83 * XXX: it would be nice to implement some sort of bulk-loading
84 * algorithm, but it is not clear how to do that.
86 Datum
87 gistbuild(PG_FUNCTION_ARGS)
89 Relation heap = (Relation) PG_GETARG_POINTER(0);
90 Relation index = (Relation) PG_GETARG_POINTER(1);
91 IndexInfo *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
92 IndexBuildResult *result;
93 double reltuples;
94 GISTBuildState buildstate;
95 Buffer buffer;
96 Page page;
99 * We expect to be called exactly once for any index relation. If that's
100 * not the case, big trouble's what we have.
102 if (RelationGetNumberOfBlocks(index) != 0)
103 elog(ERROR, "index \"%s\" already contains data",
104 RelationGetRelationName(index));
106 /* Initialize FSM */
107 InitIndexFreeSpaceMap(index);
109 /* no locking is needed */
110 initGISTstate(&buildstate.giststate, index);
112 /* initialize the root page */
113 buffer = gistNewBuffer(index);
114 Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
115 page = BufferGetPage(buffer);
117 START_CRIT_SECTION();
119 GISTInitBuffer(buffer, F_LEAF);
121 MarkBufferDirty(buffer);
123 if (!index->rd_istemp)
125 XLogRecPtr recptr;
126 XLogRecData rdata;
128 rdata.data = (char *) &(index->rd_node);
129 rdata.len = sizeof(RelFileNode);
130 rdata.buffer = InvalidBuffer;
131 rdata.next = NULL;
133 recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX, &rdata);
134 PageSetLSN(page, recptr);
135 PageSetTLI(page, ThisTimeLineID);
137 else
138 PageSetLSN(page, XLogRecPtrForTemp);
140 UnlockReleaseBuffer(buffer);
142 END_CRIT_SECTION();
144 /* build the index */
145 buildstate.numindexattrs = indexInfo->ii_NumIndexAttrs;
146 buildstate.indtuples = 0;
149 * create a temporary memory context that is reset once for each tuple
150 * inserted into the index
152 buildstate.tmpCtx = createTempGistContext();
154 /* do the heap scan */
155 reltuples = IndexBuildHeapScan(heap, index, indexInfo,
156 gistbuildCallback, (void *) &buildstate);
158 /* okay, all heap tuples are indexed */
159 MemoryContextDelete(buildstate.tmpCtx);
161 freeGISTstate(&buildstate.giststate);
164 * Return statistics
166 result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
168 result->heap_tuples = reltuples;
169 result->index_tuples = buildstate.indtuples;
171 PG_RETURN_POINTER(result);
175 * Per-tuple callback from IndexBuildHeapScan
177 static void
178 gistbuildCallback(Relation index,
179 HeapTuple htup,
180 Datum *values,
181 bool *isnull,
182 bool tupleIsAlive,
183 void *state)
185 GISTBuildState *buildstate = (GISTBuildState *) state;
186 IndexTuple itup;
187 MemoryContext oldCtx;
189 oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
191 /* form an index tuple and point it at the heap tuple */
192 itup = gistFormTuple(&buildstate->giststate, index,
193 values, isnull, true /* size is currently bogus */ );
194 itup->t_tid = htup->t_self;
197 * Since we already have the index relation locked, we call gistdoinsert
198 * directly. Normal access method calls dispatch through gistinsert,
199 * which locks the relation for write. This is the right thing to do if
200 * you're inserting single tups, but not when you're initializing the
201 * whole index at once.
203 * In this path we respect the fillfactor setting, whereas insertions
204 * after initial build do not.
206 gistdoinsert(index, itup,
207 RelationGetTargetPageFreeSpace(index, GIST_DEFAULT_FILLFACTOR),
208 &buildstate->giststate);
210 buildstate->indtuples += 1;
211 MemoryContextSwitchTo(oldCtx);
212 MemoryContextReset(buildstate->tmpCtx);
216 * gistinsert -- wrapper for GiST tuple insertion.
218 * This is the public interface routine for tuple insertion in GiSTs.
219 * It doesn't do any work; just locks the relation and passes the buck.
221 Datum
222 gistinsert(PG_FUNCTION_ARGS)
224 Relation r = (Relation) PG_GETARG_POINTER(0);
225 Datum *values = (Datum *) PG_GETARG_POINTER(1);
226 bool *isnull = (bool *) PG_GETARG_POINTER(2);
227 ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
229 #ifdef NOT_USED
230 Relation heapRel = (Relation) PG_GETARG_POINTER(4);
231 bool checkUnique = PG_GETARG_BOOL(5);
232 #endif
233 IndexTuple itup;
234 GISTSTATE giststate;
235 MemoryContext oldCtx;
236 MemoryContext insertCtx;
238 insertCtx = createTempGistContext();
239 oldCtx = MemoryContextSwitchTo(insertCtx);
241 initGISTstate(&giststate, r);
243 itup = gistFormTuple(&giststate, r,
244 values, isnull, true /* size is currently bogus */ );
245 itup->t_tid = *ht_ctid;
247 gistdoinsert(r, itup, 0, &giststate);
249 /* cleanup */
250 freeGISTstate(&giststate);
251 MemoryContextSwitchTo(oldCtx);
252 MemoryContextDelete(insertCtx);
254 PG_RETURN_BOOL(true);
259 * Workhouse routine for doing insertion into a GiST index. Note that
260 * this routine assumes it is invoked in a short-lived memory context,
261 * so it does not bother releasing palloc'd allocations.
263 static void
264 gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
266 GISTInsertState state;
268 memset(&state, 0, sizeof(GISTInsertState));
270 state.itup = (IndexTuple *) palloc(sizeof(IndexTuple));
271 state.itup[0] = (IndexTuple) palloc(IndexTupleSize(itup));
272 memcpy(state.itup[0], itup, IndexTupleSize(itup));
273 state.ituplen = 1;
274 state.freespace = freespace;
275 state.r = r;
276 state.key = itup->t_tid;
277 state.needInsertComplete = true;
279 state.stack = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
280 state.stack->blkno = GIST_ROOT_BLKNO;
282 gistfindleaf(&state, giststate);
283 gistmakedeal(&state, giststate);
286 static bool
287 gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
289 bool is_splitted = false;
290 bool is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false;
293 * if (!is_leaf) remove old key: This node's key has been modified, either
294 * because a child split occurred or because we needed to adjust our key
295 * for an insert in a child node. Therefore, remove the old version of
296 * this node's key.
298 * for WAL replay, in the non-split case we handle this by setting up a
299 * one-element todelete array; in the split case, it's handled implicitly
300 * because the tuple vector passed to gistSplit won't include this tuple.
302 * XXX: If we want to change fillfactors between node and leaf, fillfactor
303 * = (is_leaf ? state->leaf_fillfactor : state->node_fillfactor)
305 if (gistnospace(state->stack->page, state->itup, state->ituplen,
306 is_leaf ? InvalidOffsetNumber : state->stack->childoffnum,
307 state->freespace))
309 /* no space for insertion */
310 IndexTuple *itvec;
311 int tlen;
312 SplitedPageLayout *dist = NULL,
313 *ptr;
314 BlockNumber rrlink = InvalidBlockNumber;
315 GistNSN oldnsn;
317 is_splitted = true;
320 * Form index tuples vector to split: remove old tuple if t's needed
321 * and add new tuples to vector
323 itvec = gistextractpage(state->stack->page, &tlen);
324 if (!is_leaf)
326 /* on inner page we should remove old tuple */
327 int pos = state->stack->childoffnum - FirstOffsetNumber;
329 tlen--;
330 if (pos != tlen)
331 memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
333 itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen);
334 dist = gistSplit(state->r, state->stack->page, itvec, tlen, giststate);
336 state->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * tlen);
337 state->ituplen = 0;
339 if (state->stack->blkno != GIST_ROOT_BLKNO)
342 * if non-root split then we should not allocate new buffer, but
343 * we must create temporary page to operate
345 dist->buffer = state->stack->buffer;
346 dist->page = PageGetTempPage(BufferGetPage(dist->buffer), sizeof(GISTPageOpaqueData));
348 /* clean all flags except F_LEAF */
349 GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;
352 /* make new pages and fills them */
353 for (ptr = dist; ptr; ptr = ptr->next)
355 int i;
356 char *data;
358 /* get new page */
359 if (ptr->buffer == InvalidBuffer)
361 ptr->buffer = gistNewBuffer(state->r);
362 GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
363 ptr->page = BufferGetPage(ptr->buffer);
365 ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
368 * fill page, we can do it because all these pages are new (ie not
369 * linked in tree or masked by temp page
371 data = (char *) (ptr->list);
372 for (i = 0; i < ptr->block.num; i++)
374 if (PageAddItem(ptr->page, (Item) data, IndexTupleSize((IndexTuple) data), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
375 elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(state->r));
376 data += IndexTupleSize((IndexTuple) data);
379 /* set up ItemPointer and remember it for parent */
380 ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
381 state->itup[state->ituplen] = ptr->itup;
382 state->ituplen++;
385 /* saves old rightlink */
386 if (state->stack->blkno != GIST_ROOT_BLKNO)
387 rrlink = GistPageGetOpaque(dist->page)->rightlink;
389 START_CRIT_SECTION();
392 * must mark buffers dirty before XLogInsert, even though we'll still
393 * be changing their opaque fields below. set up right links.
395 for (ptr = dist; ptr; ptr = ptr->next)
397 MarkBufferDirty(ptr->buffer);
398 GistPageGetOpaque(ptr->page)->rightlink = (ptr->next) ?
399 ptr->next->block.blkno : rrlink;
402 /* restore splitted non-root page */
403 if (state->stack->blkno != GIST_ROOT_BLKNO)
405 PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
406 dist->page = BufferGetPage(dist->buffer);
409 if (!state->r->rd_istemp)
411 XLogRecPtr recptr;
412 XLogRecData *rdata;
414 rdata = formSplitRdata(state->r->rd_node, state->stack->blkno,
415 is_leaf, &(state->key), dist);
417 recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
419 for (ptr = dist; ptr; ptr = ptr->next)
421 PageSetLSN(ptr->page, recptr);
422 PageSetTLI(ptr->page, ThisTimeLineID);
425 else
427 for (ptr = dist; ptr; ptr = ptr->next)
429 PageSetLSN(ptr->page, XLogRecPtrForTemp);
433 /* set up NSN */
434 oldnsn = GistPageGetOpaque(dist->page)->nsn;
435 if (state->stack->blkno == GIST_ROOT_BLKNO)
436 /* if root split we should put initial value */
437 oldnsn = PageGetLSN(dist->page);
439 for (ptr = dist; ptr; ptr = ptr->next)
441 /* only for last set oldnsn */
442 GistPageGetOpaque(ptr->page)->nsn = (ptr->next) ?
443 PageGetLSN(ptr->page) : oldnsn;
447 * release buffers, if it was a root split then release all buffers
448 * because we create all buffers
450 ptr = (state->stack->blkno == GIST_ROOT_BLKNO) ? dist : dist->next;
451 for (; ptr; ptr = ptr->next)
452 UnlockReleaseBuffer(ptr->buffer);
454 if (state->stack->blkno == GIST_ROOT_BLKNO)
456 gistnewroot(state->r, state->stack->buffer, state->itup, state->ituplen, &(state->key));
457 state->needInsertComplete = false;
460 END_CRIT_SECTION();
462 else
464 /* enough space */
465 START_CRIT_SECTION();
467 if (!is_leaf)
468 PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
469 gistfillbuffer(state->stack->page, state->itup, state->ituplen, InvalidOffsetNumber);
471 MarkBufferDirty(state->stack->buffer);
473 if (!state->r->rd_istemp)
475 OffsetNumber noffs = 0,
476 offs[1];
477 XLogRecPtr recptr;
478 XLogRecData *rdata;
480 if (!is_leaf)
482 /* only on inner page we should delete previous version */
483 offs[0] = state->stack->childoffnum;
484 noffs = 1;
487 rdata = formUpdateRdata(state->r->rd_node, state->stack->buffer,
488 offs, noffs,
489 state->itup, state->ituplen,
490 &(state->key));
492 recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
493 PageSetLSN(state->stack->page, recptr);
494 PageSetTLI(state->stack->page, ThisTimeLineID);
496 else
497 PageSetLSN(state->stack->page, XLogRecPtrForTemp);
499 if (state->stack->blkno == GIST_ROOT_BLKNO)
500 state->needInsertComplete = false;
502 END_CRIT_SECTION();
504 if (state->ituplen > 1)
505 { /* previous is_splitted==true */
508 * child was splited, so we must form union for insertion in
509 * parent
511 IndexTuple newtup = gistunion(state->r, state->itup, state->ituplen, giststate);
513 ItemPointerSetBlockNumber(&(newtup->t_tid), state->stack->blkno);
514 state->itup[0] = newtup;
515 state->ituplen = 1;
517 else if (is_leaf)
520 * itup[0] store key to adjust parent, we set it to valid to
521 * correct check by GistTupleIsInvalid macro in gistgetadjusted()
523 ItemPointerSetBlockNumber(&(state->itup[0]->t_tid), state->stack->blkno);
524 GistTupleSetValid(state->itup[0]);
527 return is_splitted;
531 * returns stack of pages, all pages in stack are pinned, and
532 * leaf is X-locked
535 static void
536 gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
538 ItemId iid;
539 IndexTuple idxtuple;
540 GISTPageOpaque opaque;
543 * walk down, We don't lock page for a long time, but so we should be
544 * ready to recheck path in a bad case... We remember, that page->lsn
545 * should never be invalid.
547 for (;;)
549 if (XLogRecPtrIsInvalid(state->stack->lsn))
550 state->stack->buffer = ReadBuffer(state->r, state->stack->blkno);
551 LockBuffer(state->stack->buffer, GIST_SHARE);
552 gistcheckpage(state->r, state->stack->buffer);
554 state->stack->page = (Page) BufferGetPage(state->stack->buffer);
555 opaque = GistPageGetOpaque(state->stack->page);
557 state->stack->lsn = PageGetLSN(state->stack->page);
558 Assert(state->r->rd_istemp || !XLogRecPtrIsInvalid(state->stack->lsn));
560 if (state->stack->blkno != GIST_ROOT_BLKNO &&
561 XLByteLT(state->stack->parent->lsn, opaque->nsn))
564 * caused split non-root page is detected, go up to parent to
565 * choose best child
567 UnlockReleaseBuffer(state->stack->buffer);
568 state->stack = state->stack->parent;
569 continue;
572 if (!GistPageIsLeaf(state->stack->page))
575 * This is an internal page, so continue to walk down the tree. We
576 * find the child node that has the minimum insertion penalty and
577 * recursively invoke ourselves to modify that node. Once the
578 * recursive call returns, we may need to adjust the parent node
579 * for two reasons: the child node split, or the key in this node
580 * needs to be adjusted for the newly inserted key below us.
582 GISTInsertStack *item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
584 state->stack->childoffnum = gistchoose(state->r, state->stack->page, state->itup[0], giststate);
586 iid = PageGetItemId(state->stack->page, state->stack->childoffnum);
587 idxtuple = (IndexTuple) PageGetItem(state->stack->page, iid);
588 item->blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
589 LockBuffer(state->stack->buffer, GIST_UNLOCK);
591 item->parent = state->stack;
592 item->child = NULL;
593 if (state->stack)
594 state->stack->child = item;
595 state->stack = item;
597 else
599 /* be carefull, during unlock/lock page may be changed... */
600 LockBuffer(state->stack->buffer, GIST_UNLOCK);
601 LockBuffer(state->stack->buffer, GIST_EXCLUSIVE);
602 state->stack->page = (Page) BufferGetPage(state->stack->buffer);
603 opaque = GistPageGetOpaque(state->stack->page);
605 if (state->stack->blkno == GIST_ROOT_BLKNO)
608 * the only page can become inner instead of leaf is a root
609 * page, so for root we should recheck it
611 if (!GistPageIsLeaf(state->stack->page))
614 * very rarely situation: during unlock/lock index with
615 * number of pages = 1 was increased
617 LockBuffer(state->stack->buffer, GIST_UNLOCK);
618 continue;
622 * we don't need to check root split, because checking
623 * leaf/inner is enough to recognize split for root
627 else if (XLByteLT(state->stack->parent->lsn, opaque->nsn))
630 * detecting split during unlock/lock, so we should find
631 * better child on parent
634 /* forget buffer */
635 UnlockReleaseBuffer(state->stack->buffer);
637 state->stack = state->stack->parent;
638 continue;
641 state->stack->lsn = PageGetLSN(state->stack->page);
643 /* ok we found a leaf page and it X-locked */
644 break;
648 /* now state->stack->(page, buffer and blkno) points to leaf page */
652 * Traverse the tree to find path from root page to specified "child" block.
654 * returns from the beginning of closest parent;
656 * To prevent deadlocks, this should lock only one page simultaneously.
658 GISTInsertStack *
659 gistFindPath(Relation r, BlockNumber child)
661 Page page;
662 Buffer buffer;
663 OffsetNumber i,
664 maxoff;
665 ItemId iid;
666 IndexTuple idxtuple;
667 GISTInsertStack *top,
668 *tail,
669 *ptr;
670 BlockNumber blkno;
672 top = tail = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
673 top->blkno = GIST_ROOT_BLKNO;
675 while (top && top->blkno != child)
677 buffer = ReadBuffer(r, top->blkno);
678 LockBuffer(buffer, GIST_SHARE);
679 gistcheckpage(r, buffer);
680 page = (Page) BufferGetPage(buffer);
682 if (GistPageIsLeaf(page))
684 /* we can safety go away, follows only leaf pages */
685 UnlockReleaseBuffer(buffer);
686 return NULL;
689 top->lsn = PageGetLSN(page);
691 if (top->parent && XLByteLT(top->parent->lsn, GistPageGetOpaque(page)->nsn) &&
692 GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ )
694 /* page splited while we thinking of... */
695 ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
696 ptr->blkno = GistPageGetOpaque(page)->rightlink;
697 ptr->childoffnum = InvalidOffsetNumber;
698 ptr->parent = top;
699 ptr->next = NULL;
700 tail->next = ptr;
701 tail = ptr;
704 maxoff = PageGetMaxOffsetNumber(page);
706 for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
708 iid = PageGetItemId(page, i);
709 idxtuple = (IndexTuple) PageGetItem(page, iid);
710 blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
711 if (blkno == child)
713 OffsetNumber poff = InvalidOffsetNumber;
715 /* make childs links */
716 ptr = top;
717 while (ptr->parent)
719 /* set child link */
720 ptr->parent->child = ptr;
721 /* move childoffnum.. */
722 if (ptr == top)
724 /* first iteration */
725 poff = ptr->parent->childoffnum;
726 ptr->parent->childoffnum = ptr->childoffnum;
728 else
730 OffsetNumber tmp = ptr->parent->childoffnum;
732 ptr->parent->childoffnum = poff;
733 poff = tmp;
735 ptr = ptr->parent;
737 top->childoffnum = i;
738 UnlockReleaseBuffer(buffer);
739 return top;
741 else
743 /* Install next inner page to the end of stack */
744 ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
745 ptr->blkno = blkno;
746 ptr->childoffnum = i; /* set offsetnumber of child to child
747 * !!! */
748 ptr->parent = top;
749 ptr->next = NULL;
750 tail->next = ptr;
751 tail = ptr;
755 UnlockReleaseBuffer(buffer);
756 top = top->next;
759 return NULL;
764 * Returns X-locked parent of stack page
767 static void
768 gistFindCorrectParent(Relation r, GISTInsertStack *child)
770 GISTInsertStack *parent = child->parent;
772 LockBuffer(parent->buffer, GIST_EXCLUSIVE);
773 gistcheckpage(r, parent->buffer);
774 parent->page = (Page) BufferGetPage(parent->buffer);
776 /* here we don't need to distinguish between split and page update */
777 if (parent->childoffnum == InvalidOffsetNumber || !XLByteEQ(parent->lsn, PageGetLSN(parent->page)))
779 /* parent is changed, look child in right links until found */
780 OffsetNumber i,
781 maxoff;
782 ItemId iid;
783 IndexTuple idxtuple;
784 GISTInsertStack *ptr;
786 while (true)
788 maxoff = PageGetMaxOffsetNumber(parent->page);
789 for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
791 iid = PageGetItemId(parent->page, i);
792 idxtuple = (IndexTuple) PageGetItem(parent->page, iid);
793 if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno)
795 /* yes!!, found */
796 parent->childoffnum = i;
797 return;
801 parent->blkno = GistPageGetOpaque(parent->page)->rightlink;
802 UnlockReleaseBuffer(parent->buffer);
803 if (parent->blkno == InvalidBlockNumber)
806 * end of chain and still didn't found parent, It's very-very
807 * rare situation when root splited
809 break;
810 parent->buffer = ReadBuffer(r, parent->blkno);
811 LockBuffer(parent->buffer, GIST_EXCLUSIVE);
812 gistcheckpage(r, parent->buffer);
813 parent->page = (Page) BufferGetPage(parent->buffer);
817 * awful!!, we need search tree to find parent ... , but before we
818 * should release all old parent
821 ptr = child->parent->parent; /* child->parent already released
822 * above */
823 while (ptr)
825 ReleaseBuffer(ptr->buffer);
826 ptr = ptr->parent;
829 /* ok, find new path */
830 ptr = parent = gistFindPath(r, child->blkno);
831 Assert(ptr != NULL);
833 /* read all buffers as expected by caller */
834 /* note we don't lock them or gistcheckpage them here! */
835 while (ptr)
837 ptr->buffer = ReadBuffer(r, ptr->blkno);
838 ptr->page = (Page) BufferGetPage(ptr->buffer);
839 ptr = ptr->parent;
842 /* install new chain of parents to stack */
843 child->parent = parent;
844 parent->child = child;
846 /* make recursive call to normal processing */
847 gistFindCorrectParent(r, child);
850 return;
853 void
854 gistmakedeal(GISTInsertState *state, GISTSTATE *giststate)
856 int is_splitted;
857 ItemId iid;
858 IndexTuple oldtup,
859 newtup;
861 /* walk up */
862 while (true)
865 * After this call: 1. if child page was splited, then itup contains
866 * keys for each page 2. if child page wasn't splited, then itup
867 * contains additional for adjustment of current key
870 if (state->stack->parent)
873 * X-lock parent page before proceed child, gistFindCorrectParent
874 * should find and lock it
876 gistFindCorrectParent(state->r, state->stack);
878 is_splitted = gistplacetopage(state, giststate);
880 /* parent locked above, so release child buffer */
881 UnlockReleaseBuffer(state->stack->buffer);
883 /* pop parent page from stack */
884 state->stack = state->stack->parent;
886 /* stack is void */
887 if (!state->stack)
888 break;
891 * child did not split, so we can check is it needed to update parent
892 * tuple
894 if (!is_splitted)
896 /* parent's tuple */
897 iid = PageGetItemId(state->stack->page, state->stack->childoffnum);
898 oldtup = (IndexTuple) PageGetItem(state->stack->page, iid);
899 newtup = gistgetadjusted(state->r, oldtup, state->itup[0], giststate);
901 if (!newtup)
902 { /* not need to update key */
903 LockBuffer(state->stack->buffer, GIST_UNLOCK);
904 break;
907 state->itup[0] = newtup;
909 } /* while */
911 /* release all parent buffers */
912 while (state->stack)
914 ReleaseBuffer(state->stack->buffer);
915 state->stack = state->stack->parent;
918 /* say to xlog that insert is completed */
919 if (state->needInsertComplete && !state->r->rd_istemp)
920 gistxlogInsertCompletion(state->r->rd_node, &(state->key), 1);
924 * gistSplit -- split a page in the tree and fill struct
925 * used for XLOG and real writes buffers. Function is recursive, ie
926 * it will split page until keys will fit in every page.
928 SplitedPageLayout *
929 gistSplit(Relation r,
930 Page page,
931 IndexTuple *itup, /* contains compressed entry */
932 int len,
933 GISTSTATE *giststate)
935 IndexTuple *lvectup,
936 *rvectup;
937 GistSplitVector v;
938 GistEntryVector *entryvec;
939 int i;
940 SplitedPageLayout *res = NULL;
942 /* generate the item array */
943 entryvec = palloc(GEVHDRSZ + (len + 1) * sizeof(GISTENTRY));
944 entryvec->n = len + 1;
946 memset(v.spl_lisnull, TRUE, sizeof(bool) * giststate->tupdesc->natts);
947 memset(v.spl_risnull, TRUE, sizeof(bool) * giststate->tupdesc->natts);
948 gistSplitByKey(r, page, itup, len, giststate,
949 &v, entryvec, 0);
951 /* form left and right vector */
952 lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
953 rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
955 for (i = 0; i < v.splitVector.spl_nleft; i++)
956 lvectup[i] = itup[v.splitVector.spl_left[i] - 1];
958 for (i = 0; i < v.splitVector.spl_nright; i++)
959 rvectup[i] = itup[v.splitVector.spl_right[i] - 1];
961 /* finalize splitting (may need another split) */
962 if (!gistfitpage(rvectup, v.splitVector.spl_nright))
964 res = gistSplit(r, page, rvectup, v.splitVector.spl_nright, giststate);
966 else
968 ROTATEDIST(res);
969 res->block.num = v.splitVector.spl_nright;
970 res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
971 res->itup = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false)
972 : gist_form_invalid_tuple(GIST_ROOT_BLKNO);
975 if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
977 SplitedPageLayout *resptr,
978 *subres;
980 resptr = subres = gistSplit(r, page, lvectup, v.splitVector.spl_nleft, giststate);
982 /* install on list's tail */
983 while (resptr->next)
984 resptr = resptr->next;
986 resptr->next = res;
987 res = subres;
989 else
991 ROTATEDIST(res);
992 res->block.num = v.splitVector.spl_nleft;
993 res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
994 res->itup = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false)
995 : gist_form_invalid_tuple(GIST_ROOT_BLKNO);
998 return res;
1002 * buffer must be pinned and locked by caller
1004 void
1005 gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key)
1007 Page page;
1009 Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
1010 page = BufferGetPage(buffer);
1012 START_CRIT_SECTION();
1014 GISTInitBuffer(buffer, 0);
1015 gistfillbuffer(page, itup, len, FirstOffsetNumber);
1017 MarkBufferDirty(buffer);
1019 if (!r->rd_istemp)
1021 XLogRecPtr recptr;
1022 XLogRecData *rdata;
1024 rdata = formUpdateRdata(r->rd_node, buffer,
1025 NULL, 0,
1026 itup, len, key);
1028 recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata);
1029 PageSetLSN(page, recptr);
1030 PageSetTLI(page, ThisTimeLineID);
1032 else
1033 PageSetLSN(page, XLogRecPtrForTemp);
1035 END_CRIT_SECTION();
1038 void
1039 initGISTstate(GISTSTATE *giststate, Relation index)
1041 int i;
1043 if (index->rd_att->natts > INDEX_MAX_KEYS)
1044 elog(ERROR, "numberOfAttributes %d > %d",
1045 index->rd_att->natts, INDEX_MAX_KEYS);
1047 giststate->tupdesc = index->rd_att;
1049 for (i = 0; i < index->rd_att->natts; i++)
1051 fmgr_info_copy(&(giststate->consistentFn[i]),
1052 index_getprocinfo(index, i + 1, GIST_CONSISTENT_PROC),
1053 CurrentMemoryContext);
1054 fmgr_info_copy(&(giststate->unionFn[i]),
1055 index_getprocinfo(index, i + 1, GIST_UNION_PROC),
1056 CurrentMemoryContext);
1057 fmgr_info_copy(&(giststate->compressFn[i]),
1058 index_getprocinfo(index, i + 1, GIST_COMPRESS_PROC),
1059 CurrentMemoryContext);
1060 fmgr_info_copy(&(giststate->decompressFn[i]),
1061 index_getprocinfo(index, i + 1, GIST_DECOMPRESS_PROC),
1062 CurrentMemoryContext);
1063 fmgr_info_copy(&(giststate->penaltyFn[i]),
1064 index_getprocinfo(index, i + 1, GIST_PENALTY_PROC),
1065 CurrentMemoryContext);
1066 fmgr_info_copy(&(giststate->picksplitFn[i]),
1067 index_getprocinfo(index, i + 1, GIST_PICKSPLIT_PROC),
1068 CurrentMemoryContext);
1069 fmgr_info_copy(&(giststate->equalFn[i]),
1070 index_getprocinfo(index, i + 1, GIST_EQUAL_PROC),
1071 CurrentMemoryContext);
1075 void
1076 freeGISTstate(GISTSTATE *giststate)
1078 /* no work */