Force a checkpoint in CREATE DATABASE before starting to copy the files,
[PostgreSQL.git] / src / backend / access / gin / ginbulk.c
blob5219e551e2b4b673cae4460e1435cadd652e3ea2
/*-------------------------------------------------------------------------
 *
 * ginbulk.c
 *	  routines for fast build of inverted index
 *
 *
 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *			$PostgreSQL$
 *-------------------------------------------------------------------------
 */
15 #include "postgres.h"
17 #include "access/gin.h"
18 #include "utils/datum.h"
19 #include "utils/memutils.h"
/* Number of EntryAccumulator slots obtained by each palloc in EAAllocate() */
#define DEF_NENTRY	2048

/* Initial capacity (in item pointers) of a newly created entry's list */
#define DEF_NPTR	4
25 void
26 ginInitBA(BuildAccumulator *accum)
28 accum->maxdepth = 1;
29 accum->stackpos = 0;
30 accum->entries = NULL;
31 accum->stack = NULL;
32 accum->allocatedMemory = 0;
33 accum->entryallocator = NULL;
36 static EntryAccumulator *
37 EAAllocate(BuildAccumulator *accum)
39 if (accum->entryallocator == NULL || accum->length >= DEF_NENTRY)
41 accum->entryallocator = palloc(sizeof(EntryAccumulator) * DEF_NENTRY);
42 accum->allocatedMemory += GetMemoryChunkSpace(accum->entryallocator);
43 accum->length = 0;
46 accum->length++;
47 return accum->entryallocator + accum->length - 1;
51 * Stores heap item pointer. For robust, it checks that
52 * item pointer are ordered
54 static void
55 ginInsertData(BuildAccumulator *accum, EntryAccumulator *entry, ItemPointer heapptr)
57 if (entry->number >= entry->length)
59 accum->allocatedMemory -= GetMemoryChunkSpace(entry->list);
60 entry->length *= 2;
61 entry->list = (ItemPointerData *) repalloc(entry->list,
62 sizeof(ItemPointerData) * entry->length);
63 accum->allocatedMemory += GetMemoryChunkSpace(entry->list);
66 if (entry->shouldSort == FALSE)
68 int res = compareItemPointers(entry->list + entry->number - 1, heapptr);
70 Assert(res != 0);
72 if (res > 0)
73 entry->shouldSort = TRUE;
76 entry->list[entry->number] = *heapptr;
77 entry->number++;
81 * This is basically the same as datumCopy(), but modified to count
82 * palloc'd space in accum.
84 static Datum
85 getDatumCopy(BuildAccumulator *accum, OffsetNumber attnum, Datum value)
87 Form_pg_attribute att = accum->ginstate->origTupdesc->attrs[ attnum - 1 ];
88 Datum res;
90 if (att->attbyval)
91 res = value;
92 else
94 res = datumCopy(value, false, att->attlen);
95 accum->allocatedMemory += GetMemoryChunkSpace(DatumGetPointer(res));
97 return res;
101 * Find/store one entry from indexed value.
103 static void
104 ginInsertEntry(BuildAccumulator *accum, ItemPointer heapptr, OffsetNumber attnum, Datum entry)
106 EntryAccumulator *ea = accum->entries,
107 *pea = NULL;
108 int res = 0;
109 uint32 depth = 1;
111 while (ea)
113 res = compareAttEntries(accum->ginstate, attnum, entry, ea->attnum, ea->value);
114 if (res == 0)
115 break; /* found */
116 else
118 pea = ea;
119 if (res < 0)
120 ea = ea->left;
121 else
122 ea = ea->right;
124 depth++;
127 if (depth > accum->maxdepth)
128 accum->maxdepth = depth;
130 if (ea == NULL)
132 ea = EAAllocate(accum);
134 ea->left = ea->right = NULL;
135 ea->attnum = attnum;
136 ea->value = getDatumCopy(accum, attnum, entry);
137 ea->length = DEF_NPTR;
138 ea->number = 1;
139 ea->shouldSort = FALSE;
140 ea->list = (ItemPointerData *) palloc(sizeof(ItemPointerData) * DEF_NPTR);
141 accum->allocatedMemory += GetMemoryChunkSpace(ea->list);
142 ea->list[0] = *heapptr;
144 if (pea == NULL)
145 accum->entries = ea;
146 else
148 Assert(res != 0);
149 if (res < 0)
150 pea->left = ea;
151 else
152 pea->right = ea;
155 else
156 ginInsertData(accum, ea, heapptr);
160 * insert middle of left part the middle of right one,
161 * then calls itself for each parts
163 static void
164 ginChooseElem(BuildAccumulator *accum, ItemPointer heapptr, OffsetNumber attnum,
165 Datum *entries, uint32 nentry,
166 uint32 low, uint32 high, uint32 offset)
168 uint32 pos;
169 uint32 middle = (low + high) >> 1;
171 pos = (low + middle) >> 1;
172 if (low != middle && pos >= offset && pos - offset < nentry)
173 ginInsertEntry(accum, heapptr, attnum, entries[pos - offset]);
174 pos = (high + middle + 1) >> 1;
175 if (middle + 1 != high && pos >= offset && pos - offset < nentry)
176 ginInsertEntry(accum, heapptr, attnum, entries[pos - offset]);
178 if (low != middle)
179 ginChooseElem(accum, heapptr, attnum, entries, nentry, low, middle, offset);
180 if (high != middle + 1)
181 ginChooseElem(accum, heapptr, attnum, entries, nentry, middle + 1, high, offset);
185 * Insert one heap pointer. Suppose entries is sorted.
186 * Insertion order tries to get binary tree balanced: first insert middle value,
187 * next middle on left part and middle of right part.
189 void
190 ginInsertRecordBA(BuildAccumulator *accum, ItemPointer heapptr, OffsetNumber attnum,
191 Datum *entries, int32 nentry)
193 uint32 i,
194 nbit = 0,
195 offset;
197 if (nentry <= 0)
198 return;
200 i = nentry - 1;
201 for (; i > 0; i >>= 1)
202 nbit++;
204 nbit = 1 << nbit;
205 offset = (nbit - nentry) / 2;
207 ginInsertEntry(accum, heapptr, attnum, entries[(nbit >> 1) - offset]);
208 ginChooseElem(accum, heapptr, attnum, entries, nentry, 0, nbit, offset);
211 static int
212 qsortCompareItemPointers(const void *a, const void *b)
214 int res = compareItemPointers((ItemPointer) a, (ItemPointer) b);
216 Assert(res != 0);
217 return res;
221 * walk on binary tree and returns ordered nodes
223 static EntryAccumulator *
224 walkTree(BuildAccumulator *accum)
226 EntryAccumulator *entry = accum->stack[accum->stackpos];
228 if (entry->list != NULL)
230 /* return entry itself: we already was at left sublink */
231 return entry;
233 else if (entry->right && entry->right != accum->stack[accum->stackpos + 1])
235 /* go on right sublink */
236 accum->stackpos++;
237 entry = entry->right;
239 /* find most-left value */
240 for (;;)
242 accum->stack[accum->stackpos] = entry;
243 if (entry->left)
245 accum->stackpos++;
246 entry = entry->left;
248 else
249 break;
252 else
254 /* we already return all left subtree, itself and right subtree */
255 if (accum->stackpos == 0)
256 return 0;
257 accum->stackpos--;
258 return walkTree(accum);
261 return entry;
264 ItemPointerData *
265 ginGetEntry(BuildAccumulator *accum, OffsetNumber *attnum, Datum *value, uint32 *n)
267 EntryAccumulator *entry;
268 ItemPointerData *list;
270 if (accum->stack == NULL)
272 /* first call */
273 accum->stack = palloc0(sizeof(EntryAccumulator *) * (accum->maxdepth + 1));
274 accum->allocatedMemory += GetMemoryChunkSpace(accum->stack);
275 entry = accum->entries;
277 if (entry == NULL)
278 return NULL;
280 /* find most-left value */
281 for (;;)
283 accum->stack[accum->stackpos] = entry;
284 if (entry->left)
286 accum->stackpos++;
287 entry = entry->left;
289 else
290 break;
293 else
295 accum->allocatedMemory -= GetMemoryChunkSpace(accum->stack[accum->stackpos]->list);
296 pfree(accum->stack[accum->stackpos]->list);
297 accum->stack[accum->stackpos]->list = NULL;
298 entry = walkTree(accum);
301 if (entry == NULL)
302 return NULL;
304 *n = entry->number;
305 *attnum = entry->attnum;
306 *value = entry->value;
307 list = entry->list;
309 Assert(list != NULL);
311 if (entry->shouldSort && entry->number > 1)
312 qsort(list, *n, sizeof(ItemPointerData), qsortCompareItemPointers);
314 return list;