1 /*-------------------------------------------------------------------------
4 * implementation of insert algorithm
7 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
11 * src/backend/access/spgist/spgdoinsert.c
13 *-------------------------------------------------------------------------
18 #include "access/genam.h"
19 #include "access/spgist_private.h"
20 #include "access/spgxlog.h"
21 #include "access/xloginsert.h"
22 #include "common/int.h"
23 #include "common/pg_prng.h"
24 #include "miscadmin.h"
25 #include "storage/bufmgr.h"
26 #include "utils/rel.h"
30 * SPPageDesc tracks all info about a page we are inserting into. In some
31 * situations it actually identifies a tuple, or even a specific node within
32 * an inner tuple. But any of the fields can be invalid. If the buffer
33 * field is valid, it implies we hold pin and exclusive lock on that buffer.
34 * page pointer should be valid exactly when buffer is.
36 typedef struct SPPageDesc
38 BlockNumber blkno
; /* block number, or InvalidBlockNumber */
39 Buffer buffer
; /* page's buffer number, or InvalidBuffer */
40 Page page
; /* pointer to page buffer, or NULL */
41 OffsetNumber offnum
; /* offset of tuple, or InvalidOffsetNumber */
42 int node
; /* node number within inner tuple, or -1 */
47 * Set the item pointer in the nodeN'th entry in inner tuple tup. This
48 * is used to update the parent inner tuple's downlink after a move or
52 spgUpdateNodeLink(SpGistInnerTuple tup
, int nodeN
,
53 BlockNumber blkno
, OffsetNumber offset
)
58 SGITITERATE(tup
, i
, node
)
62 ItemPointerSet(&node
->t_tid
, blkno
, offset
);
67 elog(ERROR
, "failed to find requested node %d in SPGiST inner tuple",
72 * Form a new inner tuple containing one more node than the given one, with
73 * the specified label datum, inserted at offset "offset" in the node array.
74 * The new tuple's prefix is the same as the old one's.
76 * Note that the new node initially has an invalid downlink. We'll find a
77 * page to point it to later.
79 static SpGistInnerTuple
80 addNode(SpGistState
*state
, SpGistInnerTuple tuple
, Datum label
, int offset
)
86 /* if offset is negative, insert at end */
88 offset
= tuple
->nNodes
;
89 else if (offset
> tuple
->nNodes
)
90 elog(ERROR
, "invalid offset for adding node to SPGiST inner tuple");
92 nodes
= palloc(sizeof(SpGistNodeTuple
) * (tuple
->nNodes
+ 1));
93 SGITITERATE(tuple
, i
, node
)
101 nodes
[offset
] = spgFormNodeTuple(state
, label
, false);
103 return spgFormInnerTuple(state
,
104 (tuple
->prefixSize
> 0),
105 SGITDATUM(tuple
, state
),
110 /* qsort comparator for sorting OffsetNumbers */
112 cmpOffsetNumbers(const void *a
, const void *b
)
114 return pg_cmp_u16(*(const OffsetNumber
*) a
, *(const OffsetNumber
*) b
);
118 * Delete multiple tuples from an index page, preserving tuple offset numbers.
120 * The first tuple in the given list is replaced with a dead tuple of type
121 * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
122 * with dead tuples of type "reststate". If either firststate or reststate
123 * is REDIRECT, blkno/offnum specify where to link to.
125 * NB: this is used during WAL replay, so beware of trying to make it too
126 * smart. In particular, it shouldn't use "state" except for calling
127 * spgFormDeadTuple(). This is also used in a critical section, so no
131 spgPageIndexMultiDelete(SpGistState
*state
, Page page
,
132 OffsetNumber
*itemnos
, int nitems
,
133 int firststate
, int reststate
,
134 BlockNumber blkno
, OffsetNumber offnum
)
136 OffsetNumber firstItem
;
137 OffsetNumber sortednos
[MaxIndexTuplesPerPage
];
138 SpGistDeadTuple tuple
= NULL
;
142 return; /* nothing to do */
145 * For efficiency we want to use PageIndexMultiDelete, which requires the
146 * targets to be listed in sorted order, so we have to sort the itemnos
147 * array. (This also greatly simplifies the math for reinserting the
148 * replacement tuples.) However, we must not scribble on the caller's
149 * array, so we have to make a copy.
151 memcpy(sortednos
, itemnos
, sizeof(OffsetNumber
) * nitems
);
153 qsort(sortednos
, nitems
, sizeof(OffsetNumber
), cmpOffsetNumbers
);
155 PageIndexMultiDelete(page
, sortednos
, nitems
);
157 firstItem
= itemnos
[0];
159 for (i
= 0; i
< nitems
; i
++)
161 OffsetNumber itemno
= sortednos
[i
];
164 tupstate
= (itemno
== firstItem
) ? firststate
: reststate
;
165 if (tuple
== NULL
|| tuple
->tupstate
!= tupstate
)
166 tuple
= spgFormDeadTuple(state
, tupstate
, blkno
, offnum
);
168 if (PageAddItem(page
, (Item
) tuple
, tuple
->size
,
169 itemno
, false, false) != itemno
)
170 elog(ERROR
, "failed to add item of size %u to SPGiST index page",
173 if (tupstate
== SPGIST_REDIRECT
)
174 SpGistPageGetOpaque(page
)->nRedirection
++;
175 else if (tupstate
== SPGIST_PLACEHOLDER
)
176 SpGistPageGetOpaque(page
)->nPlaceholder
++;
181 * Update the parent inner tuple's downlink, and mark the parent buffer
182 * dirty (this must be the last change to the parent page in the current
186 saveNodeLink(Relation index
, SPPageDesc
*parent
,
187 BlockNumber blkno
, OffsetNumber offnum
)
189 SpGistInnerTuple innerTuple
;
191 innerTuple
= (SpGistInnerTuple
) PageGetItem(parent
->page
,
192 PageGetItemId(parent
->page
, parent
->offnum
));
194 spgUpdateNodeLink(innerTuple
, parent
->node
, blkno
, offnum
);
196 MarkBufferDirty(parent
->buffer
);
200 * Add a leaf tuple to a leaf page where there is known to be room for it
203 addLeafTuple(Relation index
, SpGistState
*state
, SpGistLeafTuple leafTuple
,
204 SPPageDesc
*current
, SPPageDesc
*parent
, bool isNulls
, bool isNew
)
206 spgxlogAddLeaf xlrec
;
208 xlrec
.newPage
= isNew
;
209 xlrec
.storesNulls
= isNulls
;
211 /* these will be filled below as needed */
212 xlrec
.offnumLeaf
= InvalidOffsetNumber
;
213 xlrec
.offnumHeadLeaf
= InvalidOffsetNumber
;
214 xlrec
.offnumParent
= InvalidOffsetNumber
;
217 START_CRIT_SECTION();
219 if (current
->offnum
== InvalidOffsetNumber
||
220 SpGistBlockIsRoot(current
->blkno
))
222 /* Tuple is not part of a chain */
223 SGLT_SET_NEXTOFFSET(leafTuple
, InvalidOffsetNumber
);
224 current
->offnum
= SpGistPageAddNewItem(state
, current
->page
,
225 (Item
) leafTuple
, leafTuple
->size
,
228 xlrec
.offnumLeaf
= current
->offnum
;
230 /* Must update parent's downlink if any */
231 if (parent
->buffer
!= InvalidBuffer
)
233 xlrec
.offnumParent
= parent
->offnum
;
234 xlrec
.nodeI
= parent
->node
;
236 saveNodeLink(index
, parent
, current
->blkno
, current
->offnum
);
242 * Tuple must be inserted into existing chain. We mustn't change the
243 * chain's head address, but we don't need to chase the entire chain
244 * to put the tuple at the end; we can insert it second.
246 * Also, it's possible that the "chain" consists only of a DEAD tuple,
247 * in which case we should replace the DEAD tuple in-place.
249 SpGistLeafTuple head
;
252 head
= (SpGistLeafTuple
) PageGetItem(current
->page
,
253 PageGetItemId(current
->page
, current
->offnum
));
254 if (head
->tupstate
== SPGIST_LIVE
)
256 SGLT_SET_NEXTOFFSET(leafTuple
, SGLT_GET_NEXTOFFSET(head
));
257 offnum
= SpGistPageAddNewItem(state
, current
->page
,
258 (Item
) leafTuple
, leafTuple
->size
,
262 * re-get head of list because it could have been moved on page,
263 * and set new second element
265 head
= (SpGistLeafTuple
) PageGetItem(current
->page
,
266 PageGetItemId(current
->page
, current
->offnum
));
267 SGLT_SET_NEXTOFFSET(head
, offnum
);
269 xlrec
.offnumLeaf
= offnum
;
270 xlrec
.offnumHeadLeaf
= current
->offnum
;
272 else if (head
->tupstate
== SPGIST_DEAD
)
274 SGLT_SET_NEXTOFFSET(leafTuple
, InvalidOffsetNumber
);
275 PageIndexTupleDelete(current
->page
, current
->offnum
);
276 if (PageAddItem(current
->page
,
277 (Item
) leafTuple
, leafTuple
->size
,
278 current
->offnum
, false, false) != current
->offnum
)
279 elog(ERROR
, "failed to add item of size %u to SPGiST index page",
282 /* WAL replay distinguishes this case by equal offnums */
283 xlrec
.offnumLeaf
= current
->offnum
;
284 xlrec
.offnumHeadLeaf
= current
->offnum
;
287 elog(ERROR
, "unexpected SPGiST tuple state: %d", head
->tupstate
);
290 MarkBufferDirty(current
->buffer
);
292 if (RelationNeedsWAL(index
) && !state
->isBuild
)
298 XLogRegisterData((char *) &xlrec
, sizeof(xlrec
));
299 XLogRegisterData((char *) leafTuple
, leafTuple
->size
);
301 flags
= REGBUF_STANDARD
;
303 flags
|= REGBUF_WILL_INIT
;
304 XLogRegisterBuffer(0, current
->buffer
, flags
);
305 if (xlrec
.offnumParent
!= InvalidOffsetNumber
)
306 XLogRegisterBuffer(1, parent
->buffer
, REGBUF_STANDARD
);
308 recptr
= XLogInsert(RM_SPGIST_ID
, XLOG_SPGIST_ADD_LEAF
);
310 PageSetLSN(current
->page
, recptr
);
312 /* update parent only if we actually changed it */
313 if (xlrec
.offnumParent
!= InvalidOffsetNumber
)
315 PageSetLSN(parent
->page
, recptr
);
323 * Count the number and total size of leaf tuples in the chain starting at
324 * current->offnum. Return number into *nToSplit and total size as function
327 * Klugy special case when considering the root page (i.e., root is a leaf
328 * page, but we're about to split for the first time): return fake large
329 * values to force spgdoinsert() to take the doPickSplit rather than
330 * moveLeafs code path. moveLeafs is not prepared to deal with root page.
333 checkSplitConditions(Relation index
, SpGistState
*state
,
334 SPPageDesc
*current
, int *nToSplit
)
340 if (SpGistBlockIsRoot(current
->blkno
))
342 /* return impossible values to force split */
348 while (i
!= InvalidOffsetNumber
)
352 Assert(i
>= FirstOffsetNumber
&&
353 i
<= PageGetMaxOffsetNumber(current
->page
));
354 it
= (SpGistLeafTuple
) PageGetItem(current
->page
,
355 PageGetItemId(current
->page
, i
));
356 if (it
->tupstate
== SPGIST_LIVE
)
359 totalSize
+= it
->size
+ sizeof(ItemIdData
);
361 else if (it
->tupstate
== SPGIST_DEAD
)
363 /* We could see a DEAD tuple as first/only chain item */
364 Assert(i
== current
->offnum
);
365 Assert(SGLT_GET_NEXTOFFSET(it
) == InvalidOffsetNumber
);
366 /* Don't count it in result, because it won't go to other page */
369 elog(ERROR
, "unexpected SPGiST tuple state: %d", it
->tupstate
);
371 i
= SGLT_GET_NEXTOFFSET(it
);
380 * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
381 * but the chain has to be moved because there's not enough room to add
382 * newLeafTuple to its page. We use this method when the chain contains
383 * very little data so a split would be inefficient. We are sure we can
384 * fit the chain plus newLeafTuple on one other page.
387 moveLeafs(Relation index
, SpGistState
*state
,
388 SPPageDesc
*current
, SPPageDesc
*parent
,
389 SpGistLeafTuple newLeafTuple
, bool isNulls
)
397 OffsetNumber r
= InvalidOffsetNumber
,
398 startOffset
= InvalidOffsetNumber
;
399 bool replaceDead
= false;
400 OffsetNumber
*toDelete
;
401 OffsetNumber
*toInsert
;
403 spgxlogMoveLeafs xlrec
;
407 /* This doesn't work on root page */
408 Assert(parent
->buffer
!= InvalidBuffer
);
409 Assert(parent
->buffer
!= current
->buffer
);
411 /* Locate the tuples to be moved, and count up the space needed */
412 i
= PageGetMaxOffsetNumber(current
->page
);
413 toDelete
= (OffsetNumber
*) palloc(sizeof(OffsetNumber
) * i
);
414 toInsert
= (OffsetNumber
*) palloc(sizeof(OffsetNumber
) * (i
+ 1));
416 size
= newLeafTuple
->size
+ sizeof(ItemIdData
);
420 while (i
!= InvalidOffsetNumber
)
424 Assert(i
>= FirstOffsetNumber
&&
425 i
<= PageGetMaxOffsetNumber(current
->page
));
426 it
= (SpGistLeafTuple
) PageGetItem(current
->page
,
427 PageGetItemId(current
->page
, i
));
429 if (it
->tupstate
== SPGIST_LIVE
)
431 toDelete
[nDelete
] = i
;
432 size
+= it
->size
+ sizeof(ItemIdData
);
435 else if (it
->tupstate
== SPGIST_DEAD
)
437 /* We could see a DEAD tuple as first/only chain item */
438 Assert(i
== current
->offnum
);
439 Assert(SGLT_GET_NEXTOFFSET(it
) == InvalidOffsetNumber
);
440 /* We don't want to move it, so don't count it in size */
441 toDelete
[nDelete
] = i
;
446 elog(ERROR
, "unexpected SPGiST tuple state: %d", it
->tupstate
);
448 i
= SGLT_GET_NEXTOFFSET(it
);
451 /* Find a leaf page that will hold them */
452 nbuf
= SpGistGetBuffer(index
, GBUF_LEAF
| (isNulls
? GBUF_NULLS
: 0),
453 size
, &xlrec
.newPage
);
454 npage
= BufferGetPage(nbuf
);
455 nblkno
= BufferGetBlockNumber(nbuf
);
456 Assert(nblkno
!= current
->blkno
);
458 leafdata
= leafptr
= palloc(size
);
460 START_CRIT_SECTION();
462 /* copy all the old tuples to new page, unless they're dead */
466 for (i
= 0; i
< nDelete
; i
++)
470 it
= (SpGistLeafTuple
) PageGetItem(current
->page
,
471 PageGetItemId(current
->page
, toDelete
[i
]));
472 Assert(it
->tupstate
== SPGIST_LIVE
);
475 * Update chain link (notice the chain order gets reversed, but we
476 * don't care). We're modifying the tuple on the source page
477 * here, but it's okay since we're about to delete it.
479 SGLT_SET_NEXTOFFSET(it
, r
);
481 r
= SpGistPageAddNewItem(state
, npage
, (Item
) it
, it
->size
,
482 &startOffset
, false);
484 toInsert
[nInsert
] = r
;
487 /* save modified tuple into leafdata as well */
488 memcpy(leafptr
, it
, it
->size
);
493 /* add the new tuple as well */
494 SGLT_SET_NEXTOFFSET(newLeafTuple
, r
);
495 r
= SpGistPageAddNewItem(state
, npage
,
496 (Item
) newLeafTuple
, newLeafTuple
->size
,
497 &startOffset
, false);
498 toInsert
[nInsert
] = r
;
500 memcpy(leafptr
, newLeafTuple
, newLeafTuple
->size
);
501 leafptr
+= newLeafTuple
->size
;
504 * Now delete the old tuples, leaving a redirection pointer behind for the
505 * first one, unless we're doing an index build; in which case there can't
506 * be any concurrent scan so we need not provide a redirect.
508 spgPageIndexMultiDelete(state
, current
->page
, toDelete
, nDelete
,
509 state
->isBuild
? SPGIST_PLACEHOLDER
: SPGIST_REDIRECT
,
513 /* Update parent's downlink and mark parent page dirty */
514 saveNodeLink(index
, parent
, nblkno
, r
);
516 /* Mark the leaf pages too */
517 MarkBufferDirty(current
->buffer
);
518 MarkBufferDirty(nbuf
);
520 if (RelationNeedsWAL(index
) && !state
->isBuild
)
524 /* prepare WAL info */
525 STORE_STATE(state
, xlrec
.stateSrc
);
527 xlrec
.nMoves
= nDelete
;
528 xlrec
.replaceDead
= replaceDead
;
529 xlrec
.storesNulls
= isNulls
;
531 xlrec
.offnumParent
= parent
->offnum
;
532 xlrec
.nodeI
= parent
->node
;
535 XLogRegisterData((char *) &xlrec
, SizeOfSpgxlogMoveLeafs
);
536 XLogRegisterData((char *) toDelete
,
537 sizeof(OffsetNumber
) * nDelete
);
538 XLogRegisterData((char *) toInsert
,
539 sizeof(OffsetNumber
) * nInsert
);
540 XLogRegisterData((char *) leafdata
, leafptr
- leafdata
);
542 XLogRegisterBuffer(0, current
->buffer
, REGBUF_STANDARD
);
543 XLogRegisterBuffer(1, nbuf
, REGBUF_STANDARD
| (xlrec
.newPage
? REGBUF_WILL_INIT
: 0));
544 XLogRegisterBuffer(2, parent
->buffer
, REGBUF_STANDARD
);
546 recptr
= XLogInsert(RM_SPGIST_ID
, XLOG_SPGIST_MOVE_LEAFS
);
548 PageSetLSN(current
->page
, recptr
);
549 PageSetLSN(npage
, recptr
);
550 PageSetLSN(parent
->page
, recptr
);
555 /* Update local free-space cache and release new buffer */
556 SpGistSetLastUsedPage(index
, nbuf
);
557 UnlockReleaseBuffer(nbuf
);
561 * Update previously-created redirection tuple with appropriate destination
563 * We use this when it's not convenient to know the destination first.
564 * The tuple should have been made with the "impossible" destination of
568 setRedirectionTuple(SPPageDesc
*current
, OffsetNumber position
,
569 BlockNumber blkno
, OffsetNumber offnum
)
573 dt
= (SpGistDeadTuple
) PageGetItem(current
->page
,
574 PageGetItemId(current
->page
, position
));
575 Assert(dt
->tupstate
== SPGIST_REDIRECT
);
576 Assert(ItemPointerGetBlockNumber(&dt
->pointer
) == SPGIST_METAPAGE_BLKNO
);
577 ItemPointerSet(&dt
->pointer
, blkno
, offnum
);
581 * Test to see if the user-defined picksplit function failed to do its job,
582 * ie, it put all the leaf tuples into the same node.
583 * If so, randomly divide the tuples into several nodes (all with the same
584 * label) and return true to select allTheSame mode for this inner tuple.
586 * (This code is also used to forcibly select allTheSame mode for nulls.)
588 * If we know that the leaf tuples wouldn't all fit on one page, then we
589 * exclude the last tuple (which is the incoming new tuple that forced a split)
590 * from the check to see if more than one node is used. The reason for this
591 * is that if the existing tuples are put into only one chain, then even if
592 * we move them all to an empty page, there would still not be room for the
593 * new tuple, so we'd get into an infinite loop of picksplit attempts.
594 * Forcing allTheSame mode dodges this problem by ensuring the old tuples will
595 * be split across pages. (Exercise for the reader: figure out why this
596 * fixes the problem even when there is only one old tuple.)
599 checkAllTheSame(spgPickSplitIn
*in
, spgPickSplitOut
*out
, bool tooBig
,
606 /* For the moment, assume we can include the new leaf tuple */
609 /* If there's only the new leaf tuple, don't select allTheSame mode */
610 if (in
->nTuples
<= 1)
613 /* If tuple set doesn't fit on one page, ignore the new tuple in test */
614 limit
= tooBig
? in
->nTuples
- 1 : in
->nTuples
;
616 /* Check to see if more than one node is populated */
617 theNode
= out
->mapTuplesToNodes
[0];
618 for (i
= 1; i
< limit
; i
++)
620 if (out
->mapTuplesToNodes
[i
] != theNode
)
624 /* Nope, so override the picksplit function's decisions */
626 /* If the new tuple is in its own node, it can't be included in split */
627 if (tooBig
&& out
->mapTuplesToNodes
[in
->nTuples
- 1] != theNode
)
630 out
->nNodes
= 8; /* arbitrary number of child nodes */
632 /* Random assignment of tuples to nodes (note we include new tuple) */
633 for (i
= 0; i
< in
->nTuples
; i
++)
634 out
->mapTuplesToNodes
[i
] = i
% out
->nNodes
;
636 /* The opclass may not use node labels, but if it does, duplicate 'em */
639 Datum theLabel
= out
->nodeLabels
[theNode
];
641 out
->nodeLabels
= (Datum
*) palloc(sizeof(Datum
) * out
->nNodes
);
642 for (i
= 0; i
< out
->nNodes
; i
++)
643 out
->nodeLabels
[i
] = theLabel
;
646 /* We don't touch the prefix or the leaf tuple datum assignments */
652 * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
653 * but the chain has to be split because there's not enough room to add
654 * newLeafTuple to its page.
656 * This function splits the leaf tuple set according to picksplit's rules,
657 * creating one or more new chains that are spread across the current page
658 * and an additional leaf page (we assume that two leaf pages will be
659 * sufficient). A new inner tuple is created, and the parent downlink
660 * pointer is updated to point to that inner tuple instead of the leaf chain.
662 * On exit, current contains the address of the new inner tuple.
664 * Returns true if we successfully inserted newLeafTuple during this function,
665 * false if caller still has to do it (meaning another picksplit operation is
666 * probably needed). Failure could occur if the picksplit result is fairly
667 * unbalanced, or if newLeafTuple is just plain too big to fit on a page.
668 * Because we force the picksplit result to be at least two chains, each
669 * cycle will get rid of at least one leaf tuple from the chain, so the loop
670 * will eventually terminate if lack of balance is the issue. If the tuple
671 * is too big, we assume that repeated picksplit operations will eventually
672 * make it small enough by repeated prefix-stripping. A broken opclass could
673 * make this an infinite loop, though, so spgdoinsert() checks that the
674 * leaf datums get smaller each time.
677 doPickSplit(Relation index
, SpGistState
*state
,
678 SPPageDesc
*current
, SPPageDesc
*parent
,
679 SpGistLeafTuple newLeafTuple
,
680 int level
, bool isNulls
, bool isNew
)
682 bool insertedNew
= false;
690 SpGistInnerTuple innerTuple
;
691 SpGistNodeTuple node
,
693 Buffer newInnerBuffer
,
695 uint8
*leafPageSelect
;
697 OffsetNumber
*toDelete
;
698 OffsetNumber
*toInsert
;
699 OffsetNumber redirectTuplePos
= InvalidOffsetNumber
;
700 OffsetNumber startOffsets
[2];
701 SpGistLeafTuple
*oldLeafs
;
702 SpGistLeafTuple
*newLeafs
;
703 Datum leafDatums
[INDEX_MAX_KEYS
];
704 bool leafIsnulls
[INDEX_MAX_KEYS
];
706 int currentFreeSpace
;
709 spgxlogPickSplit xlrec
;
712 SPPageDesc saveCurrent
;
720 * Allocate per-leaf-tuple work arrays with max possible size
722 max
= PageGetMaxOffsetNumber(current
->page
);
724 in
.datums
= (Datum
*) palloc(sizeof(Datum
) * n
);
725 toDelete
= (OffsetNumber
*) palloc(sizeof(OffsetNumber
) * n
);
726 toInsert
= (OffsetNumber
*) palloc(sizeof(OffsetNumber
) * n
);
727 oldLeafs
= (SpGistLeafTuple
*) palloc(sizeof(SpGistLeafTuple
) * n
);
728 newLeafs
= (SpGistLeafTuple
*) palloc(sizeof(SpGistLeafTuple
) * n
);
729 leafPageSelect
= (uint8
*) palloc(sizeof(uint8
) * n
);
731 STORE_STATE(state
, xlrec
.stateSrc
);
734 * Form list of leaf tuples which will be distributed as split result;
735 * also, count up the amount of space that will be freed from current.
736 * (Note that in the non-root case, we won't actually delete the old
737 * tuples, only replace them with redirects or placeholders.)
742 if (SpGistBlockIsRoot(current
->blkno
))
745 * We are splitting the root (which up to now is also a leaf page).
746 * Its tuples are not linked, so scan sequentially to get them all. We
747 * ignore the original value of current->offnum.
749 for (i
= FirstOffsetNumber
; i
<= max
; i
++)
753 it
= (SpGistLeafTuple
) PageGetItem(current
->page
,
754 PageGetItemId(current
->page
, i
));
755 if (it
->tupstate
== SPGIST_LIVE
)
757 in
.datums
[nToInsert
] =
758 isNulls
? (Datum
) 0 : SGLTDATUM(it
, state
);
759 oldLeafs
[nToInsert
] = it
;
761 toDelete
[nToDelete
] = i
;
763 /* we will delete the tuple altogether, so count full space */
764 spaceToDelete
+= it
->size
+ sizeof(ItemIdData
);
766 else /* tuples on root should be live */
767 elog(ERROR
, "unexpected SPGiST tuple state: %d", it
->tupstate
);
772 /* Normal case, just collect the leaf tuples in the chain */
774 while (i
!= InvalidOffsetNumber
)
778 Assert(i
>= FirstOffsetNumber
&& i
<= max
);
779 it
= (SpGistLeafTuple
) PageGetItem(current
->page
,
780 PageGetItemId(current
->page
, i
));
781 if (it
->tupstate
== SPGIST_LIVE
)
783 in
.datums
[nToInsert
] =
784 isNulls
? (Datum
) 0 : SGLTDATUM(it
, state
);
785 oldLeafs
[nToInsert
] = it
;
787 toDelete
[nToDelete
] = i
;
789 /* we will not delete the tuple, only replace with dead */
790 Assert(it
->size
>= SGDTSIZE
);
791 spaceToDelete
+= it
->size
- SGDTSIZE
;
793 else if (it
->tupstate
== SPGIST_DEAD
)
795 /* We could see a DEAD tuple as first/only chain item */
796 Assert(i
== current
->offnum
);
797 Assert(SGLT_GET_NEXTOFFSET(it
) == InvalidOffsetNumber
);
798 toDelete
[nToDelete
] = i
;
800 /* replacing it with redirect will save no space */
803 elog(ERROR
, "unexpected SPGiST tuple state: %d", it
->tupstate
);
805 i
= SGLT_GET_NEXTOFFSET(it
);
808 in
.nTuples
= nToInsert
;
811 * We may not actually insert new tuple because another picksplit may be
812 * necessary due to too large value, but we will try to allocate enough
813 * space to include it; and in any case it has to be included in the input
814 * for the picksplit function. So don't increment nToInsert yet.
816 in
.datums
[in
.nTuples
] =
817 isNulls
? (Datum
) 0 : SGLTDATUM(newLeafTuple
, state
);
818 oldLeafs
[in
.nTuples
] = newLeafTuple
;
821 memset(&out
, 0, sizeof(out
));
826 * Perform split using user-defined method.
828 procinfo
= index_getprocinfo(index
, 1, SPGIST_PICKSPLIT_PROC
);
829 FunctionCall2Coll(procinfo
,
830 index
->rd_indcollation
[0],
831 PointerGetDatum(&in
),
832 PointerGetDatum(&out
));
835 * Form new leaf tuples and count up the total space needed.
838 for (i
= 0; i
< in
.nTuples
; i
++)
840 if (state
->leafTupDesc
->natts
> 1)
841 spgDeformLeafTuple(oldLeafs
[i
],
847 leafDatums
[spgKeyColumn
] = out
.leafTupleDatums
[i
];
848 leafIsnulls
[spgKeyColumn
] = false;
850 newLeafs
[i
] = spgFormLeafTuple(state
, &oldLeafs
[i
]->heapPtr
,
853 totalLeafSizes
+= newLeafs
[i
]->size
+ sizeof(ItemIdData
);
859 * Perform dummy split that puts all tuples into one node.
860 * checkAllTheSame will override this and force allTheSame mode.
862 out
.hasPrefix
= false;
864 out
.nodeLabels
= NULL
;
865 out
.mapTuplesToNodes
= palloc0(sizeof(int) * in
.nTuples
);
868 * Form new leaf tuples and count up the total space needed.
871 for (i
= 0; i
< in
.nTuples
; i
++)
873 if (state
->leafTupDesc
->natts
> 1)
874 spgDeformLeafTuple(oldLeafs
[i
],
881 * Nulls tree can contain only null key values.
883 leafDatums
[spgKeyColumn
] = (Datum
) 0;
884 leafIsnulls
[spgKeyColumn
] = true;
886 newLeafs
[i
] = spgFormLeafTuple(state
, &oldLeafs
[i
]->heapPtr
,
889 totalLeafSizes
+= newLeafs
[i
]->size
+ sizeof(ItemIdData
);
894 * Check to see if the picksplit function failed to separate the values,
895 * ie, it put them all into the same child node. If so, select allTheSame
896 * mode and create a random split instead. See comments for
897 * checkAllTheSame as to why we need to know if the new leaf tuples could
900 allTheSame
= checkAllTheSame(&in
, &out
,
901 totalLeafSizes
> SPGIST_PAGE_CAPACITY
,
905 * If checkAllTheSame decided we must exclude the new tuple, don't
906 * consider it any further.
909 maxToInclude
= in
.nTuples
;
912 maxToInclude
= in
.nTuples
- 1;
913 totalLeafSizes
-= newLeafs
[in
.nTuples
- 1]->size
+ sizeof(ItemIdData
);
917 * Allocate per-node work arrays. Since checkAllTheSame could replace
918 * out.nNodes with a value larger than the number of tuples on the input
919 * page, we can't allocate these arrays before here.
921 nodes
= (SpGistNodeTuple
*) palloc(sizeof(SpGistNodeTuple
) * out
.nNodes
);
922 leafSizes
= (int *) palloc0(sizeof(int) * out
.nNodes
);
925 * Form nodes of inner tuple and inner tuple itself
927 for (i
= 0; i
< out
.nNodes
; i
++)
929 Datum label
= (Datum
) 0;
930 bool labelisnull
= (out
.nodeLabels
== NULL
);
933 label
= out
.nodeLabels
[i
];
934 nodes
[i
] = spgFormNodeTuple(state
, label
, labelisnull
);
936 innerTuple
= spgFormInnerTuple(state
,
937 out
.hasPrefix
, out
.prefixDatum
,
939 innerTuple
->allTheSame
= allTheSame
;
942 * Update nodes[] array to point into the newly formed innerTuple, so that
943 * we can adjust their downlinks below.
945 SGITITERATE(innerTuple
, i
, node
)
951 * Re-scan new leaf tuples and count up the space needed under each node.
953 for (i
= 0; i
< maxToInclude
; i
++)
955 n
= out
.mapTuplesToNodes
[i
];
956 if (n
< 0 || n
>= out
.nNodes
)
957 elog(ERROR
, "inconsistent result of SPGiST picksplit function");
958 leafSizes
[n
] += newLeafs
[i
]->size
+ sizeof(ItemIdData
);
962 * To perform the split, we must insert a new inner tuple, which can't go
963 * on a leaf page; and unless we are splitting the root page, we must then
964 * update the parent tuple's downlink to point to the inner tuple. If
965 * there is room, we'll put the new inner tuple on the same page as the
966 * parent tuple, otherwise we need another non-leaf buffer. But if the
967 * parent page is the root, we can't add the new inner tuple there,
968 * because the root page must have only one inner tuple.
970 xlrec
.initInner
= false;
971 if (parent
->buffer
!= InvalidBuffer
&&
972 !SpGistBlockIsRoot(parent
->blkno
) &&
973 (SpGistPageGetFreeSpace(parent
->page
, 1) >=
974 innerTuple
->size
+ sizeof(ItemIdData
)))
976 /* New inner tuple will fit on parent page */
977 newInnerBuffer
= parent
->buffer
;
979 else if (parent
->buffer
!= InvalidBuffer
)
981 /* Send tuple to page with next triple parity (see README) */
982 newInnerBuffer
= SpGistGetBuffer(index
,
983 GBUF_INNER_PARITY(parent
->blkno
+ 1) |
984 (isNulls
? GBUF_NULLS
: 0),
985 innerTuple
->size
+ sizeof(ItemIdData
),
990 /* Root page split ... inner tuple will go to root page */
991 newInnerBuffer
= InvalidBuffer
;
995 * The new leaf tuples converted from the existing ones should require the
996 * same or less space, and therefore should all fit onto one page
997 * (although that's not necessarily the current page, since we can't
998 * delete the old tuples but only replace them with placeholders).
999 * However, the incoming new tuple might not also fit, in which case we
1000 * might need another picksplit cycle to reduce it some more.
1002 * If there's not room to put everything back onto the current page, then
1003 * we decide on a per-node basis which tuples go to the new page. (We do
1004 * it like that because leaf tuple chains can't cross pages, so we must
1005 * place all leaf tuples belonging to the same parent node on the same
1008 * If we are splitting the root page (turning it from a leaf page into an
1009 * inner page), then no leaf tuples can go back to the current page; they
1010 * must all go somewhere else.
1012 if (!SpGistBlockIsRoot(current
->blkno
))
1013 currentFreeSpace
= PageGetExactFreeSpace(current
->page
) + spaceToDelete
;
1015 currentFreeSpace
= 0; /* prevent assigning any tuples to current */
1017 xlrec
.initDest
= false;
1019 if (totalLeafSizes
<= currentFreeSpace
)
1021 /* All the leaf tuples will fit on current page */
1022 newLeafBuffer
= InvalidBuffer
;
1023 /* mark new leaf tuple as included in insertions, if allowed */
1029 for (i
= 0; i
< nToInsert
; i
++)
1030 leafPageSelect
[i
] = 0; /* signifies current page */
1032 else if (in
.nTuples
== 1 && totalLeafSizes
> SPGIST_PAGE_CAPACITY
)
1035 * We're trying to split up a long value by repeated suffixing, but
1036 * it's not going to fit yet. Don't bother allocating a second leaf
1037 * buffer that we won't be able to use.
1039 newLeafBuffer
= InvalidBuffer
;
1041 Assert(nToInsert
== 0);
1045 /* We will need another leaf page */
1046 uint8
*nodePageSelect
;
1050 newLeafBuffer
= SpGistGetBuffer(index
,
1051 GBUF_LEAF
| (isNulls
? GBUF_NULLS
: 0),
1053 SPGIST_PAGE_CAPACITY
),
1057 * Attempt to assign node groups to the two pages. We might fail to
1058 * do so, even if totalLeafSizes is less than the available space,
1059 * because we can't split a group across pages.
1061 nodePageSelect
= (uint8
*) palloc(sizeof(uint8
) * out
.nNodes
);
1063 curspace
= currentFreeSpace
;
1064 newspace
= PageGetExactFreeSpace(BufferGetPage(newLeafBuffer
));
1065 for (i
= 0; i
< out
.nNodes
; i
++)
1067 if (leafSizes
[i
] <= curspace
)
1069 nodePageSelect
[i
] = 0; /* signifies current page */
1070 curspace
-= leafSizes
[i
];
1074 nodePageSelect
[i
] = 1; /* signifies new leaf page */
1075 newspace
-= leafSizes
[i
];
1078 if (curspace
>= 0 && newspace
>= 0)
1080 /* Successful assignment, so we can include the new leaf tuple */
1087 else if (includeNew
)
1089 /* We must exclude the new leaf tuple from the split */
1090 int nodeOfNewTuple
= out
.mapTuplesToNodes
[in
.nTuples
- 1];
1092 leafSizes
[nodeOfNewTuple
] -=
1093 newLeafs
[in
.nTuples
- 1]->size
+ sizeof(ItemIdData
);
1095 /* Repeat the node assignment process --- should succeed now */
1096 curspace
= currentFreeSpace
;
1097 newspace
= PageGetExactFreeSpace(BufferGetPage(newLeafBuffer
));
1098 for (i
= 0; i
< out
.nNodes
; i
++)
1100 if (leafSizes
[i
] <= curspace
)
1102 nodePageSelect
[i
] = 0; /* signifies current page */
1103 curspace
-= leafSizes
[i
];
1107 nodePageSelect
[i
] = 1; /* signifies new leaf page */
1108 newspace
-= leafSizes
[i
];
1111 if (curspace
< 0 || newspace
< 0)
1112 elog(ERROR
, "failed to divide leaf tuple groups across pages");
1116 /* oops, we already excluded new tuple ... should not get here */
1117 elog(ERROR
, "failed to divide leaf tuple groups across pages");
1119 /* Expand the per-node assignments to be shown per leaf tuple */
1120 for (i
= 0; i
< nToInsert
; i
++)
1122 n
= out
.mapTuplesToNodes
[i
];
1123 leafPageSelect
[i
] = nodePageSelect
[n
];
1127 /* Start preparing WAL record */
1129 xlrec
.initSrc
= isNew
;
1130 xlrec
.storesNulls
= isNulls
;
1131 xlrec
.isRootSplit
= SpGistBlockIsRoot(current
->blkno
);
1133 leafdata
= leafptr
= (char *) palloc(totalLeafSizes
);
1135 /* Here we begin making the changes to the target pages */
1136 START_CRIT_SECTION();
1139 * Delete old leaf tuples from current buffer, except when we're splitting
1140 * the root; in that case there's no need because we'll re-init the page
1141 * below. We do this first to make room for reinserting new leaf tuples.
1143 if (!SpGistBlockIsRoot(current
->blkno
))
1146 * Init buffer instead of deleting individual tuples, but only if
1147 * there aren't any other live tuples and only during build; otherwise
1148 * we need to set a redirection tuple for concurrent scans.
1150 if (state
->isBuild
&&
1151 nToDelete
+ SpGistPageGetOpaque(current
->page
)->nPlaceholder
==
1152 PageGetMaxOffsetNumber(current
->page
))
1154 SpGistInitBuffer(current
->buffer
,
1155 SPGIST_LEAF
| (isNulls
? SPGIST_NULLS
: 0));
1156 xlrec
.initSrc
= true;
1160 /* don't expose the freshly init'd buffer as a backup block */
1161 Assert(nToDelete
== 0);
1165 xlrec
.nDelete
= nToDelete
;
1167 if (!state
->isBuild
)
1170 * Need to create redirect tuple (it will point to new inner
1171 * tuple) but right now the new tuple's location is not known
1172 * yet. So, set the redirection pointer to "impossible" value
1173 * and remember its position to update tuple later.
1176 redirectTuplePos
= toDelete
[0];
1177 spgPageIndexMultiDelete(state
, current
->page
,
1178 toDelete
, nToDelete
,
1181 SPGIST_METAPAGE_BLKNO
,
1187 * During index build there is not concurrent searches, so we
1188 * don't need to create redirection tuple.
1190 spgPageIndexMultiDelete(state
, current
->page
,
1191 toDelete
, nToDelete
,
1195 InvalidOffsetNumber
);
1201 * Put leaf tuples on proper pages, and update downlinks in innerTuple's
1204 startOffsets
[0] = startOffsets
[1] = InvalidOffsetNumber
;
1205 for (i
= 0; i
< nToInsert
; i
++)
1207 SpGistLeafTuple it
= newLeafs
[i
];
1209 BlockNumber leafBlock
;
1210 OffsetNumber newoffset
;
1212 /* Which page is it going to? */
1213 leafBuffer
= leafPageSelect
[i
] ? newLeafBuffer
: current
->buffer
;
1214 leafBlock
= BufferGetBlockNumber(leafBuffer
);
1216 /* Link tuple into correct chain for its node */
1217 n
= out
.mapTuplesToNodes
[i
];
1219 if (ItemPointerIsValid(&nodes
[n
]->t_tid
))
1221 Assert(ItemPointerGetBlockNumber(&nodes
[n
]->t_tid
) == leafBlock
);
1222 SGLT_SET_NEXTOFFSET(it
, ItemPointerGetOffsetNumber(&nodes
[n
]->t_tid
));
1225 SGLT_SET_NEXTOFFSET(it
, InvalidOffsetNumber
);
1227 /* Insert it on page */
1228 newoffset
= SpGistPageAddNewItem(state
, BufferGetPage(leafBuffer
),
1229 (Item
) it
, it
->size
,
1230 &startOffsets
[leafPageSelect
[i
]],
1232 toInsert
[i
] = newoffset
;
1234 /* ... and complete the chain linking */
1235 ItemPointerSet(&nodes
[n
]->t_tid
, leafBlock
, newoffset
);
1237 /* Also copy leaf tuple into WAL data */
1238 memcpy(leafptr
, newLeafs
[i
], newLeafs
[i
]->size
);
1239 leafptr
+= newLeafs
[i
]->size
;
1243 * We're done modifying the other leaf buffer (if any), so mark it dirty.
1244 * current->buffer will be marked below, after we're entirely done
1247 if (newLeafBuffer
!= InvalidBuffer
)
1249 MarkBufferDirty(newLeafBuffer
);
1252 /* Remember current buffer, since we're about to change "current" */
1253 saveCurrent
= *current
;
1256 * Store the new innerTuple
1258 if (newInnerBuffer
== parent
->buffer
&& newInnerBuffer
!= InvalidBuffer
)
1261 * new inner tuple goes to parent page
1263 Assert(current
->buffer
!= parent
->buffer
);
1265 /* Repoint "current" at the new inner tuple */
1266 current
->blkno
= parent
->blkno
;
1267 current
->buffer
= parent
->buffer
;
1268 current
->page
= parent
->page
;
1269 xlrec
.offnumInner
= current
->offnum
=
1270 SpGistPageAddNewItem(state
, current
->page
,
1271 (Item
) innerTuple
, innerTuple
->size
,
1275 * Update parent node link and mark parent page dirty
1277 xlrec
.innerIsParent
= true;
1278 xlrec
.offnumParent
= parent
->offnum
;
1279 xlrec
.nodeI
= parent
->node
;
1280 saveNodeLink(index
, parent
, current
->blkno
, current
->offnum
);
1283 * Update redirection link (in old current buffer)
1285 if (redirectTuplePos
!= InvalidOffsetNumber
)
1286 setRedirectionTuple(&saveCurrent
, redirectTuplePos
,
1287 current
->blkno
, current
->offnum
);
1289 /* Done modifying old current buffer, mark it dirty */
1290 MarkBufferDirty(saveCurrent
.buffer
);
1292 else if (parent
->buffer
!= InvalidBuffer
)
1295 * new inner tuple will be stored on a new page
1297 Assert(newInnerBuffer
!= InvalidBuffer
);
1299 /* Repoint "current" at the new inner tuple */
1300 current
->buffer
= newInnerBuffer
;
1301 current
->blkno
= BufferGetBlockNumber(current
->buffer
);
1302 current
->page
= BufferGetPage(current
->buffer
);
1303 xlrec
.offnumInner
= current
->offnum
=
1304 SpGistPageAddNewItem(state
, current
->page
,
1305 (Item
) innerTuple
, innerTuple
->size
,
1308 /* Done modifying new current buffer, mark it dirty */
1309 MarkBufferDirty(current
->buffer
);
1312 * Update parent node link and mark parent page dirty
1314 xlrec
.innerIsParent
= (parent
->buffer
== current
->buffer
);
1315 xlrec
.offnumParent
= parent
->offnum
;
1316 xlrec
.nodeI
= parent
->node
;
1317 saveNodeLink(index
, parent
, current
->blkno
, current
->offnum
);
1320 * Update redirection link (in old current buffer)
1322 if (redirectTuplePos
!= InvalidOffsetNumber
)
1323 setRedirectionTuple(&saveCurrent
, redirectTuplePos
,
1324 current
->blkno
, current
->offnum
);
1326 /* Done modifying old current buffer, mark it dirty */
1327 MarkBufferDirty(saveCurrent
.buffer
);
1332 * Splitting root page, which was a leaf but now becomes inner page
1333 * (and so "current" continues to point at it)
1335 Assert(SpGistBlockIsRoot(current
->blkno
));
1336 Assert(redirectTuplePos
== InvalidOffsetNumber
);
1338 SpGistInitBuffer(current
->buffer
, (isNulls
? SPGIST_NULLS
: 0));
1339 xlrec
.initInner
= true;
1340 xlrec
.innerIsParent
= false;
1342 xlrec
.offnumInner
= current
->offnum
=
1343 PageAddItem(current
->page
, (Item
) innerTuple
, innerTuple
->size
,
1344 InvalidOffsetNumber
, false, false);
1345 if (current
->offnum
!= FirstOffsetNumber
)
1346 elog(ERROR
, "failed to add item of size %u to SPGiST index page",
1349 /* No parent link to update, nor redirection to do */
1350 xlrec
.offnumParent
= InvalidOffsetNumber
;
1353 /* Done modifying new current buffer, mark it dirty */
1354 MarkBufferDirty(current
->buffer
);
1356 /* saveCurrent doesn't represent a different buffer */
1357 saveCurrent
.buffer
= InvalidBuffer
;
1360 if (RelationNeedsWAL(index
) && !state
->isBuild
)
1367 xlrec
.nInsert
= nToInsert
;
1368 XLogRegisterData((char *) &xlrec
, SizeOfSpgxlogPickSplit
);
1370 XLogRegisterData((char *) toDelete
,
1371 sizeof(OffsetNumber
) * xlrec
.nDelete
);
1372 XLogRegisterData((char *) toInsert
,
1373 sizeof(OffsetNumber
) * xlrec
.nInsert
);
1374 XLogRegisterData((char *) leafPageSelect
,
1375 sizeof(uint8
) * xlrec
.nInsert
);
1376 XLogRegisterData((char *) innerTuple
, innerTuple
->size
);
1377 XLogRegisterData(leafdata
, leafptr
- leafdata
);
1380 if (BufferIsValid(saveCurrent
.buffer
))
1382 flags
= REGBUF_STANDARD
;
1384 flags
|= REGBUF_WILL_INIT
;
1385 XLogRegisterBuffer(0, saveCurrent
.buffer
, flags
);
1389 if (BufferIsValid(newLeafBuffer
))
1391 flags
= REGBUF_STANDARD
;
1393 flags
|= REGBUF_WILL_INIT
;
1394 XLogRegisterBuffer(1, newLeafBuffer
, flags
);
1398 flags
= REGBUF_STANDARD
;
1399 if (xlrec
.initInner
)
1400 flags
|= REGBUF_WILL_INIT
;
1401 XLogRegisterBuffer(2, current
->buffer
, flags
);
1403 /* Parent page, if different from inner page */
1404 if (parent
->buffer
!= InvalidBuffer
)
1406 if (parent
->buffer
!= current
->buffer
)
1407 XLogRegisterBuffer(3, parent
->buffer
, REGBUF_STANDARD
);
1409 Assert(xlrec
.innerIsParent
);
1412 /* Issue the WAL record */
1413 recptr
= XLogInsert(RM_SPGIST_ID
, XLOG_SPGIST_PICKSPLIT
);
1415 /* Update page LSNs on all affected pages */
1416 if (newLeafBuffer
!= InvalidBuffer
)
1418 Page page
= BufferGetPage(newLeafBuffer
);
1420 PageSetLSN(page
, recptr
);
1423 if (saveCurrent
.buffer
!= InvalidBuffer
)
1425 Page page
= BufferGetPage(saveCurrent
.buffer
);
1427 PageSetLSN(page
, recptr
);
1430 PageSetLSN(current
->page
, recptr
);
1432 if (parent
->buffer
!= InvalidBuffer
)
1434 PageSetLSN(parent
->page
, recptr
);
1440 /* Update local free-space cache and unlock buffers */
1441 if (newLeafBuffer
!= InvalidBuffer
)
1443 SpGistSetLastUsedPage(index
, newLeafBuffer
);
1444 UnlockReleaseBuffer(newLeafBuffer
);
1446 if (saveCurrent
.buffer
!= InvalidBuffer
)
1448 SpGistSetLastUsedPage(index
, saveCurrent
.buffer
);
1449 UnlockReleaseBuffer(saveCurrent
.buffer
);
1456 * spgMatchNode action: descend to N'th child node of current inner tuple
1459 spgMatchNodeAction(Relation index
, SpGistState
*state
,
1460 SpGistInnerTuple innerTuple
,
1461 SPPageDesc
*current
, SPPageDesc
*parent
, int nodeN
)
1464 SpGistNodeTuple node
;
1466 /* Release previous parent buffer if any */
1467 if (parent
->buffer
!= InvalidBuffer
&&
1468 parent
->buffer
!= current
->buffer
)
1470 SpGistSetLastUsedPage(index
, parent
->buffer
);
1471 UnlockReleaseBuffer(parent
->buffer
);
1474 /* Repoint parent to specified node of current inner tuple */
1475 parent
->blkno
= current
->blkno
;
1476 parent
->buffer
= current
->buffer
;
1477 parent
->page
= current
->page
;
1478 parent
->offnum
= current
->offnum
;
1479 parent
->node
= nodeN
;
1481 /* Locate that node */
1482 SGITITERATE(innerTuple
, i
, node
)
1489 elog(ERROR
, "failed to find requested node %d in SPGiST inner tuple",
1492 /* Point current to the downlink location, if any */
1493 if (ItemPointerIsValid(&node
->t_tid
))
1495 current
->blkno
= ItemPointerGetBlockNumber(&node
->t_tid
);
1496 current
->offnum
= ItemPointerGetOffsetNumber(&node
->t_tid
);
1500 /* Downlink is empty, so we'll need to find a new page */
1501 current
->blkno
= InvalidBlockNumber
;
1502 current
->offnum
= InvalidOffsetNumber
;
1505 current
->buffer
= InvalidBuffer
;
1506 current
->page
= NULL
;
1510 * spgAddNode action: add a node to the inner tuple at current
1513 spgAddNodeAction(Relation index
, SpGistState
*state
,
1514 SpGistInnerTuple innerTuple
,
1515 SPPageDesc
*current
, SPPageDesc
*parent
,
1516 int nodeN
, Datum nodeLabel
)
1518 SpGistInnerTuple newInnerTuple
;
1519 spgxlogAddNode xlrec
;
1521 /* Should not be applied to nulls */
1522 Assert(!SpGistPageStoresNulls(current
->page
));
1524 /* Construct new inner tuple with additional node */
1525 newInnerTuple
= addNode(state
, innerTuple
, nodeLabel
, nodeN
);
1527 /* Prepare WAL record */
1528 STORE_STATE(state
, xlrec
.stateSrc
);
1529 xlrec
.offnum
= current
->offnum
;
1531 /* we don't fill these unless we need to change the parent downlink */
1532 xlrec
.parentBlk
= -1;
1533 xlrec
.offnumParent
= InvalidOffsetNumber
;
1536 /* we don't fill these unless tuple has to be moved */
1537 xlrec
.offnumNew
= InvalidOffsetNumber
;
1538 xlrec
.newPage
= false;
1540 if (PageGetExactFreeSpace(current
->page
) >=
1541 newInnerTuple
->size
- innerTuple
->size
)
1544 * We can replace the inner tuple by new version in-place
1546 START_CRIT_SECTION();
1548 PageIndexTupleDelete(current
->page
, current
->offnum
);
1549 if (PageAddItem(current
->page
,
1550 (Item
) newInnerTuple
, newInnerTuple
->size
,
1551 current
->offnum
, false, false) != current
->offnum
)
1552 elog(ERROR
, "failed to add item of size %u to SPGiST index page",
1553 newInnerTuple
->size
);
1555 MarkBufferDirty(current
->buffer
);
1557 if (RelationNeedsWAL(index
) && !state
->isBuild
)
1562 XLogRegisterData((char *) &xlrec
, sizeof(xlrec
));
1563 XLogRegisterData((char *) newInnerTuple
, newInnerTuple
->size
);
1565 XLogRegisterBuffer(0, current
->buffer
, REGBUF_STANDARD
);
1567 recptr
= XLogInsert(RM_SPGIST_ID
, XLOG_SPGIST_ADD_NODE
);
1569 PageSetLSN(current
->page
, recptr
);
1577 * move inner tuple to another page, and update parent
1580 SPPageDesc saveCurrent
;
1583 * It should not be possible to get here for the root page, since we
1584 * allow only one inner tuple on the root page, and spgFormInnerTuple
1585 * always checks that inner tuples don't exceed the size of a page.
1587 if (SpGistBlockIsRoot(current
->blkno
))
1588 elog(ERROR
, "cannot enlarge root tuple any more");
1589 Assert(parent
->buffer
!= InvalidBuffer
);
1591 saveCurrent
= *current
;
1593 xlrec
.offnumParent
= parent
->offnum
;
1594 xlrec
.nodeI
= parent
->node
;
1597 * obtain new buffer with the same parity as current, since it will be
1598 * a child of same parent tuple
1600 current
->buffer
= SpGistGetBuffer(index
,
1601 GBUF_INNER_PARITY(current
->blkno
),
1602 newInnerTuple
->size
+ sizeof(ItemIdData
),
1604 current
->blkno
= BufferGetBlockNumber(current
->buffer
);
1605 current
->page
= BufferGetPage(current
->buffer
);
1608 * Let's just make real sure new current isn't same as old. Right now
1609 * that's impossible, but if SpGistGetBuffer ever got smart enough to
1610 * delete placeholder tuples before checking space, maybe it wouldn't
1611 * be impossible. The case would appear to work except that WAL
1612 * replay would be subtly wrong, so I think a mere assert isn't enough
1615 if (current
->blkno
== saveCurrent
.blkno
)
1616 elog(ERROR
, "SPGiST new buffer shouldn't be same as old buffer");
1619 * New current and parent buffer will both be modified; but note that
1620 * parent buffer could be same as either new or old current.
1622 if (parent
->buffer
== saveCurrent
.buffer
)
1623 xlrec
.parentBlk
= 0;
1624 else if (parent
->buffer
== current
->buffer
)
1625 xlrec
.parentBlk
= 1;
1627 xlrec
.parentBlk
= 2;
1629 START_CRIT_SECTION();
1631 /* insert new ... */
1632 xlrec
.offnumNew
= current
->offnum
=
1633 SpGistPageAddNewItem(state
, current
->page
,
1634 (Item
) newInnerTuple
, newInnerTuple
->size
,
1637 MarkBufferDirty(current
->buffer
);
1639 /* update parent's downlink and mark parent page dirty */
1640 saveNodeLink(index
, parent
, current
->blkno
, current
->offnum
);
1643 * Replace old tuple with a placeholder or redirection tuple. Unless
1644 * doing an index build, we have to insert a redirection tuple for
1645 * possible concurrent scans. We can't just delete it in any case,
1646 * because that could change the offsets of other tuples on the page,
1647 * breaking downlinks from their parents.
1650 dt
= spgFormDeadTuple(state
, SPGIST_PLACEHOLDER
,
1651 InvalidBlockNumber
, InvalidOffsetNumber
);
1653 dt
= spgFormDeadTuple(state
, SPGIST_REDIRECT
,
1654 current
->blkno
, current
->offnum
);
1656 PageIndexTupleDelete(saveCurrent
.page
, saveCurrent
.offnum
);
1657 if (PageAddItem(saveCurrent
.page
, (Item
) dt
, dt
->size
,
1659 false, false) != saveCurrent
.offnum
)
1660 elog(ERROR
, "failed to add item of size %u to SPGiST index page",
1664 SpGistPageGetOpaque(saveCurrent
.page
)->nPlaceholder
++;
1666 SpGistPageGetOpaque(saveCurrent
.page
)->nRedirection
++;
1668 MarkBufferDirty(saveCurrent
.buffer
);
1670 if (RelationNeedsWAL(index
) && !state
->isBuild
)
1678 XLogRegisterBuffer(0, saveCurrent
.buffer
, REGBUF_STANDARD
);
1680 flags
= REGBUF_STANDARD
;
1682 flags
|= REGBUF_WILL_INIT
;
1683 XLogRegisterBuffer(1, current
->buffer
, flags
);
1684 /* parent page (if different from orig and new) */
1685 if (xlrec
.parentBlk
== 2)
1686 XLogRegisterBuffer(2, parent
->buffer
, REGBUF_STANDARD
);
1688 XLogRegisterData((char *) &xlrec
, sizeof(xlrec
));
1689 XLogRegisterData((char *) newInnerTuple
, newInnerTuple
->size
);
1691 recptr
= XLogInsert(RM_SPGIST_ID
, XLOG_SPGIST_ADD_NODE
);
1693 /* we don't bother to check if any of these are redundant */
1694 PageSetLSN(current
->page
, recptr
);
1695 PageSetLSN(parent
->page
, recptr
);
1696 PageSetLSN(saveCurrent
.page
, recptr
);
1701 /* Release saveCurrent if it's not same as current or parent */
1702 if (saveCurrent
.buffer
!= current
->buffer
&&
1703 saveCurrent
.buffer
!= parent
->buffer
)
1705 SpGistSetLastUsedPage(index
, saveCurrent
.buffer
);
1706 UnlockReleaseBuffer(saveCurrent
.buffer
);
1712 * spgSplitNode action: split inner tuple at current into prefix and postfix
1715 spgSplitNodeAction(Relation index
, SpGistState
*state
,
1716 SpGistInnerTuple innerTuple
,
1717 SPPageDesc
*current
, spgChooseOut
*out
)
1719 SpGistInnerTuple prefixTuple
,
1721 SpGistNodeTuple node
,
1723 BlockNumber postfixBlkno
;
1724 OffsetNumber postfixOffset
;
1726 spgxlogSplitTuple xlrec
;
1727 Buffer newBuffer
= InvalidBuffer
;
1729 /* Should not be applied to nulls */
1730 Assert(!SpGistPageStoresNulls(current
->page
));
1732 /* Check opclass gave us sane values */
1733 if (out
->result
.splitTuple
.prefixNNodes
<= 0 ||
1734 out
->result
.splitTuple
.prefixNNodes
> SGITMAXNNODES
)
1735 elog(ERROR
, "invalid number of prefix nodes: %d",
1736 out
->result
.splitTuple
.prefixNNodes
);
1737 if (out
->result
.splitTuple
.childNodeN
< 0 ||
1738 out
->result
.splitTuple
.childNodeN
>=
1739 out
->result
.splitTuple
.prefixNNodes
)
1740 elog(ERROR
, "invalid child node number: %d",
1741 out
->result
.splitTuple
.childNodeN
);
1744 * Construct new prefix tuple with requested number of nodes. We'll fill
1745 * in the childNodeN'th node's downlink below.
1747 nodes
= (SpGistNodeTuple
*) palloc(sizeof(SpGistNodeTuple
) *
1748 out
->result
.splitTuple
.prefixNNodes
);
1750 for (i
= 0; i
< out
->result
.splitTuple
.prefixNNodes
; i
++)
1752 Datum label
= (Datum
) 0;
1755 labelisnull
= (out
->result
.splitTuple
.prefixNodeLabels
== NULL
);
1757 label
= out
->result
.splitTuple
.prefixNodeLabels
[i
];
1758 nodes
[i
] = spgFormNodeTuple(state
, label
, labelisnull
);
1761 prefixTuple
= spgFormInnerTuple(state
,
1762 out
->result
.splitTuple
.prefixHasPrefix
,
1763 out
->result
.splitTuple
.prefixPrefixDatum
,
1764 out
->result
.splitTuple
.prefixNNodes
,
1767 /* it must fit in the space that innerTuple now occupies */
1768 if (prefixTuple
->size
> innerTuple
->size
)
1769 elog(ERROR
, "SPGiST inner-tuple split must not produce longer prefix");
1772 * Construct new postfix tuple, containing all nodes of innerTuple with
1773 * same node datums, but with the prefix specified by the picksplit
1776 nodes
= palloc(sizeof(SpGistNodeTuple
) * innerTuple
->nNodes
);
1777 SGITITERATE(innerTuple
, i
, node
)
1782 postfixTuple
= spgFormInnerTuple(state
,
1783 out
->result
.splitTuple
.postfixHasPrefix
,
1784 out
->result
.splitTuple
.postfixPrefixDatum
,
1785 innerTuple
->nNodes
, nodes
);
1787 /* Postfix tuple is allTheSame if original tuple was */
1788 postfixTuple
->allTheSame
= innerTuple
->allTheSame
;
1790 /* prep data for WAL record */
1791 xlrec
.newPage
= false;
1794 * If we can't fit both tuples on the current page, get a new page for the
1795 * postfix tuple. In particular, can't split to the root page.
1797 * For the space calculation, note that prefixTuple replaces innerTuple
1798 * but postfixTuple will be a new entry.
1800 if (SpGistBlockIsRoot(current
->blkno
) ||
1801 SpGistPageGetFreeSpace(current
->page
, 1) + innerTuple
->size
<
1802 prefixTuple
->size
+ postfixTuple
->size
+ sizeof(ItemIdData
))
1805 * Choose page with next triple parity, because postfix tuple is a
1806 * child of prefix one
1808 newBuffer
= SpGistGetBuffer(index
,
1809 GBUF_INNER_PARITY(current
->blkno
+ 1),
1810 postfixTuple
->size
+ sizeof(ItemIdData
),
1814 START_CRIT_SECTION();
1817 * Replace old tuple by prefix tuple
1819 PageIndexTupleDelete(current
->page
, current
->offnum
);
1820 xlrec
.offnumPrefix
= PageAddItem(current
->page
,
1821 (Item
) prefixTuple
, prefixTuple
->size
,
1822 current
->offnum
, false, false);
1823 if (xlrec
.offnumPrefix
!= current
->offnum
)
1824 elog(ERROR
, "failed to add item of size %u to SPGiST index page",
1828 * put postfix tuple into appropriate page
1830 if (newBuffer
== InvalidBuffer
)
1832 postfixBlkno
= current
->blkno
;
1833 xlrec
.offnumPostfix
= postfixOffset
=
1834 SpGistPageAddNewItem(state
, current
->page
,
1835 (Item
) postfixTuple
, postfixTuple
->size
,
1837 xlrec
.postfixBlkSame
= true;
1841 postfixBlkno
= BufferGetBlockNumber(newBuffer
);
1842 xlrec
.offnumPostfix
= postfixOffset
=
1843 SpGistPageAddNewItem(state
, BufferGetPage(newBuffer
),
1844 (Item
) postfixTuple
, postfixTuple
->size
,
1846 MarkBufferDirty(newBuffer
);
1847 xlrec
.postfixBlkSame
= false;
1851 * And set downlink pointer in the prefix tuple to point to postfix tuple.
1852 * (We can't avoid this step by doing the above two steps in opposite
1853 * order, because there might not be enough space on the page to insert
1854 * the postfix tuple first.) We have to update the local copy of the
1855 * prefixTuple too, because that's what will be written to WAL.
1857 spgUpdateNodeLink(prefixTuple
, out
->result
.splitTuple
.childNodeN
,
1858 postfixBlkno
, postfixOffset
);
1859 prefixTuple
= (SpGistInnerTuple
) PageGetItem(current
->page
,
1860 PageGetItemId(current
->page
, current
->offnum
));
1861 spgUpdateNodeLink(prefixTuple
, out
->result
.splitTuple
.childNodeN
,
1862 postfixBlkno
, postfixOffset
);
1864 MarkBufferDirty(current
->buffer
);
1866 if (RelationNeedsWAL(index
) && !state
->isBuild
)
1871 XLogRegisterData((char *) &xlrec
, sizeof(xlrec
));
1872 XLogRegisterData((char *) prefixTuple
, prefixTuple
->size
);
1873 XLogRegisterData((char *) postfixTuple
, postfixTuple
->size
);
1875 XLogRegisterBuffer(0, current
->buffer
, REGBUF_STANDARD
);
1876 if (newBuffer
!= InvalidBuffer
)
1880 flags
= REGBUF_STANDARD
;
1882 flags
|= REGBUF_WILL_INIT
;
1883 XLogRegisterBuffer(1, newBuffer
, flags
);
1886 recptr
= XLogInsert(RM_SPGIST_ID
, XLOG_SPGIST_SPLIT_TUPLE
);
1888 PageSetLSN(current
->page
, recptr
);
1890 if (newBuffer
!= InvalidBuffer
)
1892 PageSetLSN(BufferGetPage(newBuffer
), recptr
);
1898 /* Update local free-space cache and release buffer */
1899 if (newBuffer
!= InvalidBuffer
)
1901 SpGistSetLastUsedPage(index
, newBuffer
);
1902 UnlockReleaseBuffer(newBuffer
);
1907 * Insert one item into the index.
1909 * Returns true on success, false if we failed to complete the insertion
1910 * (typically because of conflict with a concurrent insert). In the latter
1911 * case, caller should re-call spgdoinsert() with the same args.
1914 spgdoinsert(Relation index
, SpGistState
*state
,
1915 ItemPointer heapPtr
, Datum
*datums
, bool *isnulls
)
1918 TupleDesc leafDescriptor
= state
->leafTupDesc
;
1919 bool isnull
= isnulls
[spgKeyColumn
];
1921 Datum leafDatums
[INDEX_MAX_KEYS
];
1924 int numNoProgressCycles
= 0;
1927 FmgrInfo
*procinfo
= NULL
;
1930 * Look up FmgrInfo of the user-defined choose function once, to save
1931 * cycles in the loop below.
1934 procinfo
= index_getprocinfo(index
, 1, SPGIST_CHOOSE_PROC
);
1937 * Prepare the leaf datum to insert.
1939 * If an optional "compress" method is provided, then call it to form the
1940 * leaf key datum from the input datum. Otherwise, store the input datum
1941 * as is. Since we don't use index_form_tuple in this AM, we have to make
1942 * sure value to be inserted is not toasted; FormIndexDatum doesn't
1943 * guarantee that. But we assume the "compress" method to return an
1948 if (OidIsValid(index_getprocid(index
, 1, SPGIST_COMPRESS_PROC
)))
1950 FmgrInfo
*compressProcinfo
= NULL
;
1952 compressProcinfo
= index_getprocinfo(index
, 1, SPGIST_COMPRESS_PROC
);
1953 leafDatums
[spgKeyColumn
] =
1954 FunctionCall1Coll(compressProcinfo
,
1955 index
->rd_indcollation
[spgKeyColumn
],
1956 datums
[spgKeyColumn
]);
1960 Assert(state
->attLeafType
.type
== state
->attType
.type
);
1962 if (state
->attType
.attlen
== -1)
1963 leafDatums
[spgKeyColumn
] =
1964 PointerGetDatum(PG_DETOAST_DATUM(datums
[spgKeyColumn
]));
1966 leafDatums
[spgKeyColumn
] = datums
[spgKeyColumn
];
1970 leafDatums
[spgKeyColumn
] = (Datum
) 0;
1972 /* Likewise, ensure that any INCLUDE values are not toasted */
1973 for (int i
= spgFirstIncludeColumn
; i
< leafDescriptor
->natts
; i
++)
1977 if (TupleDescAttr(leafDescriptor
, i
)->attlen
== -1)
1978 leafDatums
[i
] = PointerGetDatum(PG_DETOAST_DATUM(datums
[i
]));
1980 leafDatums
[i
] = datums
[i
];
1983 leafDatums
[i
] = (Datum
) 0;
1987 * Compute space needed for a leaf tuple containing the given data.
1989 leafSize
= SpGistGetLeafTupleSize(leafDescriptor
, leafDatums
, isnulls
);
1990 /* Account for an item pointer, too */
1991 leafSize
+= sizeof(ItemIdData
);
1994 * If it isn't gonna fit, and the opclass can't reduce the datum size by
1995 * suffixing, bail out now rather than doing a lot of useless work.
1997 if (leafSize
> SPGIST_PAGE_CAPACITY
&&
1998 (isnull
|| !state
->config
.longValuesOK
))
2000 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED
),
2001 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2002 leafSize
- sizeof(ItemIdData
),
2003 SPGIST_PAGE_CAPACITY
- sizeof(ItemIdData
),
2004 RelationGetRelationName(index
)),
2005 errhint("Values larger than a buffer page cannot be indexed.")));
2006 bestLeafSize
= leafSize
;
2008 /* Initialize "current" to the appropriate root page */
2009 current
.blkno
= isnull
? SPGIST_NULL_BLKNO
: SPGIST_ROOT_BLKNO
;
2010 current
.buffer
= InvalidBuffer
;
2011 current
.page
= NULL
;
2012 current
.offnum
= FirstOffsetNumber
;
2015 /* "parent" is invalid for the moment */
2016 parent
.blkno
= InvalidBlockNumber
;
2017 parent
.buffer
= InvalidBuffer
;
2019 parent
.offnum
= InvalidOffsetNumber
;
2023 * Before entering the loop, try to clear any pending interrupt condition.
2024 * If a query cancel is pending, we might as well accept it now not later;
2025 * while if a non-canceling condition is pending, servicing it here avoids
2026 * having to restart the insertion and redo all the work so far.
2028 CHECK_FOR_INTERRUPTS();
2035 * Bail out if query cancel is pending. We must have this somewhere
2036 * in the loop since a broken opclass could produce an infinite
2037 * picksplit loop. However, because we'll be holding buffer lock(s)
2038 * after the first iteration, ProcessInterrupts() wouldn't be able to
2039 * throw a cancel error here. Hence, if we see that an interrupt is
2040 * pending, break out of the loop and deal with the situation below.
2041 * Set result = false because we must restart the insertion if the
2042 * interrupt isn't a query-cancel-or-die case.
2044 if (INTERRUPTS_PENDING_CONDITION())
2050 if (current
.blkno
== InvalidBlockNumber
)
2053 * Create a leaf page. If leafSize is too large to fit on a page,
2054 * we won't actually use the page yet, but it simplifies the API
2055 * for doPickSplit to always have a leaf page at hand; so just
2056 * quietly limit our request to a page size.
2059 SpGistGetBuffer(index
,
2060 GBUF_LEAF
| (isnull
? GBUF_NULLS
: 0),
2061 Min(leafSize
, SPGIST_PAGE_CAPACITY
),
2063 current
.blkno
= BufferGetBlockNumber(current
.buffer
);
2065 else if (parent
.buffer
== InvalidBuffer
)
2067 /* we hold no parent-page lock, so no deadlock is possible */
2068 current
.buffer
= ReadBuffer(index
, current
.blkno
);
2069 LockBuffer(current
.buffer
, BUFFER_LOCK_EXCLUSIVE
);
2071 else if (current
.blkno
!= parent
.blkno
)
2073 /* descend to a new child page */
2074 current
.buffer
= ReadBuffer(index
, current
.blkno
);
2077 * Attempt to acquire lock on child page. We must beware of
2078 * deadlock against another insertion process descending from that
2079 * page to our parent page (see README). If we fail to get lock,
2080 * abandon the insertion and tell our caller to start over.
2082 * XXX this could be improved, because failing to get lock on a
2083 * buffer is not proof of a deadlock situation; the lock might be
2084 * held by a reader, or even just background writer/checkpointer
2085 * process. Perhaps it'd be worth retrying after sleeping a bit?
2087 if (!ConditionalLockBuffer(current
.buffer
))
2089 ReleaseBuffer(current
.buffer
);
2090 UnlockReleaseBuffer(parent
.buffer
);
2096 /* inner tuple can be stored on the same page as parent one */
2097 current
.buffer
= parent
.buffer
;
2099 current
.page
= BufferGetPage(current
.buffer
);
2101 /* should not arrive at a page of the wrong type */
2102 if (isnull
? !SpGistPageStoresNulls(current
.page
) :
2103 SpGistPageStoresNulls(current
.page
))
2104 elog(ERROR
, "SPGiST index page %u has wrong nulls flag",
2107 if (SpGistPageIsLeaf(current
.page
))
2109 SpGistLeafTuple leafTuple
;
2113 leafTuple
= spgFormLeafTuple(state
, heapPtr
, leafDatums
, isnulls
);
2114 if (leafTuple
->size
+ sizeof(ItemIdData
) <=
2115 SpGistPageGetFreeSpace(current
.page
, 1))
2117 /* it fits on page, so insert it and we're done */
2118 addLeafTuple(index
, state
, leafTuple
,
2119 ¤t
, &parent
, isnull
, isNew
);
2122 else if ((sizeToSplit
=
2123 checkSplitConditions(index
, state
, ¤t
,
2124 &nToSplit
)) < SPGIST_PAGE_CAPACITY
/ 2 &&
2126 leafTuple
->size
+ sizeof(ItemIdData
) + sizeToSplit
<= SPGIST_PAGE_CAPACITY
)
2129 * the amount of data is pretty small, so just move the whole
2130 * chain to another leaf page rather than splitting it.
2133 moveLeafs(index
, state
, ¤t
, &parent
, leafTuple
, isnull
);
2134 break; /* we're done */
2139 if (doPickSplit(index
, state
, ¤t
, &parent
,
2140 leafTuple
, level
, isnull
, isNew
))
2141 break; /* doPickSplit installed new tuples */
2143 /* leaf tuple will not be inserted yet */
2147 * current now describes new inner tuple, go insert into it
2149 Assert(!SpGistPageIsLeaf(current
.page
));
2150 goto process_inner_tuple
;
2153 else /* non-leaf page */
2156 * Apply the opclass choose function to figure out how to insert
2157 * the given datum into the current inner tuple.
2159 SpGistInnerTuple innerTuple
;
2164 * spgAddNode and spgSplitTuple cases will loop back to here to
2165 * complete the insertion operation. Just in case the choose
2166 * function is broken and produces add or split requests
2167 * repeatedly, check for query cancel (see comments above).
2169 process_inner_tuple
:
2170 if (INTERRUPTS_PENDING_CONDITION())
2176 innerTuple
= (SpGistInnerTuple
) PageGetItem(current
.page
,
2177 PageGetItemId(current
.page
, current
.offnum
));
2179 in
.datum
= datums
[spgKeyColumn
];
2180 in
.leafDatum
= leafDatums
[spgKeyColumn
];
2182 in
.allTheSame
= innerTuple
->allTheSame
;
2183 in
.hasPrefix
= (innerTuple
->prefixSize
> 0);
2184 in
.prefixDatum
= SGITDATUM(innerTuple
, state
);
2185 in
.nNodes
= innerTuple
->nNodes
;
2186 in
.nodeLabels
= spgExtractNodeLabels(state
, innerTuple
);
2188 memset(&out
, 0, sizeof(out
));
2192 /* use user-defined choose method */
2193 FunctionCall2Coll(procinfo
,
2194 index
->rd_indcollation
[0],
2195 PointerGetDatum(&in
),
2196 PointerGetDatum(&out
));
2200 /* force "match" action (to insert to random subnode) */
2201 out
.resultType
= spgMatchNode
;
2204 if (innerTuple
->allTheSame
)
2207 * It's not allowed to do an AddNode at an allTheSame tuple.
2208 * Opclass must say "match", in which case we choose a random
2209 * one of the nodes to descend into, or "split".
2211 if (out
.resultType
== spgAddNode
)
2212 elog(ERROR
, "cannot add a node to an allTheSame inner tuple");
2213 else if (out
.resultType
== spgMatchNode
)
2214 out
.result
.matchNode
.nodeN
=
2215 pg_prng_uint64_range(&pg_global_prng_state
,
2216 0, innerTuple
->nNodes
- 1);
2219 switch (out
.resultType
)
2222 /* Descend to N'th child node */
2223 spgMatchNodeAction(index
, state
, innerTuple
,
2225 out
.result
.matchNode
.nodeN
);
2226 /* Adjust level as per opclass request */
2227 level
+= out
.result
.matchNode
.levelAdd
;
2228 /* Replace leafDatum and recompute leafSize */
2231 leafDatums
[spgKeyColumn
] = out
.result
.matchNode
.restDatum
;
2232 leafSize
= SpGistGetLeafTupleSize(leafDescriptor
,
2233 leafDatums
, isnulls
);
2234 leafSize
+= sizeof(ItemIdData
);
2238 * Check new tuple size; fail if it can't fit, unless the
2239 * opclass says it can handle the situation by suffixing.
2241 * However, the opclass can only shorten the leaf datum,
2242 * which may not be enough to ever make the tuple fit,
2243 * since INCLUDE columns might alone use more than a page.
2244 * Depending on the opclass' behavior, that could lead to
2245 * an infinite loop --- spgtextproc.c, for example, will
2246 * just repeatedly generate an empty-string leaf datum
2247 * once it runs out of data. Actual bugs in opclasses
2248 * might cause infinite looping, too. To detect such a
2249 * loop, check to see if we are making progress by
2250 * reducing the leafSize in each pass. This is a bit
2251 * tricky though. Because of alignment considerations,
2252 * the total tuple size might not decrease on every pass.
2253 * Also, there are edge cases where the choose method
2254 * might seem to not make progress for a cycle or two.
2255 * Somewhat arbitrarily, we allow up to 10 no-progress
2256 * iterations before failing. (This limit should be more
2257 * than MAXALIGN, to accommodate opclasses that trim one
2258 * byte from the leaf datum per pass.)
2260 if (leafSize
> SPGIST_PAGE_CAPACITY
)
2264 if (state
->config
.longValuesOK
&& !isnull
)
2266 if (leafSize
< bestLeafSize
)
2269 bestLeafSize
= leafSize
;
2270 numNoProgressCycles
= 0;
2272 else if (++numNoProgressCycles
< 10)
2277 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED
),
2278 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2279 leafSize
- sizeof(ItemIdData
),
2280 SPGIST_PAGE_CAPACITY
- sizeof(ItemIdData
),
2281 RelationGetRelationName(index
)),
2282 errhint("Values larger than a buffer page cannot be indexed.")));
2286 * Loop around and attempt to insert the new leafDatum at
2287 * "current" (which might reference an existing child
2288 * tuple, or might be invalid to force us to find a new
2289 * page for the tuple).
2293 /* AddNode is not sensible if nodes don't have labels */
2294 if (in
.nodeLabels
== NULL
)
2295 elog(ERROR
, "cannot add a node to an inner tuple without node labels");
2296 /* Add node to inner tuple, per request */
2297 spgAddNodeAction(index
, state
, innerTuple
,
2299 out
.result
.addNode
.nodeN
,
2300 out
.result
.addNode
.nodeLabel
);
2303 * Retry insertion into the enlarged node. We assume that
2304 * we'll get a MatchNode result this time.
2306 goto process_inner_tuple
;
2309 /* Split inner tuple, per request */
2310 spgSplitNodeAction(index
, state
, innerTuple
,
2313 /* Retry insertion into the split node */
2314 goto process_inner_tuple
;
2317 elog(ERROR
, "unrecognized SPGiST choose result: %d",
2318 (int) out
.resultType
);
2325 * Release any buffers we're still holding. Beware of possibility that
2326 * current and parent reference same buffer.
2328 if (current
.buffer
!= InvalidBuffer
)
2330 SpGistSetLastUsedPage(index
, current
.buffer
);
2331 UnlockReleaseBuffer(current
.buffer
);
2333 if (parent
.buffer
!= InvalidBuffer
&&
2334 parent
.buffer
!= current
.buffer
)
2336 SpGistSetLastUsedPage(index
, parent
.buffer
);
2337 UnlockReleaseBuffer(parent
.buffer
);
2341 * We do not support being called while some outer function is holding a
2342 * buffer lock (or any other reason to postpone query cancels). If that
2343 * were the case, telling the caller to retry would create an infinite
2346 Assert(INTERRUPTS_CAN_BE_PROCESSED());
2349 * Finally, check for interrupts again. If there was a query cancel,
2350 * ProcessInterrupts() will be able to throw the error here. If it was
2351 * some other kind of interrupt that can just be cleared, return false to
2352 * tell our caller to retry.
2354 CHECK_FOR_INTERRUPTS();