Query in SQL function still not schema-safe; add a couple
[PostgreSQL.git] / src / backend / access / gin / gindatapage.c
blob791b292c462f3b54cc683c399294930d1e25763b
/*-------------------------------------------------------------------------
 *
 * gindatapage.c
 *	  page utility routines for the postgres inverted index access method.
 *
 *
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *			$PostgreSQL$
 *-------------------------------------------------------------------------
 */
15 #include "postgres.h"
17 #include "access/gin.h"
18 #include "storage/bufmgr.h"
19 #include "utils/rel.h"
21 int
22 compareItemPointers(ItemPointer a, ItemPointer b)
24 if (GinItemPointerGetBlockNumber(a) == GinItemPointerGetBlockNumber(b))
26 if (GinItemPointerGetOffsetNumber(a) == GinItemPointerGetOffsetNumber(b))
27 return 0;
28 return (GinItemPointerGetOffsetNumber(a) > GinItemPointerGetOffsetNumber(b)) ? 1 : -1;
31 return (GinItemPointerGetBlockNumber(a) > GinItemPointerGetBlockNumber(b)) ? 1 : -1;
35 * Merge two ordered arrays of itempointers, eliminating any duplicates.
36 * Returns the number of items in the result.
37 * Caller is responsible that there is enough space at *dst.
39 uint32
40 MergeItemPointers(ItemPointerData *dst,
41 ItemPointerData *a, uint32 na,
42 ItemPointerData *b, uint32 nb)
44 ItemPointerData *dptr = dst;
45 ItemPointerData *aptr = a,
46 *bptr = b;
48 while (aptr - a < na && bptr - b < nb)
50 int cmp = compareItemPointers(aptr, bptr);
52 if (cmp > 0)
53 *dptr++ = *bptr++;
54 else if (cmp == 0)
56 /* we want only one copy of the identical items */
57 *dptr++ = *bptr++;
58 aptr++;
60 else
61 *dptr++ = *aptr++;
64 while (aptr - a < na)
65 *dptr++ = *aptr++;
67 while (bptr - b < nb)
68 *dptr++ = *bptr++;
70 return dptr - dst;
74 * Checks, should we move to right link...
75 * Compares inserting itemp pointer with right bound of current page
77 static bool
78 dataIsMoveRight(GinBtree btree, Page page)
80 ItemPointer iptr = GinDataPageGetRightBound(page);
82 if (GinPageRightMost(page))
83 return FALSE;
85 return (compareItemPointers(btree->items + btree->curitem, iptr) > 0) ? TRUE : FALSE;
89 * Find correct PostingItem in non-leaf page. It supposed that page
90 * correctly chosen and searching value SHOULD be on page
92 static BlockNumber
93 dataLocateItem(GinBtree btree, GinBtreeStack *stack)
95 OffsetNumber low,
96 high,
97 maxoff;
98 PostingItem *pitem = NULL;
99 int result;
100 Page page = BufferGetPage(stack->buffer);
102 Assert(!GinPageIsLeaf(page));
103 Assert(GinPageIsData(page));
105 if (btree->fullScan)
107 stack->off = FirstOffsetNumber;
108 stack->predictNumber *= GinPageGetOpaque(page)->maxoff;
109 return btree->getLeftMostPage(btree, page);
112 low = FirstOffsetNumber;
113 maxoff = high = GinPageGetOpaque(page)->maxoff;
114 Assert(high >= low);
116 high++;
118 while (high > low)
120 OffsetNumber mid = low + ((high - low) / 2);
122 pitem = (PostingItem *) GinDataPageGetItem(page, mid);
124 if (mid == maxoff)
127 * Right infinity, page already correctly chosen with a help of
128 * dataIsMoveRight
130 result = -1;
131 else
133 pitem = (PostingItem *) GinDataPageGetItem(page, mid);
134 result = compareItemPointers(btree->items + btree->curitem, &(pitem->key));
137 if (result == 0)
139 stack->off = mid;
140 return PostingItemGetBlockNumber(pitem);
142 else if (result > 0)
143 low = mid + 1;
144 else
145 high = mid;
148 Assert(high >= FirstOffsetNumber && high <= maxoff);
150 stack->off = high;
151 pitem = (PostingItem *) GinDataPageGetItem(page, high);
152 return PostingItemGetBlockNumber(pitem);
156 * Searches correct position for value on leaf page.
157 * Page should be correctly chosen.
158 * Returns true if value found on page.
160 static bool
161 dataLocateLeafItem(GinBtree btree, GinBtreeStack *stack)
163 Page page = BufferGetPage(stack->buffer);
164 OffsetNumber low,
165 high;
166 int result;
168 Assert(GinPageIsLeaf(page));
169 Assert(GinPageIsData(page));
171 if (btree->fullScan)
173 stack->off = FirstOffsetNumber;
174 return TRUE;
177 low = FirstOffsetNumber;
178 high = GinPageGetOpaque(page)->maxoff;
180 if (high < low)
182 stack->off = FirstOffsetNumber;
183 return false;
186 high++;
188 while (high > low)
190 OffsetNumber mid = low + ((high - low) / 2);
192 result = compareItemPointers(btree->items + btree->curitem, (ItemPointer) GinDataPageGetItem(page, mid));
194 if (result == 0)
196 stack->off = mid;
197 return true;
199 else if (result > 0)
200 low = mid + 1;
201 else
202 high = mid;
205 stack->off = high;
206 return false;
210 * Finds links to blkno on non-leaf page, returns
211 * offset of PostingItem
213 static OffsetNumber
214 dataFindChildPtr(GinBtree btree, Page page, BlockNumber blkno, OffsetNumber storedOff)
216 OffsetNumber i,
217 maxoff = GinPageGetOpaque(page)->maxoff;
218 PostingItem *pitem;
220 Assert(!GinPageIsLeaf(page));
221 Assert(GinPageIsData(page));
223 /* if page isn't changed, we returns storedOff */
224 if (storedOff >= FirstOffsetNumber && storedOff <= maxoff)
226 pitem = (PostingItem *) GinDataPageGetItem(page, storedOff);
227 if (PostingItemGetBlockNumber(pitem) == blkno)
228 return storedOff;
231 * we hope, that needed pointer goes to right. It's true if there
232 * wasn't a deletion
234 for (i = storedOff + 1; i <= maxoff; i++)
236 pitem = (PostingItem *) GinDataPageGetItem(page, i);
237 if (PostingItemGetBlockNumber(pitem) == blkno)
238 return i;
241 maxoff = storedOff - 1;
244 /* last chance */
245 for (i = FirstOffsetNumber; i <= maxoff; i++)
247 pitem = (PostingItem *) GinDataPageGetItem(page, i);
248 if (PostingItemGetBlockNumber(pitem) == blkno)
249 return i;
252 return InvalidOffsetNumber;
256 * returns blkno of leftmost child
258 static BlockNumber
259 dataGetLeftMostPage(GinBtree btree, Page page)
261 PostingItem *pitem;
263 Assert(!GinPageIsLeaf(page));
264 Assert(GinPageIsData(page));
265 Assert(GinPageGetOpaque(page)->maxoff >= FirstOffsetNumber);
267 pitem = (PostingItem *) GinDataPageGetItem(page, FirstOffsetNumber);
268 return PostingItemGetBlockNumber(pitem);
272 * add ItemPointer or PostingItem to page. data should point to
273 * correct value! depending on leaf or non-leaf page
275 void
276 GinDataPageAddItem(Page page, void *data, OffsetNumber offset)
278 OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
279 char *ptr;
281 if (offset == InvalidOffsetNumber)
283 ptr = GinDataPageGetItem(page, maxoff + 1);
285 else
287 ptr = GinDataPageGetItem(page, offset);
288 if (maxoff + 1 - offset != 0)
289 memmove(ptr + GinSizeOfItem(page), ptr, (maxoff - offset + 1) * GinSizeOfItem(page));
291 memcpy(ptr, data, GinSizeOfItem(page));
293 GinPageGetOpaque(page)->maxoff++;
297 * Deletes posting item from non-leaf page
299 void
300 PageDeletePostingItem(Page page, OffsetNumber offset)
302 OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
304 Assert(!GinPageIsLeaf(page));
305 Assert(offset >= FirstOffsetNumber && offset <= maxoff);
307 if (offset != maxoff)
308 memmove(GinDataPageGetItem(page, offset), GinDataPageGetItem(page, offset + 1),
309 sizeof(PostingItem) * (maxoff - offset));
311 GinPageGetOpaque(page)->maxoff--;
315 * checks space to install new value,
316 * item pointer never deletes!
318 static bool
319 dataIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off)
321 Page page = BufferGetPage(buf);
323 Assert(GinPageIsData(page));
324 Assert(!btree->isDelete);
326 if (GinPageIsLeaf(page))
328 if (GinPageRightMost(page) && off > GinPageGetOpaque(page)->maxoff)
330 if ((btree->nitem - btree->curitem) * sizeof(ItemPointerData) <= GinDataPageGetFreeSpace(page))
331 return true;
333 else if (sizeof(ItemPointerData) <= GinDataPageGetFreeSpace(page))
334 return true;
336 else if (sizeof(PostingItem) <= GinDataPageGetFreeSpace(page))
337 return true;
339 return false;
343 * In case of previous split update old child blkno to
344 * new right page
345 * item pointer never deletes!
347 static BlockNumber
348 dataPrepareData(GinBtree btree, Page page, OffsetNumber off)
350 BlockNumber ret = InvalidBlockNumber;
352 Assert(GinPageIsData(page));
354 if (!GinPageIsLeaf(page) && btree->rightblkno != InvalidBlockNumber)
356 PostingItem *pitem = (PostingItem *) GinDataPageGetItem(page, off);
358 PostingItemSetBlockNumber(pitem, btree->rightblkno);
359 ret = btree->rightblkno;
362 btree->rightblkno = InvalidBlockNumber;
364 return ret;
368 * Places keys to page and fills WAL record. In case leaf page and
369 * build mode puts all ItemPointers to page.
371 static void
372 dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, XLogRecData **prdata)
374 Page page = BufferGetPage(buf);
375 static XLogRecData rdata[3];
376 int sizeofitem = GinSizeOfItem(page);
377 static ginxlogInsert data;
378 int cnt = 0;
380 *prdata = rdata;
381 Assert(GinPageIsData(page));
383 data.updateBlkno = dataPrepareData(btree, page, off);
385 data.node = btree->index->rd_node;
386 data.blkno = BufferGetBlockNumber(buf);
387 data.offset = off;
388 data.nitem = 1;
389 data.isDelete = FALSE;
390 data.isData = TRUE;
391 data.isLeaf = GinPageIsLeaf(page) ? TRUE : FALSE;
394 * Prevent full page write if child's split occurs. That is needed to
395 * remove incomplete splits while replaying WAL
397 * data.updateBlkno contains new block number (of newly created right
398 * page) for recently splited page.
400 if (data.updateBlkno == InvalidBlockNumber)
402 rdata[0].buffer = buf;
403 rdata[0].buffer_std = FALSE;
404 rdata[0].data = NULL;
405 rdata[0].len = 0;
406 rdata[0].next = &rdata[1];
407 cnt++;
410 rdata[cnt].buffer = InvalidBuffer;
411 rdata[cnt].data = (char *) &data;
412 rdata[cnt].len = sizeof(ginxlogInsert);
413 rdata[cnt].next = &rdata[cnt + 1];
414 cnt++;
416 rdata[cnt].buffer = InvalidBuffer;
417 rdata[cnt].data = (GinPageIsLeaf(page)) ? ((char *) (btree->items + btree->curitem)) : ((char *) &(btree->pitem));
418 rdata[cnt].len = sizeofitem;
419 rdata[cnt].next = NULL;
421 if (GinPageIsLeaf(page))
423 if (GinPageRightMost(page) && off > GinPageGetOpaque(page)->maxoff)
425 /* usually, create index... */
426 uint32 savedPos = btree->curitem;
428 while (btree->curitem < btree->nitem)
430 GinDataPageAddItem(page, btree->items + btree->curitem, off);
431 off++;
432 btree->curitem++;
434 data.nitem = btree->curitem - savedPos;
435 rdata[cnt].len = sizeofitem * data.nitem;
437 else
439 GinDataPageAddItem(page, btree->items + btree->curitem, off);
440 btree->curitem++;
443 else
444 GinDataPageAddItem(page, &(btree->pitem), off);
448 * split page and fills WAL record. original buffer(lbuf) leaves untouched,
449 * returns shadow page of lbuf filled new data. In leaf page and build mode puts all
450 * ItemPointers to pages. Also, in build mode splits data by way to full fulled
451 * left page
453 static Page
454 dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRecData **prdata)
456 static ginxlogSplit data;
457 static XLogRecData rdata[4];
458 static char vector[2 * BLCKSZ];
459 char *ptr;
460 OffsetNumber separator;
461 ItemPointer bound;
462 Page lpage = PageGetTempPageCopy(BufferGetPage(lbuf));
463 ItemPointerData oldbound = *GinDataPageGetRightBound(lpage);
464 int sizeofitem = GinSizeOfItem(lpage);
465 OffsetNumber maxoff = GinPageGetOpaque(lpage)->maxoff;
466 Page rpage = BufferGetPage(rbuf);
467 Size pageSize = PageGetPageSize(lpage);
468 Size freeSpace;
469 uint32 nCopied = 1;
471 GinInitPage(rpage, GinPageGetOpaque(lpage)->flags, pageSize);
472 freeSpace = GinDataPageGetFreeSpace(rpage);
474 *prdata = rdata;
475 data.leftChildBlkno = (GinPageIsLeaf(lpage)) ?
476 InvalidOffsetNumber : PostingItemGetBlockNumber(&(btree->pitem));
477 data.updateBlkno = dataPrepareData(btree, lpage, off);
479 memcpy(vector, GinDataPageGetItem(lpage, FirstOffsetNumber),
480 maxoff * sizeofitem);
482 if (GinPageIsLeaf(lpage) && GinPageRightMost(lpage) && off > GinPageGetOpaque(lpage)->maxoff)
484 nCopied = 0;
485 while (btree->curitem < btree->nitem && maxoff * sizeof(ItemPointerData) < 2 * (freeSpace - sizeof(ItemPointerData)))
487 memcpy(vector + maxoff * sizeof(ItemPointerData), btree->items + btree->curitem,
488 sizeof(ItemPointerData));
489 maxoff++;
490 nCopied++;
491 btree->curitem++;
494 else
496 ptr = vector + (off - 1) * sizeofitem;
497 if (maxoff + 1 - off != 0)
498 memmove(ptr + sizeofitem, ptr, (maxoff - off + 1) * sizeofitem);
499 if (GinPageIsLeaf(lpage))
501 memcpy(ptr, btree->items + btree->curitem, sizeofitem);
502 btree->curitem++;
504 else
505 memcpy(ptr, &(btree->pitem), sizeofitem);
507 maxoff++;
511 * we suppose that during index creation table scaned from begin to end,
512 * so ItemPointers are monotonically increased..
514 if (btree->isBuild && GinPageRightMost(lpage))
515 separator = freeSpace / sizeofitem;
516 else
517 separator = maxoff / 2;
519 GinInitPage(rpage, GinPageGetOpaque(lpage)->flags, pageSize);
520 GinInitPage(lpage, GinPageGetOpaque(rpage)->flags, pageSize);
522 memcpy(GinDataPageGetItem(lpage, FirstOffsetNumber), vector, separator * sizeofitem);
523 GinPageGetOpaque(lpage)->maxoff = separator;
524 memcpy(GinDataPageGetItem(rpage, FirstOffsetNumber),
525 vector + separator * sizeofitem, (maxoff - separator) * sizeofitem);
526 GinPageGetOpaque(rpage)->maxoff = maxoff - separator;
528 PostingItemSetBlockNumber(&(btree->pitem), BufferGetBlockNumber(lbuf));
529 if (GinPageIsLeaf(lpage))
530 btree->pitem.key = *(ItemPointerData *) GinDataPageGetItem(lpage,
531 GinPageGetOpaque(lpage)->maxoff);
532 else
533 btree->pitem.key = ((PostingItem *) GinDataPageGetItem(lpage,
534 GinPageGetOpaque(lpage)->maxoff))->key;
535 btree->rightblkno = BufferGetBlockNumber(rbuf);
537 /* set up right bound for left page */
538 bound = GinDataPageGetRightBound(lpage);
539 *bound = btree->pitem.key;
541 /* set up right bound for right page */
542 bound = GinDataPageGetRightBound(rpage);
543 *bound = oldbound;
545 data.node = btree->index->rd_node;
546 data.rootBlkno = InvalidBlockNumber;
547 data.lblkno = BufferGetBlockNumber(lbuf);
548 data.rblkno = BufferGetBlockNumber(rbuf);
549 data.separator = separator;
550 data.nitem = maxoff;
551 data.isData = TRUE;
552 data.isLeaf = GinPageIsLeaf(lpage) ? TRUE : FALSE;
553 data.isRootSplit = FALSE;
554 data.rightbound = oldbound;
556 rdata[0].buffer = InvalidBuffer;
557 rdata[0].data = (char *) &data;
558 rdata[0].len = sizeof(ginxlogSplit);
559 rdata[0].next = &rdata[1];
561 rdata[1].buffer = InvalidBuffer;
562 rdata[1].data = vector;
563 rdata[1].len = MAXALIGN(maxoff * sizeofitem);
564 rdata[1].next = NULL;
566 return lpage;
570 * Fills new root by right bound values from child.
571 * Also called from ginxlog, should not use btree
573 void
574 dataFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf)
576 Page page = BufferGetPage(root),
577 lpage = BufferGetPage(lbuf),
578 rpage = BufferGetPage(rbuf);
579 PostingItem li,
582 li.key = *GinDataPageGetRightBound(lpage);
583 PostingItemSetBlockNumber(&li, BufferGetBlockNumber(lbuf));
584 GinDataPageAddItem(page, &li, InvalidOffsetNumber);
586 ri.key = *GinDataPageGetRightBound(rpage);
587 PostingItemSetBlockNumber(&ri, BufferGetBlockNumber(rbuf));
588 GinDataPageAddItem(page, &ri, InvalidOffsetNumber);
591 void
592 prepareDataScan(GinBtree btree, Relation index)
594 memset(btree, 0, sizeof(GinBtreeData));
595 btree->index = index;
596 btree->isMoveRight = dataIsMoveRight;
597 btree->findChildPage = dataLocateItem;
598 btree->findItem = dataLocateLeafItem;
599 btree->findChildPtr = dataFindChildPtr;
600 btree->getLeftMostPage = dataGetLeftMostPage;
601 btree->isEnoughSpace = dataIsEnoughSpace;
602 btree->placeToPage = dataPlaceToPage;
603 btree->splitPage = dataSplitPage;
604 btree->fillRoot = dataFillRoot;
606 btree->searchMode = FALSE;
607 btree->isDelete = FALSE;
608 btree->fullScan = FALSE;
609 btree->isBuild = FALSE;
612 GinPostingTreeScan *
613 prepareScanPostingTree(Relation index, BlockNumber rootBlkno, bool searchMode)
615 GinPostingTreeScan *gdi = (GinPostingTreeScan *) palloc0(sizeof(GinPostingTreeScan));
617 prepareDataScan(&gdi->btree, index);
619 gdi->btree.searchMode = searchMode;
620 gdi->btree.fullScan = searchMode;
622 gdi->stack = ginPrepareFindLeafPage(&gdi->btree, rootBlkno);
624 return gdi;
628 * Inserts array of item pointers, may execute several tree scan (very rare)
630 void
631 insertItemPointer(GinPostingTreeScan *gdi, ItemPointerData *items, uint32 nitem)
633 BlockNumber rootBlkno = gdi->stack->blkno;
635 gdi->btree.items = items;
636 gdi->btree.nitem = nitem;
637 gdi->btree.curitem = 0;
639 while (gdi->btree.curitem < gdi->btree.nitem)
641 if (!gdi->stack)
642 gdi->stack = ginPrepareFindLeafPage(&gdi->btree, rootBlkno);
644 gdi->stack = ginFindLeafPage(&gdi->btree, gdi->stack);
646 if (gdi->btree.findItem(&(gdi->btree), gdi->stack))
649 * gdi->btree.items[gdi->btree.curitem] already exists in index
651 gdi->btree.curitem++;
652 LockBuffer(gdi->stack->buffer, GIN_UNLOCK);
653 freeGinBtreeStack(gdi->stack);
655 else
656 ginInsertValue(&(gdi->btree), gdi->stack);
658 gdi->stack = NULL;
662 Buffer
663 scanBeginPostingTree(GinPostingTreeScan *gdi)
665 gdi->stack = ginFindLeafPage(&gdi->btree, gdi->stack);
666 return gdi->stack->buffer;