/*
** The author disclaims copyright to this source code.  In place of
** a legal notice, here is a blessing:
**
**    May you do good and not evil.
**    May you find forgiveness for yourself and forgive others.
**    May you share freely, never taking more than you give.
**
*************************************************************************
** This file contains code for the VdbeSorter object, used in concert with
** a VdbeCursor to sort large numbers of keys for CREATE INDEX statements
** or by SELECT statements with ORDER BY clauses that cannot be satisfied
** using indexes and without LIMIT clauses.
**
** The VdbeSorter object implements a multi-threaded external merge sort
** algorithm that is efficient even if the number of elements being sorted
** exceeds the available memory.
**
** Here is the (internal, non-API) interface between this module and the
** rest of the SQLite system:
**
**    sqlite3VdbeSorterInit()       Create a new VdbeSorter object.
**
**    sqlite3VdbeSorterWrite()      Add a single new row to the VdbeSorter
**                                  object.  The row is a binary blob in the
**                                  OP_MakeRecord format that contains both
**                                  the ORDER BY key columns and result columns
**                                  in the case of a SELECT w/ ORDER BY, or
**                                  the complete record for an index entry
**                                  in the case of a CREATE INDEX.
**
**    sqlite3VdbeSorterRewind()     Sort all content previously added.
**                                  Position the read cursor on the
**                                  first sorted element.
**
**    sqlite3VdbeSorterNext()       Advance the read cursor to the next sorted
**                                  element.
**
**    sqlite3VdbeSorterRowkey()     Return the complete binary blob for the
**                                  row currently under the read cursor.
**
**    sqlite3VdbeSorterCompare()    Compare the binary blob for the row
**                                  currently under the read cursor against
**                                  another binary blob X and report if
**                                  X is strictly less than the read cursor.
**                                  Used to enforce uniqueness in a
**                                  CREATE UNIQUE INDEX statement.
**
**    sqlite3VdbeSorterClose()      Close the VdbeSorter object and reclaim
**                                  all resources.
**
**    sqlite3VdbeSorterReset()      Refurbish the VdbeSorter for reuse.  This
**                                  is like Close() followed by Init() only
**                                  much faster.
**
** The interfaces above must be called in a particular order.  Write() can
** only occur in between Init()/Reset() and Rewind().  Next(), Rowkey(), and
** Compare() can only occur in between Rewind() and Close()/Reset(). i.e.
**
**   Init()
**   for each record: Write()
**   Rewind()
**     Rowkey()/Compare()
**   Next()
**   Close()
**
** Records passed to the sorter via calls to Write() are initially held
** unsorted in main memory. Assuming the amount of memory used never exceeds
** a threshold, when Rewind() is called the set of records is sorted using
** an in-memory merge sort. In this case, no temporary files are required
** and subsequent calls to Rowkey(), Next() and Compare() read records
** directly from main memory.
**
** If the amount of space used to store records in main memory exceeds the
** threshold, then the set of records currently in memory is sorted and
** written to a temporary file in "Packed Memory Array" (PMA) format.
** A PMA created at this point is known as a "level-0 PMA". Higher levels
** of PMAs may be created by merging existing PMAs together - for example
** merging two or more level-0 PMAs together creates a level-1 PMA.
**
** The threshold for the amount of main memory to use before flushing
** records to a PMA is roughly the same as the limit configured for the
** page-cache of the main database. Specifically, the threshold is set to
** the value returned by "PRAGMA main.page_size" multiplied by
** that returned by "PRAGMA main.cache_size", in bytes.
**
** If the sorter is running in single-threaded mode, then all PMAs generated
** are appended to a single temporary file. Or, if the sorter is running in
** multi-threaded mode then up to (N+1) temporary files may be opened, where
** N is the configured number of worker threads. In this case, instead of
** sorting the records and writing the PMA to a temporary file itself, the
** calling thread usually launches a worker thread to do so. Except, if
** there are already N worker threads running, the main thread does the work
** itself.
**
** The sorter is running in multi-threaded mode if (a) the library was built
** with pre-processor symbol SQLITE_MAX_WORKER_THREADS set to a value greater
** than zero, and (b) worker threads have been enabled at runtime by setting
** a non-zero SQLITE_LIMIT_WORKER_THREADS limit on the database connection
** (see sqlite3VdbeSorterInit() below, which reads that limit).
**
** When Rewind() is called, any data remaining in memory is flushed to a
** final PMA. So at this point the data is stored in some number of sorted
** PMAs within temporary files on disk.
**
** If there are fewer than SORTER_MAX_MERGE_COUNT PMAs in total and the
** sorter is running in single-threaded mode, then these PMAs are merged
** incrementally as keys are retrieved from the sorter by the VDBE.  The
** MergeEngine object, described in further detail below, performs this
** merge.
**
** Or, if running in multi-threaded mode, then a background thread is
** launched to merge the existing PMAs. Once the background thread has
** merged T bytes of data into a single sorted PMA, the main thread
** begins reading keys from that PMA while the background thread proceeds
** with merging the next T bytes of data. And so on.
**
** Parameter T is set to half the value of the memory threshold used
** by Write() above to determine when to create a new PMA.
**
** If there are more than SORTER_MAX_MERGE_COUNT PMAs in total when
** Rewind() is called, then a hierarchy of incremental-merges is used.
** First, T bytes of data from the first SORTER_MAX_MERGE_COUNT PMAs on
** disk are merged together. Then T bytes of data from the second set, and
** so on, such that no operation ever merges more than SORTER_MAX_MERGE_COUNT
** PMAs at a time. This is done to improve locality.
**
** If running in multi-threaded mode and there are more than
** SORTER_MAX_MERGE_COUNT PMAs on disk when Rewind() is called, then more
** than one background thread may be created. Specifically, there may be
** one background thread for each temporary file on disk, and one background
** thread to merge the output of each of the others to a single PMA for
** the main thread to read from.
*/
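/*
** Illustration only, not part of SQLite: a minimal sketch of how many
** extra layers the hierarchy of incremental merges described above needs,
** assuming that a single merge step combines at most 16 PMAs (the value
** this file assigns to SORTER_MAX_MERGE_COUNT further down).  The names
** sketchMergeDepth() and SKETCH_MAX_MERGE are hypothetical.
*/
```c
#include <assert.h>

#define SKETCH_MAX_MERGE 16   /* assumed fan-in of one merge step */

/* Return the number of merge layers needed above the first one.
** 0 means a single merge step can read every PMA directly. */
static int sketchMergeDepth(int nPMA){
  int nDepth = 0;
  long long nDiv = SKETCH_MAX_MERGE;
  while( nDiv<(long long)nPMA ){
    nDiv = nDiv * SKETCH_MAX_MERGE;
    nDepth++;
  }
  return nDepth;
}
```
/*
** Under that assumption, up to 16 PMAs need no extra layer, 17 to 256
** PMAs need one, and 257 to 4096 need two.
*/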
#include "sqliteInt.h"

/*
** If SQLITE_DEBUG_SORTER_THREADS is defined, this module outputs various
** messages to stderr that may be helpful in understanding the performance
** characteristics of the sorter in multi-threaded mode.
*/
# define SQLITE_DEBUG_SORTER_THREADS 1
/*
** Private objects used by the sorter
*/
typedef struct MergeEngine MergeEngine;     /* Merge PMAs together */
typedef struct PmaReader PmaReader;         /* Incrementally read one PMA */
typedef struct PmaWriter PmaWriter;         /* Incrementally write one PMA */
typedef struct SorterRecord SorterRecord;   /* A record being sorted */
typedef struct SortSubtask SortSubtask;     /* A sub-task in the sort process */
typedef struct SorterFile SorterFile;       /* Temporary file object wrapper */
typedef struct SorterList SorterList;       /* In-memory list of records */
typedef struct IncrMerger IncrMerger;       /* Read & merge multiple PMAs */
/*
** A container for a temp file handle and the current amount of data
** stored in the file.
*/
struct SorterFile {
  sqlite3_file *pFd;              /* File handle */
  i64 iEof;                       /* Bytes of data stored in pFd */
};
/*
** An in-memory list of objects to be sorted.
**
** If aMemory==0 then each object is allocated separately and the objects
** are connected using SorterRecord.u.pNext.  If aMemory!=0 then all objects
** are stored in the aMemory[] bulk memory, one right after the other, and
** are connected using SorterRecord.u.iNext.
*/
struct SorterList {
  SorterRecord *pList;            /* Linked list of records */
  u8 *aMemory;                    /* If non-NULL, bulk memory to hold pList */
  int szPMA;                      /* Size of pList as PMA in bytes */
};
/*
** The MergeEngine object is used to combine two or more smaller PMAs into
** one big PMA using a merge operation.  Separate PMAs all need to be
** combined into one big PMA in order to be able to step through the sorted
** records in order.
**
** The aReadr[] array contains a PmaReader object for each of the PMAs being
** merged.  An aReadr[] object either points to a valid key or else is at EOF.
** ("EOF" means "End Of File".  When aReadr[] is at EOF there is no more data.)
** For the purposes of the paragraphs below, we assume that the array is
** actually N elements in size, where N is the smallest power of 2 greater
** than or equal to the number of PMAs being merged. The extra aReadr[]
** elements are treated as if they are empty (always at EOF).
**
** The aTree[] array is also N elements in size. The value of N is stored in
** the MergeEngine.nTree variable.
**
** The final (N/2) elements of aTree[] contain the results of comparing
** pairs of PMA keys together. Element i contains the result of
** comparing aReadr[2*i-N] and aReadr[2*i-N+1]. Whichever key is smaller, the
** aTree element is set to the index of it.
**
** For the purposes of this comparison, EOF is considered greater than any
** other key value. If the keys are equal (only possible with two EOF
** values), it doesn't matter which index is stored.
**
** The (N/4) elements of aTree[] that precede the final (N/2) described
** above contain the index of the smallest of each block of 4 PmaReaders.
** And so on. So that aTree[1] contains the index of the PmaReader that
** currently points to the smallest key value. aTree[0] is unused.
**
**     aReadr[0] -> Banana
**     aReadr[1] -> Feijoa
**     aReadr[2] -> Elderberry
**     aReadr[3] -> Currant
**     aReadr[4] -> Grapefruit
**     aReadr[5] -> Apple
**     aReadr[6] -> Durian
**     aReadr[7] -> EOF
**
**     aTree[] = { X, 5,   0, 5,   0, 3, 5, 6 }
**
** The current element is "Apple" (the value of the key indicated by
** PmaReader 5). When the Next() operation is invoked, PmaReader 5 will
** be advanced to the next key in its segment. Say the next key is
** "Eggplant":
**
**     aReadr[5] -> Eggplant
**
** The contents of aTree[] are updated first by comparing the new PmaReader
** 5 key to the current key of PmaReader 4 (still "Grapefruit"). The PmaReader
** 5 value is still smaller, so aTree[6] is set to 5. And so on up the tree.
** The value of PmaReader 6 - "Durian" - is now smaller than that of PmaReader
** 5, so aTree[3] is set to 6. Key 0 is smaller than key 6 (Banana<Durian),
** so the value written into element 1 of the array is 0. As follows:
**
**     aTree[] = { X, 0,   0, 6,   0, 3, 5, 6 }
**
** In other words, each time we advance to the next sorter element, log2(N)
** key comparison operations are required, where N is the number of segments
** being merged (rounded up to the next power of 2).
*/
struct MergeEngine {
  int nTree;                 /* Used size of aTree/aReadr (power of 2) */
  SortSubtask *pTask;        /* Used by this thread only */
  int *aTree;                /* Current state of incremental merge */
  PmaReader *aReadr;         /* Array of PmaReaders to merge data from */
};
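/*
** Illustration only, not part of SQLite: a minimal sketch of the aTree[]
** bookkeeping described in the comment above, using plain C strings as
** keys and a NULL pointer to stand for EOF.  All names (sketchKeys,
** sketchTree, sketchBuild(), sketchAdvance() and so on) are hypothetical,
** and N is fixed at 8 to match the fruit example.
*/
```c
#include <assert.h>
#include <string.h>

#define SKETCH_N 8                      /* power of 2 >= number of readers */

static const char *sketchKeys[SKETCH_N]; /* current key per reader, 0==EOF */
static int sketchTree[SKETCH_N];         /* sketchTree[0] is unused */

/* Return whichever of readers a and b holds the smaller key.
** EOF compares greater than any real key. */
static int sketchSmaller(int a, int b){
  if( sketchKeys[a]==0 ) return b;
  if( sketchKeys[b]==0 ) return a;
  return strcmp(sketchKeys[a], sketchKeys[b])<=0 ? a : b;
}

/* Recompute tree element iOut from the two entries beneath it. */
static void sketchCompare(int iOut){
  int i1, i2;
  if( iOut>=SKETCH_N/2 ){
    i1 = 2*iOut - SKETCH_N;             /* final N/2 slots: two readers */
    i2 = i1 + 1;
  }else{
    i1 = sketchTree[2*iOut];            /* earlier slots: two winners */
    i2 = sketchTree[2*iOut+1];
  }
  sketchTree[iOut] = sketchSmaller(i1, i2);
}

/* Populate the whole tree, from the last slot back to slot 1. */
static void sketchBuild(void){
  int i;
  for(i=SKETCH_N-1; i>0; i--) sketchCompare(i);
}

/* Reader iReader has moved on to key zNew (0 for EOF).  Only the
** log2(N) slots on its path to the root need to be recomputed. */
static void sketchAdvance(int iReader, const char *zNew){
  int i;
  sketchKeys[iReader] = zNew;
  for(i=(SKETCH_N + iReader)/2; i>0; i=i/2) sketchCompare(i);
}
```
/*
** Loading the seven fruit keys, calling sketchBuild() and then
** sketchAdvance(5, "Eggplant") reproduces the two aTree[] states listed
** in the comment above.
*/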
/*
** This object represents a single thread of control in a sort operation.
** Exactly VdbeSorter.nTask instances of this object are allocated
** as part of each VdbeSorter object. Instances are never allocated any
** other way. VdbeSorter.nTask is set to the number of worker threads allowed
** (see SQLITE_LIMIT_WORKER_THREADS) plus one (the main thread).  Thus for
** single-threaded operation, there is exactly one instance of this object
** and for multi-threaded operation there are two or more instances.
**
** Essentially, this structure contains all those fields of the VdbeSorter
** structure for which each thread requires a separate instance. For example,
** each thread requires its own UnpackedRecord object to unpack records into
** as part of comparison operations.
**
** Before a background thread is launched, variable bDone is set to 0. Then,
** right before it exits, the thread itself sets bDone to 1. This is used for
** two purposes:
**
**   1. When flushing the contents of memory to a level-0 PMA on disk, to
**      attempt to select a SortSubtask for which there is not already an
**      active background thread (since doing so causes the main thread
**      to block until it finishes).
**
**   2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call
**      to sqlite3ThreadJoin() is likely to block. Cases that are likely to
**      block provoke debugging output.
**
** In both cases, the effects of the main thread seeing (bDone==0) even
** after the thread has finished are not dire. So we don't worry about
** memory barriers and such here.
*/
struct SortSubtask {
  SQLiteThread *pThread;          /* Background thread, if any */
  int bDone;                      /* Set if thread is finished but not joined */
  VdbeSorter *pSorter;            /* Sorter that owns this sub-task */
  UnpackedRecord *pUnpacked;      /* Space to unpack a record */
  SorterList list;                /* List for thread to write to a PMA */
  int nPMA;                       /* Number of PMAs currently in file */
  SorterFile file;                /* Temp file for level-0 PMAs */
  SorterFile file2;               /* Space for other PMAs */
};
/*
** Main sorter structure. A single instance of this is allocated for each
** sorter cursor created by the VDBE.
**
** mxKeysize:
**   As records are added to the sorter by calls to sqlite3VdbeSorterWrite(),
**   this variable is updated so as to be set to the size on disk of the
**   largest record in the sorter.
*/
struct VdbeSorter {
  int mnPmaSize;                  /* Minimum PMA size, in bytes */
  int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
  int mxKeysize;                  /* Largest serialized key seen so far */
  int pgsz;                       /* Main database page size */
  PmaReader *pReader;             /* Read data from here after Rewind() */
  MergeEngine *pMerger;           /* Or here, if bUseThreads==0 */
  sqlite3 *db;                    /* Database connection */
  KeyInfo *pKeyInfo;              /* How to compare records */
  UnpackedRecord *pUnpacked;      /* Used by VdbeSorterCompare() */
  SorterList list;                /* List of in-memory records */
  int iMemory;                    /* Offset of free space in list.aMemory */
  int nMemory;                    /* Size of list.aMemory allocation in bytes */
  u8 bUsePMA;                     /* True if one or more PMAs created */
  u8 bUseThreads;                 /* True to use background threads */
  u8 iPrev;                       /* Previous thread used to flush PMA */
  u8 nTask;                       /* Size of aTask[] array */
  SortSubtask aTask[1];           /* One or more subtasks */
};
/*
** An instance of the following object is used to read records out of a
** PMA, in sorted order.  The next key to be read is cached in nKey/aKey.
** aKey might point into aMap or into aBuffer.  If neither of those locations
** contains a contiguous representation of the key, then aAlloc is allocated
** and the key is copied into aAlloc and aKey is made to point to aAlloc.
*/
struct PmaReader {
  i64 iReadOff;               /* Current read offset */
  i64 iEof;                   /* 1 byte past EOF for this PmaReader */
  int nAlloc;                 /* Bytes of space at aAlloc */
  int nKey;                   /* Number of bytes in key */
  sqlite3_file *pFd;          /* File handle we are reading from */
  u8 *aAlloc;                 /* Space for aKey if aBuffer and aMap won't work */
  u8 *aKey;                   /* Pointer to current key */
  u8 *aBuffer;                /* Current read buffer */
  int nBuffer;                /* Size of read buffer in bytes */
  u8 *aMap;                   /* Pointer to mapping of entire file */
  IncrMerger *pIncr;          /* Incremental merger */
};
/*
** Normally, a PmaReader object iterates through an existing PMA stored
** within a temp file. However, if the PmaReader.pIncr variable points to
** an object of the following type, it may be used to iterate/merge through
** multiple PMAs simultaneously.
**
** There are two types of IncrMerger object - single (bUseThread==0) and
** multi-threaded (bUseThread==1).
**
** A multi-threaded IncrMerger object uses two temporary files - aFile[0]
** and aFile[1]. Neither file is allowed to grow to more than mxSz bytes in
** size. When the IncrMerger is initialized, it reads enough data from
** pMerger to populate aFile[0]. It then sets variables within the
** corresponding PmaReader object to read from that file and kicks off
** a background thread to populate aFile[1] with the next mxSz bytes of
** sorted record data from pMerger.
**
** When the PmaReader reaches the end of aFile[0], it blocks until the
** background thread has finished populating aFile[1]. It then exchanges
** the contents of the aFile[0] and aFile[1] variables within this structure,
** sets the PmaReader fields to read from the new aFile[0] and kicks off
** another background thread to populate the new aFile[1]. And so on, until
** the contents of pMerger are exhausted.
**
** A single-threaded IncrMerger does not open any temporary files of its
** own. Instead, it has exclusive access to mxSz bytes of space beginning
** at offset iStartOff of file pTask->file2. And instead of using a
** background thread to prepare data for the PmaReader, with a
** single-threaded IncrMerger the allocated part of pTask->file2 is
** "refilled" with keys from pMerger by the calling thread whenever the
** PmaReader runs out of data.
*/
struct IncrMerger {
  SortSubtask *pTask;             /* Task that owns this merger */
  MergeEngine *pMerger;           /* Merge engine thread reads data from */
  i64 iStartOff;                  /* Offset to start writing file at */
  int mxSz;                       /* Maximum bytes of data to store */
  int bEof;                       /* Set to true when merge is finished */
  int bUseThread;                 /* True to use a bg thread for this object */
  SorterFile aFile[2];            /* aFile[0] for reading, [1] for writing */
};
/*
** An instance of this object is used for writing a PMA.
**
** The PMA is written one record at a time.  Each record is of an arbitrary
** size.  But I/O is more efficient if it occurs in page-sized blocks where
** each block is aligned on a page boundary.  This object caches writes to
** the PMA so that aligned, page-size blocks are written.
*/
struct PmaWriter {
  int eFWErr;                     /* Non-zero if in an error state */
  u8 *aBuffer;                    /* Pointer to write buffer */
  int nBuffer;                    /* Size of write buffer in bytes */
  int iBufStart;                  /* First byte of buffer to write */
  int iBufEnd;                    /* Last byte of buffer to write */
  i64 iWriteOff;                  /* Offset of start of buffer in file */
  sqlite3_file *pFd;              /* File handle to write to */
};
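/*
** Illustration only, not part of SQLite: a minimal sketch of the write
** caching idea described above.  Bytes are collected in a page-sized
** buffer and flushed whenever the buffer fills, so every flush other than
** the first and last covers one whole aligned page.  The sketch assumes a
** tiny 8-byte "page" and a plain byte array standing in for the temp
** file; the names SketchWriter, sketchWriterInit(), sketchWriterWrite()
** and sketchWriterFinish() are hypothetical, and iBufEnd is treated here
** as the offset just past the last buffered byte.
*/
```c
#include <assert.h>
#include <string.h>

#define SKETCH_PAGE 8

typedef struct SketchWriter SketchWriter;
struct SketchWriter {
  unsigned char aBuffer[SKETCH_PAGE]; /* write cache, one page */
  int iBufStart;                /* first buffered byte still to be written */
  int iBufEnd;                  /* offset just past the last buffered byte */
  long iWriteOff;               /* file offset that aBuffer[0] maps to */
  unsigned char *aFile;         /* stand-in for the temp file */
  int nFlush;                   /* number of flushes, for inspection */
};

/* Start writing at file offset iStart, which need not be page aligned. */
static void sketchWriterInit(SketchWriter *p, unsigned char *aFile, long iStart){
  memset(p, 0, sizeof(*p));
  p->aFile = aFile;
  p->iBufStart = p->iBufEnd = (int)(iStart % SKETCH_PAGE);
  p->iWriteOff = iStart - p->iBufStart;
}

/* Copy the buffered bytes to their place in the "file". */
static void sketchWriterFlush(SketchWriter *p){
  memcpy(&p->aFile[p->iWriteOff + p->iBufStart],
         &p->aBuffer[p->iBufStart], (size_t)(p->iBufEnd - p->iBufStart));
  p->nFlush++;
}

/* Append nData bytes, flushing each time the page buffer fills up. */
static void sketchWriterWrite(SketchWriter *p, const unsigned char *aData, int nData){
  int nRem = nData;
  while( nRem>0 ){
    int nCopy = nRem;
    if( nCopy>SKETCH_PAGE - p->iBufEnd ) nCopy = SKETCH_PAGE - p->iBufEnd;
    memcpy(&p->aBuffer[p->iBufEnd], &aData[nData-nRem], (size_t)nCopy);
    p->iBufEnd += nCopy;
    if( p->iBufEnd==SKETCH_PAGE ){
      sketchWriterFlush(p);
      p->iBufStart = p->iBufEnd = 0;
      p->iWriteOff += SKETCH_PAGE;
    }
    nRem -= nCopy;
  }
}

/* Flush any partial page and return the offset just past the data. */
static long sketchWriterFinish(SketchWriter *p){
  if( p->iBufEnd>p->iBufStart ) sketchWriterFlush(p);
  return p->iWriteOff + p->iBufEnd;
}
```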
/*
** This object is the header on a single record while that record is being
** held in memory and prior to being written out as part of a PMA.
**
** How the linked list is connected depends on how memory is being managed
** by this module. If using a separate allocation for each in-memory record
** (VdbeSorter.list.aMemory==0), then the list is always connected using the
** SorterRecord.u.pNext pointers.
**
** Or, if using the single large allocation method (VdbeSorter.list.aMemory!=0),
** then while records are being accumulated the list is linked using the
** SorterRecord.u.iNext offset. This is because the aMemory[] array may
** be sqlite3Realloc()ed while records are being accumulated. Once the VM
** has finished passing records to the sorter, or when the in-memory buffer
** is full, the list is sorted. As part of the sorting process, it is
** converted to use the SorterRecord.u.pNext pointers. See function
** vdbeSorterSort() for details.
*/
struct SorterRecord {
  int nVal;                       /* Size of the record in bytes */
  union {
    SorterRecord *pNext;          /* Pointer to next record in list */
    int iNext;                    /* Offset within aMemory of next record */
  } u;
  /* The data for the record immediately follows this header */
};
/* Return a pointer to the buffer containing the record data for SorterRecord
** object p. Should be used as if:
**
**   void *SRVAL(SorterRecord *p) { return (void*)&p[1]; }
*/
#define SRVAL(p) ((void*)((SorterRecord*)(p) + 1))
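/*
** Illustration only, not part of SQLite: a minimal sketch of the
** "header followed immediately by the payload" layout that SRVAL()
** relies on, using malloc() and a simplified header.  SketchRecord,
** SKETCH_SRVAL() and sketchRecordNew() are hypothetical names.
*/
```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

typedef struct SketchRecord SketchRecord;
struct SketchRecord {
  int nVal;                     /* Size of the payload in bytes */
  SketchRecord *pNext;          /* Next record in the list */
  /* The payload immediately follows this header */
};

/* Same pointer arithmetic as SRVAL(): step over exactly one header. */
#define SKETCH_SRVAL(p) ((void*)((SketchRecord*)(p) + 1))

/* Allocate header and payload in one block and copy the payload in.
** Returns 0 if the allocation fails.  The caller frees with free(). */
static SketchRecord *sketchRecordNew(const void *pVal, int nVal){
  SketchRecord *p = malloc(sizeof(*p) + (size_t)nVal);
  if( p==0 ) return 0;
  p->nVal = nVal;
  p->pNext = 0;
  memcpy(SKETCH_SRVAL(p), pVal, (size_t)nVal);
  return p;
}
```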
/* The minimum PMA size is set to this value multiplied by the database
** page size in bytes.  */
#define SORTER_MIN_WORKING 10

/* Maximum number of PMAs that a single MergeEngine can merge */
#define SORTER_MAX_MERGE_COUNT 16

static int vdbeIncrSwap(IncrMerger*);
static void vdbeIncrFree(IncrMerger *);
/*
** Free all memory belonging to the PmaReader object passed as the
** argument. All structure fields are set to zero before returning.
*/
static void vdbePmaReaderClear(PmaReader *pReadr){
  sqlite3_free(pReadr->aAlloc);
  sqlite3_free(pReadr->aBuffer);
  if( pReadr->aMap ) sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap);
  vdbeIncrFree(pReadr->pIncr);
  memset(pReadr, 0, sizeof(PmaReader));
}
/*
** Read the next nByte bytes of data from the PMA p.
** If successful, set *ppOut to point to a buffer containing the data
** and return SQLITE_OK. Otherwise, if an error occurs, return an SQLite
** error code.
**
** The buffer returned in *ppOut is only valid until the
** next call to this function.
*/
static int vdbePmaReadBlob(
  PmaReader *p,                   /* PmaReader from which to take the blob */
  int nByte,                      /* Bytes of data to read */
  u8 **ppOut                      /* OUT: Pointer to buffer containing data */
){
  int iBuf;                       /* Offset within buffer to read from */
  int nAvail;                     /* Bytes of data available in buffer */

  if( p->aMap ){
    /* The file is memory mapped: return a pointer straight into the map. */
    *ppOut = &p->aMap[p->iReadOff];
    p->iReadOff += nByte;
    return SQLITE_OK;
  }

  assert( p->aBuffer );

  /* If there is no more data to be read from the buffer, read the next
  ** p->nBuffer bytes of data from the file into it. Or, if there are fewer
  ** than p->nBuffer bytes remaining in the PMA, read all remaining data.  */
  iBuf = p->iReadOff % p->nBuffer;
  if( iBuf==0 ){
    int nRead;                    /* Bytes to read from disk */
    int rc;                       /* sqlite3OsRead() return code */

    /* Determine how many bytes of data to read. */
    if( (p->iEof - p->iReadOff) > (i64)p->nBuffer ){
      nRead = p->nBuffer;
    }else{
      nRead = (int)(p->iEof - p->iReadOff);
    }

    /* Read data from the file. Return early if an error occurs. */
    rc = sqlite3OsRead(p->pFd, p->aBuffer, nRead, p->iReadOff);
    assert( rc!=SQLITE_IOERR_SHORT_READ );
    if( rc!=SQLITE_OK ) return rc;
  }
  nAvail = p->nBuffer - iBuf;

  if( nByte<=nAvail ){
    /* The requested data is available in the in-memory buffer. In this
    ** case there is no need to make a copy of the data, just return a
    ** pointer into the buffer to the caller.  */
    *ppOut = &p->aBuffer[iBuf];
    p->iReadOff += nByte;
  }else{
    /* The requested data is not all available in the in-memory buffer.
    ** In this case, allocate space at p->aAlloc[] to copy the requested
    ** range into. Then return a copy of pointer p->aAlloc to the caller.  */
    int nRem;                     /* Bytes remaining to copy */

    /* Extend the p->aAlloc[] allocation if required. */
    if( p->nAlloc<nByte ){
      u8 *aNew;
      int nNew = MAX(128, p->nAlloc*2);
      while( nByte>nNew ) nNew = nNew*2;
      aNew = sqlite3Realloc(p->aAlloc, nNew);
      if( !aNew ) return SQLITE_NOMEM;
      p->nAlloc = nNew;
      p->aAlloc = aNew;
    }

    /* Copy as much data as is available in the buffer into the start of
    ** p->aAlloc[].  */
    memcpy(p->aAlloc, &p->aBuffer[iBuf], nAvail);
    p->iReadOff += nAvail;
    nRem = nByte - nAvail;

    /* The following loop copies up to p->nBuffer bytes per iteration into
    ** the p->aAlloc[] buffer.  */
    while( nRem>0 ){
      int rc;                     /* vdbePmaReadBlob() return code */
      int nCopy;                  /* Number of bytes to copy */
      u8 *aNext;                  /* Pointer to buffer to copy data from */

      nCopy = nRem;
      if( nRem>p->nBuffer ) nCopy = p->nBuffer;
      rc = vdbePmaReadBlob(p, nCopy, &aNext);
      if( rc!=SQLITE_OK ) return rc;
      assert( aNext!=p->aAlloc );
      memcpy(&p->aAlloc[nByte - nRem], aNext, nCopy);
      nRem -= nCopy;
    }

    *ppOut = p->aAlloc;
  }

  return SQLITE_OK;
}
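/*
** Illustration only, not part of SQLite: a minimal sketch of the growth
** rule vdbePmaReadBlob() applies to its p->aAlloc[] buffer when a blob
** does not fit in the read buffer.  The new size is at least 128 bytes
** and at least twice the old size, then doubled until the request fits.
** sketchGrowAlloc() is a hypothetical name, and the sketch assumes the
** sizes involved are small enough that doubling does not overflow an int.
*/
```c
#include <assert.h>

/* Return the new allocation size for a buffer currently nAlloc bytes
** large that must hold nByte bytes. */
static int sketchGrowAlloc(int nAlloc, int nByte){
  int nNew = nAlloc*2>128 ? nAlloc*2 : 128;
  while( nByte>nNew ) nNew = nNew*2;
  return nNew;
}
```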
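/*
** Read a varint from the stream of data accessed by p. Set *pnOut to
** the value read.
*/
static int vdbePmaReadVarint(PmaReader *p, u64 *pnOut){
  int iBuf;

  if( p->aMap ){
    p->iReadOff += sqlite3GetVarint(&p->aMap[p->iReadOff], pnOut);
  }else{
    iBuf = p->iReadOff % p->nBuffer;
    if( iBuf && (p->nBuffer-iBuf)>=9 ){
      /* At least 9 bytes remain in the buffer, so the whole varint is
      ** guaranteed to be available without another read. */
      p->iReadOff += sqlite3GetVarint(&p->aBuffer[iBuf], pnOut);
    }else{
      /* The varint may straddle a buffer boundary. Collect it one byte
      ** at a time until a byte with the high bit clear is seen. */
      u8 aVarint[16], *a;
      int i = 0, rc;
      do{
        rc = vdbePmaReadBlob(p, 1, &a);
        if( rc ) return rc;
        aVarint[(i++)&0xf] = a[0];
      }while( (a[0]&0x80)!=0 );
      sqlite3GetVarint(aVarint, pnOut);
    }
  }

  return SQLITE_OK;
}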
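/*
** Illustration only, not part of SQLite: a minimal sketch of decoding one
** varint from a byte array, assuming the SQLite varint layout (1 to 9
** bytes, most significant group first, 7 payload bits in each of the
** first 8 bytes with the high bit meaning "more bytes follow", and all 8
** bits of a ninth byte).  sketchGetVarint() is a hypothetical stand-in
** and is not the library's sqlite3GetVarint() implementation.
*/
```c
#include <assert.h>
#include <stdint.h>

/* Decode the varint at a[] into *pOut. Return the number of bytes used. */
static int sketchGetVarint(const unsigned char *a, uint64_t *pOut){
  uint64_t v = 0;
  int i;
  for(i=0; i<8; i++){
    v = (v<<7) | (uint64_t)(a[i] & 0x7f);
    if( (a[i] & 0x80)==0 ){
      *pOut = v;
      return i+1;
    }
  }
  /* All of the first 8 bytes had the high bit set: the ninth byte
  ** contributes a full 8 bits. */
  v = (v<<8) | (uint64_t)a[8];
  *pOut = v;
  return 9;
}
```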
/*
** Attempt to memory map file pFile. If successful, set *pp to point to the
** new mapping and return SQLITE_OK. If the mapping is not attempted
** (because the file is too large or the VFS layer is configured not to use
** mmap), return SQLITE_OK and set *pp to NULL.
**
** Or, if an error occurs, return an SQLite error code. The final value of
** *pp is undefined in this case.
*/
static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){
  int rc = SQLITE_OK;
  if( pFile->iEof<=(i64)(pTask->pSorter->db->nMaxSorterMmap) ){
    sqlite3_file *pFd = pFile->pFd;
    if( pFd->pMethods->iVersion>=3 ){
      rc = sqlite3OsFetch(pFd, 0, (int)pFile->iEof, (void**)pp);
      testcase( rc!=SQLITE_OK );
    }
  }
  return rc;
}
/*
** Attach PmaReader pReadr to file pFile (if it is not already attached to
** that file) and seek it to offset iOff within the file.  Return SQLITE_OK
** if successful, or an SQLite error code if an error occurs.
*/
static int vdbePmaReaderSeek(
  SortSubtask *pTask,             /* Task context */
  PmaReader *pReadr,              /* Reader whose cursor is to be moved */
  SorterFile *pFile,              /* Sorter file to read from */
  i64 iOff                        /* Offset in pFile */
){
  int rc = SQLITE_OK;

  assert( pReadr->pIncr==0 || pReadr->pIncr->bEof==0 );

  if( sqlite3FaultSim(201) ) return SQLITE_IOERR_READ;
  if( pReadr->aMap ){
    sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap);
    pReadr->aMap = 0;
  }
  pReadr->iReadOff = iOff;
  pReadr->iEof = pFile->iEof;
  pReadr->pFd = pFile->pFd;

  rc = vdbeSorterMapFile(pTask, pFile, &pReadr->aMap);
  if( rc==SQLITE_OK && pReadr->aMap==0 ){
    int pgsz = pTask->pSorter->pgsz;
    int iBuf = pReadr->iReadOff % pgsz;
    if( pReadr->aBuffer==0 ){
      pReadr->aBuffer = (u8*)sqlite3Malloc(pgsz);
      if( pReadr->aBuffer==0 ) rc = SQLITE_NOMEM;
      pReadr->nBuffer = pgsz;
    }
    if( rc==SQLITE_OK && iBuf ){
      int nRead = pgsz - iBuf;
      if( (pReadr->iReadOff + nRead) > pReadr->iEof ){
        nRead = (int)(pReadr->iEof - pReadr->iReadOff);
      }
      rc = sqlite3OsRead(
          pReadr->pFd, &pReadr->aBuffer[iBuf], nRead, pReadr->iReadOff
      );
      testcase( rc!=SQLITE_OK );
    }
  }

  return rc;
}
/*
** Advance PmaReader pReadr to the next key in its PMA. Return SQLITE_OK if
** no error occurs, or an SQLite error code if one does.
*/
static int vdbePmaReaderNext(PmaReader *pReadr){
  int rc = SQLITE_OK;             /* Return Code */
  u64 nRec = 0;                   /* Size of record in bytes */

  if( pReadr->iReadOff>=pReadr->iEof ){
    IncrMerger *pIncr = pReadr->pIncr;
    int bEof = 1;
    if( pIncr ){
      rc = vdbeIncrSwap(pIncr);
      if( rc==SQLITE_OK && pIncr->bEof==0 ){
        rc = vdbePmaReaderSeek(
            pIncr->pTask, pReadr, &pIncr->aFile[0], pIncr->iStartOff
        );
        bEof = 0;
      }
    }

    if( bEof ){
      /* This is an EOF condition */
      vdbePmaReaderClear(pReadr);
      testcase( rc!=SQLITE_OK );
      return rc;
    }
  }

  if( rc==SQLITE_OK ){
    rc = vdbePmaReadVarint(pReadr, &nRec);
  }
  if( rc==SQLITE_OK ){
    pReadr->nKey = (int)nRec;
    rc = vdbePmaReadBlob(pReadr, (int)nRec, &pReadr->aKey);
    testcase( rc!=SQLITE_OK );
  }

  return rc;
}
/*
** Initialize PmaReader pReadr to scan through the PMA stored in file pFile
** starting at offset iStart and ending at offset iEof-1. This function
** leaves the PmaReader pointing to the first key in the PMA (or EOF if the
** PMA is empty).
**
** If the pnByte parameter is NULL, then it is assumed that the file
** contains a single PMA, and that that PMA omits the initial length varint.
*/
static int vdbePmaReaderInit(
  SortSubtask *pTask,             /* Task context */
  SorterFile *pFile,              /* Sorter file to read from */
  i64 iStart,                     /* Start offset in pFile */
  PmaReader *pReadr,              /* PmaReader to populate */
  i64 *pnByte                     /* IN/OUT: Increment this value by PMA size */
){
  int rc;

  assert( pFile->iEof>iStart );
  assert( pReadr->aAlloc==0 && pReadr->nAlloc==0 );
  assert( pReadr->aBuffer==0 );
  assert( pReadr->aMap==0 );

  rc = vdbePmaReaderSeek(pTask, pReadr, pFile, iStart);
  if( rc==SQLITE_OK ){
    u64 nByte;                    /* Size of PMA in bytes */
    rc = vdbePmaReadVarint(pReadr, &nByte);
    pReadr->iEof = pReadr->iReadOff + nByte;
    *pnByte += nByte;
  }

  if( rc==SQLITE_OK ){
    rc = vdbePmaReaderNext(pReadr);
  }
  return rc;
}
/*
** Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2,
** size nKey2 bytes). Use (pTask->pSorter->pKeyInfo) for the collation
** sequences used by the comparison. Return the result of the comparison.
**
** Before returning, object (pTask->pUnpacked) is populated with the
** unpacked version of key2. Or, if pKey2 is passed a NULL pointer, then it
** is assumed that the (pTask->pUnpacked) structure already contains the
** unpacked key to use as key2.
**
** If an OOM error is encountered, (pTask->pUnpacked->error_rc) is set
** to SQLITE_NOMEM.
*/
static int vdbeSorterCompare(
  SortSubtask *pTask,             /* Subtask context (for pKeyInfo) */
  const void *pKey1, int nKey1,   /* Left side of comparison */
  const void *pKey2, int nKey2    /* Right side of comparison */
){
  UnpackedRecord *r2 = pTask->pUnpacked;
  if( pKey2 ){
    sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2);
  }
  return sqlite3VdbeRecordCompare(nKey1, pKey1, r2);
}
/*
** Initialize the temporary index cursor just opened as a sorter cursor.
**
** Usually, the sorter module uses the value of (pCsr->pKeyInfo->nField)
** to determine the number of fields that should be compared from the
** records being sorted. However, if the value passed as argument nField
** is non-zero and the sorter is able to guarantee a stable sort, nField
** is used instead. This is used when sorting records for a CREATE INDEX
** statement. In this case, keys are always delivered to the sorter in
** order of the primary key, which happens to make up the final part
** of the records being sorted. So if the sort is stable, there is never
** any reason to compare PK fields and they can be ignored for a small
** performance boost.
**
** The sorter can guarantee a stable sort when running in single-threaded
** mode, but not in multi-threaded mode.
**
** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
*/
int sqlite3VdbeSorterInit(
  sqlite3 *db,                    /* Database connection (for malloc()) */
  int nField,                     /* Number of key fields in each record */
  VdbeCursor *pCsr                /* Cursor that holds the new sorter */
){
  int pgsz;                       /* Page size of main database */
  int i;                          /* Used to iterate through aTask[] */
  int mxCache;                    /* Cache size */
  VdbeSorter *pSorter;            /* The new sorter */
  KeyInfo *pKeyInfo;              /* Copy of pCsr->pKeyInfo with db==0 */
  int szKeyInfo;                  /* Size of pCsr->pKeyInfo in bytes */
  int sz;                         /* Size of pSorter in bytes */
  int rc = SQLITE_OK;
#if SQLITE_MAX_WORKER_THREADS==0
# define nWorker 0
#else
  int nWorker;
#endif

  /* Initialize the upper limit on the number of worker threads */
#if SQLITE_MAX_WORKER_THREADS>0
  if( sqlite3TempInMemory(db) || sqlite3GlobalConfig.bCoreMutex==0 ){
    nWorker = 0;
  }else{
    nWorker = db->aLimit[SQLITE_LIMIT_WORKER_THREADS];
  }
#endif

  /* Do not allow the total number of threads (main thread + all workers)
  ** to exceed the maximum merge count */
#if SQLITE_MAX_WORKER_THREADS>=SORTER_MAX_MERGE_COUNT
  if( nWorker>=SORTER_MAX_MERGE_COUNT ){
    nWorker = SORTER_MAX_MERGE_COUNT-1;
  }
#endif

  assert( pCsr->pKeyInfo && pCsr->pBt==0 );
  szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nField-1)*sizeof(CollSeq*);
  sz = sizeof(VdbeSorter) + nWorker * sizeof(SortSubtask);

  pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sz + szKeyInfo);
  pCsr->pSorter = pSorter;
  if( pSorter==0 ){
    rc = SQLITE_NOMEM;
  }else{
    pSorter->pKeyInfo = pKeyInfo = (KeyInfo*)((u8*)pSorter + sz);
    memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
    pKeyInfo->db = 0;
    if( nField && nWorker==0 ) pKeyInfo->nField = nField;
    pSorter->pgsz = pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
    pSorter->nTask = nWorker + 1;
    pSorter->bUseThreads = (pSorter->nTask>1);
    pSorter->db = db;
    for(i=0; i<pSorter->nTask; i++){
      SortSubtask *pTask = &pSorter->aTask[i];
      pTask->pSorter = pSorter;
    }

    if( !sqlite3TempInMemory(db) ){
      pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz;
      mxCache = db->aDb[0].pSchema->cache_size;
      if( mxCache<SORTER_MIN_WORKING ) mxCache = SORTER_MIN_WORKING;
      pSorter->mxPmaSize = mxCache * pgsz;

      /* If the application has not configured scratch memory using
      ** SQLITE_CONFIG_SCRATCH then we assume it is OK to do large memory
      ** allocations.  If scratch memory has been configured, then assume
      ** large memory allocations should be avoided to prevent heap
      ** fragmentation.
      */
      if( sqlite3GlobalConfig.pScratch==0 ){
        assert( pSorter->iMemory==0 );
        pSorter->nMemory = pgsz;
        pSorter->list.aMemory = (u8*)sqlite3Malloc(pgsz);
        if( !pSorter->list.aMemory ) rc = SQLITE_NOMEM;
      }
    }
  }

  return rc;
}
#undef nWorker   /* Defined at the top of this function */

/*
** Free the list of sorted records starting at pRecord.
*/
static void vdbeSorterRecordFree(sqlite3 *db, SorterRecord *pRecord){
  SorterRecord *p;
  SorterRecord *pNext;
  for(p=pRecord; p; p=pNext){
    pNext = p->u.pNext;
    sqlite3DbFree(db, p);
  }
}

/*
** Free all resources owned by the object indicated by argument pTask. All
** fields of *pTask are zeroed before returning.
*/
static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){
  sqlite3DbFree(db, pTask->pUnpacked);
  pTask->pUnpacked = 0;
#if SQLITE_MAX_WORKER_THREADS>0
  /* pTask->list.aMemory can only be non-zero if it was handed memory
  ** from the main thread.  That only occurs when
  ** SQLITE_MAX_WORKER_THREADS>0 */
  if( pTask->list.aMemory ){
    sqlite3_free(pTask->list.aMemory);
    pTask->list.aMemory = 0;
  }else
#endif
  {
    assert( pTask->list.aMemory==0 );
    vdbeSorterRecordFree(0, pTask->list.pList);
  }
  pTask->list.pList = 0;
  if( pTask->file.pFd ){
    sqlite3OsCloseFree(pTask->file.pFd);
    pTask->file.pFd = 0;
    pTask->file.iEof = 0;
  }
  if( pTask->file2.pFd ){
    sqlite3OsCloseFree(pTask->file2.pFd);
    pTask->file2.pFd = 0;
    pTask->file2.iEof = 0;
  }
}

#ifdef SQLITE_DEBUG_SORTER_THREADS
static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){
  i64 t;
  int iTask = (pTask - pTask->pSorter->aTask);
  sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
  fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent);
}
static void vdbeSorterRewindDebug(const char *zEvent){
  i64 t;
  sqlite3OsCurrentTimeInt64(sqlite3_vfs_find(0), &t);
  fprintf(stderr, "%lld:X %s\n", t, zEvent);
}
static void vdbeSorterPopulateDebug(
  SortSubtask *pTask,
  const char *zEvent
){
  i64 t;
  int iTask = (pTask - pTask->pSorter->aTask);
  sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
  fprintf(stderr, "%lld:bg%d %s\n", t, iTask, zEvent);
}
static void vdbeSorterBlockDebug(
  SortSubtask *pTask,
  int bBlocked,
  const char *zEvent
){
  if( bBlocked ){
    i64 t;
    sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
    fprintf(stderr, "%lld:main %s\n", t, zEvent);
  }
}
#else
# define vdbeSorterWorkDebug(x,y)
# define vdbeSorterRewindDebug(y)
# define vdbeSorterPopulateDebug(x,y)
# define vdbeSorterBlockDebug(x,y,z)
#endif

#if SQLITE_MAX_WORKER_THREADS>0
/*
** Join thread pTask->thread.
*/
static int vdbeSorterJoinThread(SortSubtask *pTask){
  int rc = SQLITE_OK;
  if( pTask->pThread ){
#ifdef SQLITE_DEBUG_SORTER_THREADS
    int bDone = pTask->bDone;
#endif
    void *pRet = SQLITE_INT_TO_PTR(SQLITE_ERROR);
    vdbeSorterBlockDebug(pTask, !bDone, "enter");
    (void)sqlite3ThreadJoin(pTask->pThread, &pRet);
    vdbeSorterBlockDebug(pTask, !bDone, "exit");
    rc = SQLITE_PTR_TO_INT(pRet);
    assert( pTask->bDone==1 );
    pTask->bDone = 0;
    pTask->pThread = 0;
  }
  return rc;
}

/*
** Launch a background thread to run xTask(pIn).
*/
static int vdbeSorterCreateThread(
  SortSubtask *pTask,             /* Thread will use this task object */
  void *(*xTask)(void*),          /* Routine to run in a separate thread */
  void *pIn                       /* Argument passed into xTask() */
){
  assert( pTask->pThread==0 && pTask->bDone==0 );
  return sqlite3ThreadCreate(&pTask->pThread, xTask, pIn);
}

/*
** Join all outstanding threads launched by SorterWrite() to create
** level-0 PMAs.
*/
static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
  int rc = rcin;
  int i;

  /* This function is always called by the main user thread.
  **
  ** If this function is being called after SorterRewind() has been called,
  ** it is possible that thread pSorter->aTask[pSorter->nTask-1].pThread
  ** is currently attempting to join one of the other threads. To avoid a
  ** race condition where this thread also attempts to join the same object,
  ** join thread pSorter->aTask[pSorter->nTask-1].pThread first. */
  for(i=pSorter->nTask-1; i>=0; i--){
    SortSubtask *pTask = &pSorter->aTask[i];
    int rc2 = vdbeSorterJoinThread(pTask);
    if( rc==SQLITE_OK ) rc = rc2;
  }
  return rc;
}
#else
# define vdbeSorterJoinAll(x,rcin) (rcin)
# define vdbeSorterJoinThread(pTask) SQLITE_OK
#endif

/*
** Allocate a new MergeEngine object capable of handling up to
** nReader PmaReader inputs.
**
** nReader is automatically rounded up to the next power of two.
** nReader may not exceed SORTER_MAX_MERGE_COUNT even after rounding up.
*/
static MergeEngine *vdbeMergeEngineNew(int nReader){
  int N = 2;                      /* Smallest power of two >= nReader */
  int nByte;                      /* Total bytes of space to allocate */
  MergeEngine *pNew;              /* Pointer to allocated object to return */

  assert( nReader<=SORTER_MAX_MERGE_COUNT );

  while( N<nReader ) N += N;
  nByte = sizeof(MergeEngine) + N * (sizeof(int) + sizeof(PmaReader));

  pNew = sqlite3FaultSim(100) ? 0 : (MergeEngine*)sqlite3MallocZero(nByte);
  if( pNew ){
    pNew->nTree = N;
    pNew->pTask = 0;
    pNew->aReadr = (PmaReader*)&pNew[1];
    pNew->aTree = (int*)&pNew->aReadr[N];
  }
  return pNew;
}
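
/*
** Illustrative sketch only (not part of the sorter): the rounding rule used
** by vdbeMergeEngineNew(), isolated so that it can be checked by hand.
** The helper name sketchMergeTreeSize() is hypothetical. As in
** vdbeMergeEngineNew(), N starts at 2, so even a single input gets a
** two-leaf tree.
*/
#include <assert.h>

static int sketchMergeTreeSize(int nReader){
  int N = 2;                      /* Smallest power of two >= nReader */
  while( N<nReader ) N += N;      /* Double until all nReader inputs fit */
  return N;
}
/* For example, five PmaReader inputs would need an eight-leaf tree, and
** exactly eight inputs would also use eight leaves. */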

/*
** Free the MergeEngine object passed as the only argument.
*/
static void vdbeMergeEngineFree(MergeEngine *pMerger){
  int i;
  if( pMerger ){
    for(i=0; i<pMerger->nTree; i++){
      vdbePmaReaderClear(&pMerger->aReadr[i]);
    }
  }
  sqlite3_free(pMerger);
}

/*
** Free all resources associated with the IncrMerger object indicated by
** the first argument.
*/
static void vdbeIncrFree(IncrMerger *pIncr){
  if( pIncr ){
#if SQLITE_MAX_WORKER_THREADS>0
    if( pIncr->bUseThread ){
      vdbeSorterJoinThread(pIncr->pTask);
      if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
      if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
    }
#endif
    vdbeMergeEngineFree(pIncr->pMerger);
    sqlite3_free(pIncr);
  }
}

/*
** Reset a sorting cursor back to its original empty state.
*/
void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
  int i;
  (void)vdbeSorterJoinAll(pSorter, SQLITE_OK);
  assert( pSorter->bUseThreads || pSorter->pReader==0 );
#if SQLITE_MAX_WORKER_THREADS>0
  if( pSorter->pReader ){
    vdbePmaReaderClear(pSorter->pReader);
    sqlite3DbFree(db, pSorter->pReader);
    pSorter->pReader = 0;
  }
#endif
  vdbeMergeEngineFree(pSorter->pMerger);
  pSorter->pMerger = 0;
  for(i=0; i<pSorter->nTask; i++){
    SortSubtask *pTask = &pSorter->aTask[i];
    vdbeSortSubtaskCleanup(db, pTask);
  }
  if( pSorter->list.aMemory==0 ){
    vdbeSorterRecordFree(0, pSorter->list.pList);
  }
  pSorter->list.pList = 0;
  pSorter->list.szPMA = 0;
  pSorter->bUsePMA = 0;
  pSorter->iMemory = 0;
  pSorter->mxKeysize = 0;
  sqlite3DbFree(db, pSorter->pUnpacked);
  pSorter->pUnpacked = 0;
}

/*
** Free any cursor components allocated by sqlite3VdbeSorterXXX routines.
*/
void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){
  VdbeSorter *pSorter = pCsr->pSorter;
  if( pSorter ){
    sqlite3VdbeSorterReset(db, pSorter);
    sqlite3_free(pSorter->list.aMemory);
    sqlite3DbFree(db, pSorter);
    pCsr->pSorter = 0;
  }
}

#if SQLITE_MAX_MMAP_SIZE>0
/*
** The first argument is a file-handle open on a temporary file. The file
** is guaranteed to be nByte bytes or smaller in size. This function
** attempts to extend the file to nByte bytes in size and to ensure that
** the VFS has memory mapped it.
**
** Whether or not the file does end up memory mapped of course depends on
** the specific VFS implementation.
*/
static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFd, i64 nByte){
  if( nByte<=(i64)(db->nMaxSorterMmap) && pFd->pMethods->iVersion>=3 ){
    int rc = sqlite3OsTruncate(pFd, nByte);
    if( rc==SQLITE_OK ){
      void *p = 0;
      sqlite3OsFetch(pFd, 0, (int)nByte, &p);
      sqlite3OsUnfetch(pFd, 0, p);
    }
  }
}
#else
# define vdbeSorterExtendFile(x,y,z)
#endif

/*
** Allocate space for a file-handle and open a temporary file. If successful,
** set *ppFd to point to the malloc'd file-handle and return SQLITE_OK.
** Otherwise, set *ppFd to 0 and return an SQLite error code.
*/
static int vdbeSorterOpenTempFile(
  sqlite3 *db,                    /* Database handle doing sort */
  i64 nExtend,                    /* Attempt to extend file to this size */
  sqlite3_file **ppFd
){
  int rc;
  rc = sqlite3OsOpenMalloc(db->pVfs, 0, ppFd,
      SQLITE_OPEN_TEMP_JOURNAL |
      SQLITE_OPEN_READWRITE    | SQLITE_OPEN_CREATE |
      SQLITE_OPEN_EXCLUSIVE    | SQLITE_OPEN_DELETEONCLOSE, &rc
  );
  if( rc==SQLITE_OK ){
    i64 max = SQLITE_MAX_MMAP_SIZE;
    sqlite3OsFileControlHint(*ppFd, SQLITE_FCNTL_MMAP_SIZE, (void*)&max);
    if( nExtend>0 ){
      vdbeSorterExtendFile(db, *ppFd, nExtend);
    }
  }
  return rc;
}

/*
** If it has not already been allocated, allocate the UnpackedRecord
** structure at pTask->pUnpacked. Return SQLITE_OK if successful (or
** if no allocation was required), or SQLITE_NOMEM otherwise.
*/
static int vdbeSortAllocUnpacked(SortSubtask *pTask){
  if( pTask->pUnpacked==0 ){
    char *pFree;
    pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
        pTask->pSorter->pKeyInfo, 0, 0, &pFree
    );
    assert( pTask->pUnpacked==(UnpackedRecord*)pFree );
    if( pFree==0 ) return SQLITE_NOMEM;
    pTask->pUnpacked->nField = pTask->pSorter->pKeyInfo->nField;
    pTask->pUnpacked->errCode = 0;
  }
  return SQLITE_OK;
}

/*
** Merge the two sorted lists p1 and p2 into a single list.
** Set *ppOut to the head of the new list.
*/
static void vdbeSorterMerge(
  SortSubtask *pTask,             /* Calling thread context */
  SorterRecord *p1,               /* First list to merge */
  SorterRecord *p2,               /* Second list to merge */
  SorterRecord **ppOut            /* OUT: Head of merged list */
){
  SorterRecord *pFinal = 0;
  SorterRecord **pp = &pFinal;
  void *pVal2 = p2 ? SRVAL(p2) : 0;

  while( p1 && p2 ){
    int res;
    res = vdbeSorterCompare(pTask, SRVAL(p1), p1->nVal, pVal2, p2->nVal);
    if( res<=0 ){
      *pp = p1;
      pp = &p1->u.pNext;
      p1 = p1->u.pNext;
      pVal2 = 0;
    }else{
      *pp = p2;
      pp = &p2->u.pNext;
      p2 = p2->u.pNext;
      if( p2==0 ) break;
      pVal2 = SRVAL(p2);
    }
  }
  *pp = p1 ? p1 : p2;
  *ppOut = pFinal;
}

/*
** Sort the linked list of records headed at pList->pList. Return
** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if
** an error occurs.
*/
static int vdbeSorterSort(SortSubtask *pTask, SorterList *pList){
  int i;
  SorterRecord **aSlot;
  SorterRecord *p;
  int rc;

  rc = vdbeSortAllocUnpacked(pTask);
  if( rc!=SQLITE_OK ) return rc;

  aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *));
  if( !aSlot ){
    return SQLITE_NOMEM;
  }

  p = pList->pList;
  while( p ){
    SorterRecord *pNext;
    if( pList->aMemory ){
      if( (u8*)p==pList->aMemory ){
        pNext = 0;
      }else{
        assert( p->u.iNext<sqlite3MallocSize(pList->aMemory) );
        pNext = (SorterRecord*)&pList->aMemory[p->u.iNext];
      }
    }else{
      pNext = p->u.pNext;
    }

    p->u.pNext = 0;
    for(i=0; aSlot[i]; i++){
      vdbeSorterMerge(pTask, p, aSlot[i], &p);
      aSlot[i] = 0;
    }
    aSlot[i] = p;
    p = pNext;
  }

  p = 0;
  for(i=0; i<64; i++){
    vdbeSorterMerge(pTask, p, aSlot[i], &p);
  }
  pList->pList = p;

  sqlite3_free(aSlot);
  assert( pTask->pUnpacked->errCode==SQLITE_OK
       || pTask->pUnpacked->errCode==SQLITE_NOMEM
  );
  return pTask->pUnpacked->errCode;
}
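
/*
** Illustrative sketch only (not part of the sorter): the slot-based merge
** sort used by vdbeSorterSort(), reduced to a list of plain integers.
** Slot i holds either nothing or a sorted run of 2^i elements, so adding
** one element behaves like incrementing a binary counter: the new run is
** merged with each occupied slot in turn until an empty slot is found.
** SketchNode, sketchMerge() and sketchSlotSort() are hypothetical names,
** and an integer comparison stands in for vdbeSorterCompare().
*/
#include <assert.h>
#include <stddef.h>

typedef struct SketchNode SketchNode;
struct SketchNode {
  int v;                          /* Sort key */
  SketchNode *pNext;              /* Next element in the list */
};

/* Merge two sorted lists and return the head of the result. */
static SketchNode *sketchMerge(SketchNode *p1, SketchNode *p2){
  SketchNode *pFinal = 0;
  SketchNode **pp = &pFinal;
  while( p1 && p2 ){
    if( p1->v<=p2->v ){
      *pp = p1;  pp = &p1->pNext;  p1 = p1->pNext;
    }else{
      *pp = p2;  pp = &p2->pNext;  p2 = p2->pNext;
    }
  }
  *pp = p1 ? p1 : p2;
  return pFinal;
}

/* Sort the list headed at p and return the new head. */
static SketchNode *sketchSlotSort(SketchNode *p){
  SketchNode *aSlot[64] = {0};    /* aSlot[i]: empty or a run of 2^i nodes */
  int i;
  while( p ){
    SketchNode *pNext = p->pNext;
    p->pNext = 0;
    for(i=0; aSlot[i]; i++){      /* Carry into the next slot */
      p = sketchMerge(p, aSlot[i]);
      aSlot[i] = 0;
    }
    aSlot[i] = p;
    p = pNext;
  }
  p = 0;
  for(i=0; i<64; i++){            /* Fold all remaining runs together */
    p = sketchMerge(p, aSlot[i]);
  }
  return p;
}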

/*
** Initialize a PMA-writer object.
*/
static void vdbePmaWriterInit(
  sqlite3_file *pFd,              /* File handle to write to */
  PmaWriter *p,                   /* Object to populate */
  int nBuf,                       /* Buffer size */
  i64 iStart                      /* Offset of pFd to begin writing at */
){
  memset(p, 0, sizeof(PmaWriter));
  p->aBuffer = (u8*)sqlite3Malloc(nBuf);
  if( !p->aBuffer ){
    p->eFWErr = SQLITE_NOMEM;
  }else{
    p->iBufEnd = p->iBufStart = (iStart % nBuf);
    p->iWriteOff = iStart - p->iBufStart;
    p->nBuffer = nBuf;
    p->pFd = pFd;
  }
}

/*
** Write nData bytes of data to the PMA. This routine returns nothing: if
** a write fails, the error code is stored in p->eFWErr, after which
** further calls are no-ops. The error is reported by vdbePmaWriterFinish().
*/
static void vdbePmaWriteBlob(PmaWriter *p, u8 *pData, int nData){
  int nRem = nData;
  while( nRem>0 && p->eFWErr==0 ){
    int nCopy = nRem;
    if( nCopy>(p->nBuffer - p->iBufEnd) ){
      nCopy = p->nBuffer - p->iBufEnd;
    }

    memcpy(&p->aBuffer[p->iBufEnd], &pData[nData-nRem], nCopy);
    p->iBufEnd += nCopy;
    if( p->iBufEnd==p->nBuffer ){
      p->eFWErr = sqlite3OsWrite(p->pFd,
          &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart,
          p->iWriteOff + p->iBufStart
      );
      p->iBufStart = p->iBufEnd = 0;
      p->iWriteOff += p->nBuffer;
    }
    assert( p->iBufEnd<p->nBuffer );

    nRem -= nCopy;
  }
}

/*
** Flush any buffered data to disk and clean up the PMA-writer object.
** The results of using the PMA-writer after this call are undefined.
** Return SQLITE_OK if flushing the buffered data succeeds or is not
** required. Otherwise, return an SQLite error code.
**
** Before returning, set *piEof to the offset immediately following the
** last byte written to the file.
*/
static int vdbePmaWriterFinish(PmaWriter *p, i64 *piEof){
  int rc;
  if( p->eFWErr==0 && ALWAYS(p->aBuffer) && p->iBufEnd>p->iBufStart ){
    p->eFWErr = sqlite3OsWrite(p->pFd,
        &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart,
        p->iWriteOff + p->iBufStart
    );
  }
  *piEof = (p->iWriteOff + p->iBufEnd);
  sqlite3_free(p->aBuffer);
  rc = p->eFWErr;
  memset(p, 0, sizeof(PmaWriter));
  return rc;
}

/*
** Write value iVal encoded as a varint to the PMA. As with
** vdbePmaWriteBlob(), nothing is returned: any error is recorded in
** p->eFWErr.
*/
static void vdbePmaWriteVarint(PmaWriter *p, u64 iVal){
  int nByte;
  u8 aByte[10];
  nByte = sqlite3PutVarint(aByte, iVal);
  vdbePmaWriteBlob(p, aByte, nByte);
}

/*
** Write the current contents of in-memory linked-list pList to a level-0
** PMA in the temp file belonging to sub-task pTask. Return SQLITE_OK if
** successful, or an SQLite error code otherwise.
**
** The format of a PMA is:
**
**     * A varint. This varint contains the total number of bytes of content
**       in the PMA (not including the varint itself).
**
**     * One or more records packed end-to-end in order of ascending keys.
**       Each record consists of a varint followed by a blob of data (the
**       key). The varint is the number of bytes in the blob of data.
*/
static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){
  sqlite3 *db = pTask->pSorter->db;
  int rc = SQLITE_OK;             /* Return code */
  PmaWriter writer;               /* Object used to write to the file */

#ifdef SQLITE_DEBUG
  /* Set iSz to the expected size of file pTask->file after writing the PMA.
  ** This is used by an assert() statement at the end of this function.  */
  i64 iSz = pList->szPMA + sqlite3VarintLen(pList->szPMA) + pTask->file.iEof;
#endif

  vdbeSorterWorkDebug(pTask, "enter");
  memset(&writer, 0, sizeof(PmaWriter));
  assert( pList->szPMA>0 );

  /* If the first temporary PMA file has not been opened, open it now. */
  if( pTask->file.pFd==0 ){
    rc = vdbeSorterOpenTempFile(db, 0, &pTask->file.pFd);
    assert( rc!=SQLITE_OK || pTask->file.pFd );
    assert( pTask->file.iEof==0 );
    assert( pTask->nPMA==0 );
  }

  /* Try to get the file to memory map */
  if( rc==SQLITE_OK ){
    vdbeSorterExtendFile(db, pTask->file.pFd, pTask->file.iEof+pList->szPMA+9);
  }

  /* Sort the list */
  if( rc==SQLITE_OK ){
    rc = vdbeSorterSort(pTask, pList);
  }

  if( rc==SQLITE_OK ){
    SorterRecord *p;
    SorterRecord *pNext = 0;

    vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pSorter->pgsz,
                      pTask->file.iEof);
    pTask->nPMA++;
    vdbePmaWriteVarint(&writer, pList->szPMA);
    for(p=pList->pList; p; p=pNext){
      pNext = p->u.pNext;
      vdbePmaWriteVarint(&writer, p->nVal);
      vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal);
      if( pList->aMemory==0 ) sqlite3_free(p);
    }
    pList->pList = p;
    rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof);
  }

  vdbeSorterWorkDebug(pTask, "exit");
  assert( rc!=SQLITE_OK || pList->pList==0 );
  assert( rc!=SQLITE_OK || pTask->file.iEof==iSz );
  return rc;
}
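
/*
** Illustrative sketch only (not part of the sorter): the PMA layout
** described in the header comment of vdbeSorterListToPMA(), built in a
** memory buffer instead of a temp file. The names sketchPutVarint() and
** sketchBuildPma() are hypothetical. The varint writer is a simplified
** stand-in for sqlite3PutVarint() that assumes 32-bit values: 7 bits per
** byte, most significant group first, high bit set on every byte except
** the last. Keys are assumed to be already sorted NUL-terminated strings.
*/
#include <assert.h>
#include <string.h>

/* Write v as a varint to z[] and return the number of bytes used. */
static int sketchPutVarint(unsigned char *z, unsigned int v){
  unsigned char aGroup[5];        /* 7-bit groups, least significant first */
  int n = 0;
  int i;
  do{
    aGroup[n++] = (unsigned char)(v & 0x7f);
    v >>= 7;
  }while( v!=0 );
  for(i=0; i<n; i++){
    z[i] = (unsigned char)(aGroup[n-1-i] | (i<n-1 ? 0x80 : 0x00));
  }
  return n;
}

/* Serialize nKey sorted keys into aOut[] and return the bytes written. */
static int sketchBuildPma(unsigned char *aOut, const char **azKey, int nKey){
  unsigned char aTmp[5];
  unsigned int szPMA = 0;         /* Content size, excluding leading varint */
  int n = 0;                      /* Write offset into aOut[] */
  int i;
  for(i=0; i<nKey; i++){
    unsigned int nVal = (unsigned int)strlen(azKey[i]);
    szPMA += nVal + (unsigned int)sketchPutVarint(aTmp, nVal);
  }
  n += sketchPutVarint(&aOut[n], szPMA);
  for(i=0; i<nKey; i++){
    unsigned int nVal = (unsigned int)strlen(azKey[i]);
    n += sketchPutVarint(&aOut[n], nVal);
    memcpy(&aOut[n], azKey[i], nVal);
    n += (int)nVal;
  }
  return n;
}
/* The per-record cost (key bytes plus the length varint) is the same
** quantity that sqlite3VdbeSorterWrite() accumulates into list.szPMA. */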

/*
** Advance the MergeEngine to its next entry.
** Set *pbEof to true if there is no next entry because
** the MergeEngine has reached the end of all its inputs.
**
** Return SQLITE_OK if successful or an error code if an error occurs.
*/
static int vdbeMergeEngineStep(
  MergeEngine *pMerger,      /* The merge engine to advance to the next row */
  int *pbEof                 /* Set TRUE at EOF.  Set false for more content */
){
  int rc;
  int iPrev = pMerger->aTree[1];/* Index of PmaReader to advance */
  SortSubtask *pTask = pMerger->pTask;

  /* Advance the current PmaReader */
  rc = vdbePmaReaderNext(&pMerger->aReadr[iPrev]);

  /* Update contents of aTree[] */
  if( rc==SQLITE_OK ){
    int i;                      /* Index of aTree[] to recalculate */
    PmaReader *pReadr1;         /* First PmaReader to compare */
    PmaReader *pReadr2;         /* Second PmaReader to compare */
    u8 *pKey2;                  /* To pReadr2->aKey, or 0 if record cached */

    /* Find the first two PmaReaders to compare. The one that was just
    ** advanced (iPrev) and the one next to it in the array.  */
    pReadr1 = &pMerger->aReadr[(iPrev & 0xFFFE)];
    pReadr2 = &pMerger->aReadr[(iPrev | 0x0001)];
    pKey2 = pReadr2->aKey;

    for(i=(pMerger->nTree+iPrev)/2; i>0; i=i/2){
      /* Compare pReadr1 and pReadr2. Store the result in variable iRes. */
      int iRes;
      if( pReadr1->pFd==0 ){
        iRes = +1;
      }else if( pReadr2->pFd==0 ){
        iRes = -1;
      }else{
        iRes = vdbeSorterCompare(pTask,
            pReadr1->aKey, pReadr1->nKey, pKey2, pReadr2->nKey
        );
      }

      /* If pReadr1 contained the smaller value, set aTree[i] to its index.
      ** Then set pReadr2 to the next PmaReader to compare to pReadr1. In this
      ** case there is no cache of pReadr2 in pTask->pUnpacked, so set
      ** pKey2 to point to the record belonging to pReadr2.
      **
      ** Alternatively, if pReadr2 contains the smaller of the two values,
      ** set aTree[i] to its index and update pReadr1. If vdbeSorterCompare()
      ** was actually called above, then pTask->pUnpacked now contains
      ** a value equivalent to pReadr2. So set pKey2 to NULL to prevent
      ** vdbeSorterCompare() from decoding pReadr2 again.
      **
      ** If the two values were equal, then the value from the oldest
      ** PMA should be considered smaller. The MergeEngine.aReadr[] array
      ** is sorted from oldest to newest, so pReadr1 contains older values
      ** than pReadr2 iff (pReadr1<pReadr2).  */
      if( iRes<0 || (iRes==0 && pReadr1<pReadr2) ){
        pMerger->aTree[i] = (int)(pReadr1 - pMerger->aReadr);
        pReadr2 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
        pKey2 = pReadr2->aKey;
      }else{
        if( pReadr1->pFd ) pKey2 = 0;
        pMerger->aTree[i] = (int)(pReadr2 - pMerger->aReadr);
        pReadr1 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
      }
    }
    *pbEof = (pMerger->aReadr[pMerger->aTree[1]].pFd==0);
  }

  return (rc==SQLITE_OK ? pTask->pUnpacked->errCode : rc);
}

#if SQLITE_MAX_WORKER_THREADS>0
/*
** The main routine for background threads that write level-0 PMAs.
*/
static void *vdbeSorterFlushThread(void *pCtx){
  SortSubtask *pTask = (SortSubtask*)pCtx;
  int rc;                         /* Return code */
  assert( pTask->bDone==0 );
  rc = vdbeSorterListToPMA(pTask, &pTask->list);
  pTask->bDone = 1;
  return SQLITE_INT_TO_PTR(rc);
}
#endif /* SQLITE_MAX_WORKER_THREADS>0 */

/*
** Flush the current contents of VdbeSorter.list to a new PMA, possibly
** using a background thread.
*/
static int vdbeSorterFlushPMA(VdbeSorter *pSorter){
#if SQLITE_MAX_WORKER_THREADS==0
  pSorter->bUsePMA = 1;
  return vdbeSorterListToPMA(&pSorter->aTask[0], &pSorter->list);
#else
  int rc = SQLITE_OK;
  int i;
  SortSubtask *pTask = 0;    /* Thread context used to create new PMA */
  int nWorker = (pSorter->nTask-1);

  /* Set the flag to indicate that at least one PMA has been written.
  ** Or will be, anyhow.  */
  pSorter->bUsePMA = 1;

  /* Select a sub-task to sort and flush the current list of in-memory
  ** records to disk. If the sorter is running in multi-threaded mode,
  ** round-robin between the first (pSorter->nTask-1) tasks. Except, if
  ** the background thread from a sub-task's previous turn is still running,
  ** skip it. If the first (pSorter->nTask-1) sub-tasks are all still busy,
  ** fall back to using the final sub-task. The first (pSorter->nTask-1)
  ** sub-tasks are preferred as they use background threads - the final
  ** sub-task uses the main thread. */
  for(i=0; i<nWorker; i++){
    int iTest = (pSorter->iPrev + i + 1) % nWorker;
    pTask = &pSorter->aTask[iTest];
    if( pTask->bDone ){
      rc = vdbeSorterJoinThread(pTask);
    }
    if( rc!=SQLITE_OK || pTask->pThread==0 ) break;
  }

  if( rc==SQLITE_OK ){
    if( i==nWorker ){
      /* Use the foreground thread for this operation */
      rc = vdbeSorterListToPMA(&pSorter->aTask[nWorker], &pSorter->list);
    }else{
      /* Launch a background thread for this operation */
      u8 *aMem = pTask->list.aMemory;
      void *pCtx = (void*)pTask;

      assert( pTask->pThread==0 && pTask->bDone==0 );
      assert( pTask->list.pList==0 );
      assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 );

      pSorter->iPrev = (u8)(pTask - pSorter->aTask);
      pTask->list = pSorter->list;
      pSorter->list.pList = 0;
      pSorter->list.szPMA = 0;
      if( aMem ){
        pSorter->list.aMemory = aMem;
        pSorter->nMemory = sqlite3MallocSize(aMem);
      }else if( pSorter->list.aMemory ){
        pSorter->list.aMemory = sqlite3Malloc(pSorter->nMemory);
        if( !pSorter->list.aMemory ) return SQLITE_NOMEM;
      }

      rc = vdbeSorterCreateThread(pTask, vdbeSorterFlushThread, pCtx);
    }
  }

  return rc;
#endif /* SQLITE_MAX_WORKER_THREADS!=0 */
}
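
/*
** Illustrative sketch only (not part of the sorter): the round-robin
** sub-task selection described in vdbeSorterFlushPMA(), with the thread
** state reduced to an array of busy flags. sketchPickTask() and aBusy[]
** are hypothetical; the real code also joins threads that have finished
** before testing them. The return value is the index of the first idle
** worker after iPrev, or nWorker when every worker is busy, which stands
** for "use the final sub-task on the main thread".
*/
#include <assert.h>

static int sketchPickTask(const int *aBusy, int nWorker, int iPrev){
  int i;
  for(i=0; i<nWorker; i++){
    int iTest = (iPrev + i + 1) % nWorker;   /* Start just after iPrev */
    if( !aBusy[iTest] ) return iTest;
  }
  return nWorker;                 /* All workers busy (or none exist) */
}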

/*
** Add a record to the sorter.
*/
int sqlite3VdbeSorterWrite(
  const VdbeCursor *pCsr,         /* Sorter cursor */
  Mem *pVal                       /* Memory cell containing record */
){
  VdbeSorter *pSorter = pCsr->pSorter;
  int rc = SQLITE_OK;             /* Return Code */
  SorterRecord *pNew;             /* New list element */

  int bFlush;                     /* True to flush contents of memory to PMA */
  int nReq;                       /* Bytes of memory required */
  int nPMA;                       /* Bytes of PMA space required */

  assert( pSorter );

  /* Figure out whether or not the current contents of memory should be
  ** flushed to a PMA before continuing. If so, do so.
  **
  ** If using the single large allocation mode (pSorter->list.aMemory!=0),
  ** then flush the contents of memory to a new PMA if (a) at least one
  ** value is already in memory and (b) the new value will not fit in memory.
  **
  ** Or, if using separate allocations for each record, flush the contents
  ** of memory to a PMA if either of the following are true:
  **
  **   * The total memory allocated for the in-memory list is greater
  **     than (page-size * cache-size), or
  **
  **   * The total memory allocated for the in-memory list is greater
  **     than (page-size * 10) and sqlite3HeapNearlyFull() returns true.
  */
  nReq = pVal->n + sizeof(SorterRecord);
  nPMA = pVal->n + sqlite3VarintLen(pVal->n);
  if( pSorter->mxPmaSize ){
    if( pSorter->list.aMemory ){
      bFlush = pSorter->iMemory && (pSorter->iMemory+nReq) > pSorter->mxPmaSize;
    }else{
      bFlush = (
          (pSorter->list.szPMA > pSorter->mxPmaSize)
       || (pSorter->list.szPMA > pSorter->mnPmaSize && sqlite3HeapNearlyFull())
      );
    }
    if( bFlush ){
      rc = vdbeSorterFlushPMA(pSorter);
      pSorter->list.szPMA = 0;
      pSorter->iMemory = 0;
      assert( rc!=SQLITE_OK || pSorter->list.pList==0 );
    }
  }

  pSorter->list.szPMA += nPMA;
  if( nPMA>pSorter->mxKeysize ){
    pSorter->mxKeysize = nPMA;
  }

  if( pSorter->list.aMemory ){
    int nMin = pSorter->iMemory + nReq;

    if( nMin>pSorter->nMemory ){
      u8 *aNew;
      int nNew = pSorter->nMemory * 2;
      while( nNew < nMin ) nNew = nNew*2;
      if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize;
      if( nNew < nMin ) nNew = nMin;

      aNew = sqlite3Realloc(pSorter->list.aMemory, nNew);
      if( !aNew ) return SQLITE_NOMEM;
      pSorter->list.pList = (SorterRecord*)(
          aNew + ((u8*)pSorter->list.pList - pSorter->list.aMemory)
      );
      pSorter->list.aMemory = aNew;
      pSorter->nMemory = nNew;
    }

    pNew = (SorterRecord*)&pSorter->list.aMemory[pSorter->iMemory];
    pSorter->iMemory += ROUND8(nReq);
    pNew->u.iNext = (int)((u8*)(pSorter->list.pList) - pSorter->list.aMemory);
  }else{
    pNew = (SorterRecord *)sqlite3Malloc(nReq);
    if( pNew==0 ){
      return SQLITE_NOMEM;
    }
    pNew->u.pNext = pSorter->list.pList;
  }

  memcpy(SRVAL(pNew), pVal->z, pVal->n);
  pNew->nVal = pVal->n;
  pSorter->list.pList = pNew;

  return rc;
}
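
/*
** Illustrative sketch only (not part of the sorter): the flush decision
** made at the top of sqlite3VdbeSorterWrite(), written as a pure function
** so the two allocation modes can be compared side by side. The name
** sketchShouldFlush() and its flattened parameter list are hypothetical;
** bHeapNearlyFull stands in for the result of sqlite3HeapNearlyFull().
*/
#include <assert.h>

static int sketchShouldFlush(
  int bSingleAlloc,               /* True if list.aMemory!=0 */
  int iMemory,                    /* Bytes already used in list.aMemory */
  int nReq,                       /* Bytes needed for the new record */
  int szPMA,                      /* Current list.szPMA */
  int mnPmaSize,                  /* Lower flush threshold */
  int mxPmaSize,                  /* Upper flush threshold; 0 disables */
  int bHeapNearlyFull             /* Stand-in for sqlite3HeapNearlyFull() */
){
  if( mxPmaSize==0 ) return 0;    /* No PMA size limit: never flush here */
  if( bSingleAlloc ){
    /* Flush only if something is in memory and the new record won't fit */
    return iMemory && (iMemory+nReq)>mxPmaSize;
  }
  return (szPMA>mxPmaSize) || (szPMA>mnPmaSize && bHeapNearlyFull);
}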

/*
** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format
** of the data stored in aFile[1] is the same as that used by regular PMAs,
** except that the number-of-bytes varint is omitted from the start.
*/
static int vdbeIncrPopulate(IncrMerger *pIncr){
  int rc = SQLITE_OK;
  int rc2;
  i64 iStart = pIncr->iStartOff;
  SorterFile *pOut = &pIncr->aFile[1];
  SortSubtask *pTask = pIncr->pTask;
  MergeEngine *pMerger = pIncr->pMerger;
  PmaWriter writer;
  assert( pIncr->bEof==0 );

  vdbeSorterPopulateDebug(pTask, "enter");

  vdbePmaWriterInit(pOut->pFd, &writer, pTask->pSorter->pgsz, iStart);
  while( rc==SQLITE_OK ){
    int dummy;
    PmaReader *pReader = &pMerger->aReadr[ pMerger->aTree[1] ];
    int nKey = pReader->nKey;
    i64 iEof = writer.iWriteOff + writer.iBufEnd;

    /* Check if the output file is full or if the input has been exhausted.
    ** In either case exit the loop. */
    if( pReader->pFd==0 ) break;
    if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break;

    /* Write the next key to the output. */
    vdbePmaWriteVarint(&writer, nKey);
    vdbePmaWriteBlob(&writer, pReader->aKey, nKey);
    assert( pIncr->pMerger->pTask==pTask );
    rc = vdbeMergeEngineStep(pIncr->pMerger, &dummy);
  }

  rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof);
  if( rc==SQLITE_OK ) rc = rc2;
  vdbeSorterPopulateDebug(pTask, "exit");
  return rc;
}

#if SQLITE_MAX_WORKER_THREADS>0
/*
** The main routine for background threads that populate aFile[1] of
** multi-threaded IncrMerger objects.
*/
static void *vdbeIncrPopulateThread(void *pCtx){
  IncrMerger *pIncr = (IncrMerger*)pCtx;
  void *pRet = SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) );
  pIncr->pTask->bDone = 1;
  return pRet;
}

/*
** Launch a background thread to populate aFile[1] of pIncr.
*/
static int vdbeIncrBgPopulate(IncrMerger *pIncr){
  void *p = (void*)pIncr;
  assert( pIncr->bUseThread );
  return vdbeSorterCreateThread(pIncr->pTask, vdbeIncrPopulateThread, p);
}
#endif

/*
** This function is called when the PmaReader corresponding to pIncr has
** finished reading the contents of aFile[0]. Its purpose is to "refill"
** aFile[0] such that the PmaReader should start rereading it from the
** beginning.
**
** For single-threaded objects, this is accomplished by literally reading
** keys from pIncr->pMerger and repopulating aFile[0].
**
** For multi-threaded objects, all that is required is to wait until the
** background thread is finished (if it is not already) and then swap
** aFile[0] and aFile[1] in place. If the contents of pMerger have not
** been exhausted, this function also launches a new background thread
** to populate the new aFile[1].
**
** SQLITE_OK is returned on success, or an SQLite error code otherwise.
*/
static int vdbeIncrSwap(IncrMerger *pIncr){
  int rc = SQLITE_OK;

#if SQLITE_MAX_WORKER_THREADS>0
  if( pIncr->bUseThread ){
    rc = vdbeSorterJoinThread(pIncr->pTask);

    if( rc==SQLITE_OK ){
      SorterFile f0 = pIncr->aFile[0];
      pIncr->aFile[0] = pIncr->aFile[1];
      pIncr->aFile[1] = f0;
    }

    if( rc==SQLITE_OK ){
      if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
        pIncr->bEof = 1;
      }else{
        rc = vdbeIncrBgPopulate(pIncr);
      }
    }
  }else
#endif
  {
    rc = vdbeIncrPopulate(pIncr);
    pIncr->aFile[0] = pIncr->aFile[1];
    if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
      pIncr->bEof = 1;
    }
  }

  return rc;
}

/*
** Allocate a new IncrMerger object to read data from pMerger and store
** a pointer to it in *ppOut. Return SQLITE_OK if successful.
**
** If an OOM condition is encountered, set *ppOut to NULL and return
** SQLITE_NOMEM. In this case free the pMerger argument before returning.
*/
static int vdbeIncrMergerNew(
  SortSubtask *pTask,     /* The thread that will be using the new IncrMerger */
  MergeEngine *pMerger,   /* The MergeEngine that the IncrMerger will control */
  IncrMerger **ppOut      /* Write the new IncrMerger here */
){
  int rc = SQLITE_OK;
  IncrMerger *pIncr = *ppOut = (IncrMerger*)
       (sqlite3FaultSim(100) ? 0 : sqlite3MallocZero(sizeof(*pIncr)));
  if( pIncr ){
    pIncr->pMerger = pMerger;
    pIncr->pTask = pTask;
    pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2);
    pTask->file2.iEof += pIncr->mxSz;
  }else{
    vdbeMergeEngineFree(pMerger);
    rc = SQLITE_NOMEM;
  }
  return rc;
}

#if SQLITE_MAX_WORKER_THREADS>0
/*
** Set the "use-threads" flag on object pIncr.
*/
static void vdbeIncrMergerSetThreads(IncrMerger *pIncr){
  pIncr->bUseThread = 1;
  pIncr->pTask->file2.iEof -= pIncr->mxSz;
}
#endif /* SQLITE_MAX_WORKER_THREADS>0 */

/*
** Recompute pMerger->aTree[iOut] by comparing the next keys on the
** two PmaReaders that feed that entry.  Neither of the PmaReaders
** is advanced.  This routine merely does the comparison.
*/
static void vdbeMergeEngineCompare(
  MergeEngine *pMerger,  /* Merge engine containing PmaReaders to compare */
  int iOut               /* Store the result in pMerger->aTree[iOut] */
){
  int i1;
  int i2;
  int iRes;
  PmaReader *p1;
  PmaReader *p2;

  assert( iOut<pMerger->nTree && iOut>0 );

  if( iOut>=(pMerger->nTree/2) ){
    i1 = (iOut - pMerger->nTree/2) * 2;
    i2 = i1 + 1;
  }else{
    i1 = pMerger->aTree[iOut*2];
    i2 = pMerger->aTree[iOut*2+1];
  }

  p1 = &pMerger->aReadr[i1];
  p2 = &pMerger->aReadr[i2];

  if( p1->pFd==0 ){
    iRes = i2;
  }else if( p2->pFd==0 ){
    iRes = i1;
  }else{
    int res;
    assert( pMerger->pTask->pUnpacked!=0 );  /* from vdbeSortSubtaskMain() */
    res = vdbeSorterCompare(
        pMerger->pTask, p1->aKey, p1->nKey, p2->aKey, p2->nKey
    );
    if( res<=0 ){
      iRes = i1;
    }else{
      iRes = i2;
    }
  }

  pMerger->aTree[iOut] = iRes;
}

/*
** Allowed values for the eMode parameter to vdbeMergeEngineInit()
** and vdbePmaReaderIncrMergeInit().
**
** Only INCRINIT_NORMAL is valid in single-threaded builds (when
** SQLITE_MAX_WORKER_THREADS==0).  The other values are only used
** when there exists one or more separate worker threads.
*/
#define INCRINIT_NORMAL 0
#define INCRINIT_TASK   1
#define INCRINIT_ROOT   2

/* Forward reference.
** The vdbeIncrMergeInit() and vdbePmaReaderIncrMergeInit() routines call each
** other (when building a merge tree).
*/
static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode);

/*
** Initialize the MergeEngine object passed as the second argument. Once this
** function returns, the first key of merged data may be read from the
** MergeEngine object in the usual fashion.
**
** If argument eMode is INCRINIT_ROOT, then it is assumed that any IncrMerge
** objects attached to the PmaReader objects that the merger reads from have
** already been populated, but that they have not yet populated aFile[0] and
** set the PmaReader objects up to read from it. In this case all that is
** required is to call vdbePmaReaderNext() on each PmaReader to point it at
** its first key.
**
** Otherwise, if eMode is any value other than INCRINIT_ROOT, then use
** vdbePmaReaderIncrMergeInit() to initialize each PmaReader that feeds data
** to pMerger.
**
** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
*/
static int vdbeMergeEngineInit(
  SortSubtask *pTask,             /* Thread that will run pMerger */
  MergeEngine *pMerger,           /* MergeEngine to initialize */
  int eMode                       /* One of the INCRINIT_XXX constants */
){
  int rc = SQLITE_OK;             /* Return code */
  int i;                          /* For looping over PmaReader objects */
  int nTree = pMerger->nTree;

  /* eMode is always INCRINIT_NORMAL in single-threaded mode */
  assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL );

  /* Verify that the MergeEngine is assigned to a single thread */
  assert( pMerger->pTask==0 );
  pMerger->pTask = pTask;

  for(i=0; i<nTree; i++){
    if( SQLITE_MAX_WORKER_THREADS>0 && eMode==INCRINIT_ROOT ){
      /* PmaReaders should normally be initialized in order, as if they are
      ** reading from the same temp file this makes for more linear file IO.
      ** However, in the INCRINIT_ROOT case, if PmaReader aReadr[nTask-1] is
      ** in use it will block the vdbePmaReaderNext() call while it uses
      ** the main thread to fill its buffer. So calling PmaReaderNext()
      ** on this PmaReader before any of the multi-threaded PmaReaders takes
      ** better advantage of multi-processor hardware. */
      rc = vdbePmaReaderNext(&pMerger->aReadr[nTree-i-1]);
    }else{
      rc = vdbePmaReaderIncrMergeInit(&pMerger->aReadr[i], INCRINIT_NORMAL);
    }
    if( rc!=SQLITE_OK ) return rc;
  }

  for(i=pMerger->nTree-1; i>0; i--){
    vdbeMergeEngineCompare(pMerger, i);
  }
  return pTask->pUnpacked->errCode;
}

/*
** Initialize the IncrMerger field of a PmaReader.
**
** If the PmaReader passed as the first argument is not an incremental-reader
** (if pReadr->pIncr==0), then this function is a no-op. Otherwise, it serves
** to open and/or initialize the temp file related fields of the IncrMerger
** object at (pReadr->pIncr).
**
** If argument eMode is set to INCRINIT_NORMAL, then all PmaReaders
** in the sub-tree headed by pReadr are also initialized. Data is then loaded
** into the buffers belonging to pReadr and it is set to
** point to the first key in its range.
**
** If argument eMode is set to INCRINIT_TASK, then pReadr is guaranteed
** to be a multi-threaded PmaReader and this function is being called in a
** background thread. In this case all PmaReaders in the sub-tree are
** initialized as for INCRINIT_NORMAL and the aFile[1] buffer belonging to
** pReadr is populated. However, pReadr itself is not set up to point
** to its first key. A call to vdbePmaReaderNext() is still required to do
** that.
**
** The reason this function does not call vdbePmaReaderNext() immediately
** in the INCRINIT_TASK case is that vdbePmaReaderNext() assumes that it has
** to block on thread (pTask->thread) before accessing aFile[1]. But, since
** this entire function is being run by thread (pTask->thread), that will
** lead to the current background thread attempting to join itself.
**
** Finally, if argument eMode is set to INCRINIT_ROOT, it may be assumed
** that pReadr->pIncr is a multi-threaded IncrMerger object, and that all
** child-trees have already been initialized using
** vdbePmaReaderIncrMergeInit(INCRINIT_TASK). In this case
** vdbePmaReaderNext() is called on all child PmaReaders and the current
** PmaReader is set to point to the first key in its range.
**
** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
*/
static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode){
  int rc = SQLITE_OK;
  IncrMerger *pIncr = pReadr->pIncr;

  /* eMode is always INCRINIT_NORMAL in single-threaded mode */
  assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL );

  if( pIncr ){
    SortSubtask *pTask = pIncr->pTask;
    sqlite3 *db = pTask->pSorter->db;

    rc = vdbeMergeEngineInit(pTask, pIncr->pMerger, eMode);

    /* Set up the required files for pIncr. A multi-threaded IncrMerger
    ** object requires two temp files to itself, whereas a single-threaded
    ** object only requires a region of pTask->file2. */
    if( rc==SQLITE_OK ){
      int mxSz = pIncr->mxSz;
#if SQLITE_MAX_WORKER_THREADS>0
      if( pIncr->bUseThread ){
        rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[0].pFd);
        if( rc==SQLITE_OK ){
          rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[1].pFd);
        }
      }else
#endif
      /*if( !pIncr->bUseThread )*/{
        if( pTask->file2.pFd==0 ){
          assert( pTask->file2.iEof>0 );
          rc = vdbeSorterOpenTempFile(db, pTask->file2.iEof, &pTask->file2.pFd);
          pTask->file2.iEof = 0;
        }
        if( rc==SQLITE_OK ){
          pIncr->aFile[1].pFd = pTask->file2.pFd;
          pIncr->iStartOff = pTask->file2.iEof;
          pTask->file2.iEof += mxSz;
        }
      }
    }

#if SQLITE_MAX_WORKER_THREADS>0
    if( rc==SQLITE_OK && pIncr->bUseThread ){
      /* Use the current thread to populate aFile[1], even though this
      ** PmaReader is multi-threaded. The reason being that this function
      ** is already running in background thread pIncr->pTask->thread. */
      assert( eMode==INCRINIT_ROOT || eMode==INCRINIT_TASK );
      rc = vdbeIncrPopulate(pIncr);
    }
#endif

    if( rc==SQLITE_OK
     && (SQLITE_MAX_WORKER_THREADS==0 || eMode!=INCRINIT_TASK)
    ){
      rc = vdbePmaReaderNext(pReadr);
    }
  }
  return rc;
}

#if SQLITE_MAX_WORKER_THREADS>0
/*
** The main routine for vdbePmaReaderIncrMergeInit() operations run in
** background threads.
*/
static void *vdbePmaReaderBgInit(void *pCtx){
  PmaReader *pReader = (PmaReader*)pCtx;
  void *pRet = SQLITE_INT_TO_PTR(
                  vdbePmaReaderIncrMergeInit(pReader,INCRINIT_TASK)
               );
  pReader->pIncr->pTask->bDone = 1;
  return pRet;
}

/*
** Use a background thread to invoke vdbePmaReaderIncrMergeInit(INCRINIT_TASK)
** on the PmaReader object passed as the first argument.
**
** This call will initialize the various fields of the pReadr->pIncr
** structure and, if it is a multi-threaded IncrMerger, launch a
** background thread to populate aFile[1].
*/
static int vdbePmaReaderBgIncrInit(PmaReader *pReadr){
  void *pCtx = (void*)pReadr;
  return vdbeSorterCreateThread(pReadr->pIncr->pTask, vdbePmaReaderBgInit, pCtx);
}
#endif

/*
** Allocate a new MergeEngine object to merge the contents of nPMA level-0
** PMAs from pTask->file. If no error occurs, set *ppOut to point to
** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut
** to NULL and return an SQLite error code.
**
** When this function is called, *piOffset is set to the offset of the
** first PMA to read from pTask->file. Assuming no error occurs, it is
** set to the offset immediately following the last byte of the last
** PMA before returning. If an error does occur, then the final value of
** *piOffset is undefined.
*/
static int vdbeMergeEngineLevel0(
  SortSubtask *pTask,             /* Sorter task to read from */
  int nPMA,                       /* Number of PMAs to read */
  i64 *piOffset,                  /* IN/OUT: Readr offset in pTask->file */
  MergeEngine **ppOut             /* OUT: New merge-engine */
){
  MergeEngine *pNew;              /* Merge engine to return */
  i64 iOff = *piOffset;
  int i;
  int rc = SQLITE_OK;

  *ppOut = pNew = vdbeMergeEngineNew(nPMA);
  if( pNew==0 ) rc = SQLITE_NOMEM;

  for(i=0; i<nPMA && rc==SQLITE_OK; i++){
    i64 nDummy;
    PmaReader *pReadr = &pNew->aReadr[i];
    rc = vdbePmaReaderInit(pTask, &pTask->file, iOff, pReadr, &nDummy);
    iOff = pReadr->iEof;
  }

  if( rc!=SQLITE_OK ){
    vdbeMergeEngineFree(pNew);
    *ppOut = 0;
  }
  *piOffset = iOff;
  return rc;
}

/*
** Return the depth of a tree comprising nPMA PMAs, assuming a fanout of
** SORTER_MAX_MERGE_COUNT. The returned value does not include leaf nodes.
**
** i.e.
**
**   nPMA<=16    -> TreeDepth() == 0
**   nPMA<=256   -> TreeDepth() == 1
**   nPMA<=4096  -> TreeDepth() == 2
*/
static int vdbeSorterTreeDepth(int nPMA){
  int nDepth = 0;
  i64 nDiv = SORTER_MAX_MERGE_COUNT;
  while( nDiv < (i64)nPMA ){
    nDiv = nDiv * SORTER_MAX_MERGE_COUNT;
    nDepth++;
  }
  return nDepth;
}
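
/*
** Illustration only, not part of SQLite: a minimal standalone sketch of the
** depth calculation performed by vdbeSorterTreeDepth(). The name
** sketch_tree_depth() and the explicit fanout argument are assumptions made
** for this example; the real routine uses SORTER_MAX_MERGE_COUNT, which the
** table in the comment implies is 16.
*/
```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical helper: number of interior levels needed above the leaf
** mergers when nPMA PMAs are merged with the given fanout. */
static int sketch_tree_depth(int nPMA, int fanout){
  int nDepth = 0;
  int64_t nDiv = fanout;          /* 64-bit so the product cannot overflow */
  assert( fanout>1 );             /* a fanout of 1 or less would never end */
  while( nDiv < (int64_t)nPMA ){
    nDiv = nDiv * fanout;
    nDepth++;
  }
  return nDepth;
}
```
/*
** With a fanout of 16 the loop stops at depth 0 for up to 16 PMAs, depth 1
** for up to 256 PMAs and depth 2 for up to 4096 PMAs.
*/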

/*
** pRoot is the root of an incremental merge-tree with depth nDepth (according
** to vdbeSorterTreeDepth()). pLeaf is the iSeq'th leaf to be added to the
** tree, counting from zero. This function adds pLeaf to the tree.
**
** If successful, SQLITE_OK is returned. If an error occurs, an SQLite error
** code is returned and pLeaf is freed.
*/
static int vdbeSorterAddToTree(
  SortSubtask *pTask,             /* Task context */
  int nDepth,                     /* Depth of tree according to TreeDepth() */
  int iSeq,                       /* Sequence number of leaf within tree */
  MergeEngine *pRoot,             /* Root of tree */
  MergeEngine *pLeaf              /* Leaf to add to tree */
){
  int rc = SQLITE_OK;
  int nDiv = 1;
  int i;
  MergeEngine *p = pRoot;
  IncrMerger *pIncr;

  rc = vdbeIncrMergerNew(pTask, pLeaf, &pIncr);

  for(i=1; i<nDepth; i++){
    nDiv = nDiv * SORTER_MAX_MERGE_COUNT;
  }

  for(i=1; i<nDepth && rc==SQLITE_OK; i++){
    int iIter = (iSeq / nDiv) % SORTER_MAX_MERGE_COUNT;
    PmaReader *pReadr = &p->aReadr[iIter];

    if( pReadr->pIncr==0 ){
      MergeEngine *pNew = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
      if( pNew==0 ){
        rc = SQLITE_NOMEM;
      }else{
        rc = vdbeIncrMergerNew(pTask, pNew, &pReadr->pIncr);
      }
    }
    if( rc==SQLITE_OK ){
      p = pReadr->pIncr->pMerger;
      nDiv = nDiv / SORTER_MAX_MERGE_COUNT;
    }
  }

  if( rc==SQLITE_OK ){
    p->aReadr[iSeq % SORTER_MAX_MERGE_COUNT].pIncr = pIncr;
  }else{
    vdbeIncrFree(pIncr);
  }
  return rc;
}
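
/*
** Illustration only, not part of SQLite: a small sketch of the index
** arithmetic that vdbeSorterAddToTree() uses to walk from the root down to
** the slot that receives leaf number iSeq. The helper name
** sketch_leaf_path(), the fanout argument and the aSlot[] output array are
** assumptions made for this example; no MergeEngine objects are allocated.
*/
```c
#include <assert.h>

/* Hypothetical helper: store in aSlot[] the aReadr[] index chosen at each
** level for leaf iSeq in a tree of depth nDepth (nDepth>=1). The caller must
** supply room for nDepth entries. Returns the number of entries written,
** which is always nDepth. */
static int sketch_leaf_path(int iSeq, int nDepth, int fanout, int *aSlot){
  int nDiv = 1;
  int i;
  int n = 0;
  assert( nDepth>=1 && fanout>1 && iSeq>=0 );

  /* nDiv = fanout^(nDepth-1): leaves covered by each child of the root */
  for(i=1; i<nDepth; i++){
    nDiv = nDiv * fanout;
  }

  /* One interior slot per level below the root */
  for(i=1; i<nDepth; i++){
    aSlot[n++] = (iSeq / nDiv) % fanout;
    nDiv = nDiv / fanout;
  }

  /* Slot in the lowest interior node that receives the leaf itself */
  aSlot[n++] = iSeq % fanout;
  return n;
}
```
/*
** For example, with a fanout of 16 and nDepth==2, leaf 37 is reached through
** slot 2 of the root and is stored in slot 5 of that child (2*16 + 5 == 37).
*/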

/*
** This function is called as part of a SorterRewind() operation on a sorter
** that has already written two or more level-0 PMAs to one or more temp
** files. It builds a tree of MergeEngine/IncrMerger/PmaReader objects that
** can be used to incrementally merge all PMAs on disk.
**
** If successful, SQLITE_OK is returned and *ppOut set to point to the
** MergeEngine object at the root of the tree before returning. Or, if an
** error occurs, an SQLite error code is returned and the final value
** of *ppOut is undefined.
*/
static int vdbeSorterMergeTreeBuild(
  VdbeSorter *pSorter,       /* The VDBE cursor that implements the sort */
  MergeEngine **ppOut        /* Write the MergeEngine here */
){
  MergeEngine *pMain = 0;
  int rc = SQLITE_OK;
  int iTask;

#if SQLITE_MAX_WORKER_THREADS>0
  /* If the sorter uses more than one task, then create the top-level
  ** MergeEngine here. This MergeEngine will read data from exactly
  ** one PmaReader per sub-task. */
  assert( pSorter->bUseThreads || pSorter->nTask==1 );
  if( pSorter->nTask>1 ){
    pMain = vdbeMergeEngineNew(pSorter->nTask);
    if( pMain==0 ) rc = SQLITE_NOMEM;
  }
#endif

  for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
    SortSubtask *pTask = &pSorter->aTask[iTask];
    assert( pTask->nPMA>0 || SQLITE_MAX_WORKER_THREADS>0 );
    if( SQLITE_MAX_WORKER_THREADS==0 || pTask->nPMA ){
      MergeEngine *pRoot = 0;     /* Root node of tree for this task */
      int nDepth = vdbeSorterTreeDepth(pTask->nPMA);
      i64 iReadOff = 0;

      if( pTask->nPMA<=SORTER_MAX_MERGE_COUNT ){
        rc = vdbeMergeEngineLevel0(pTask, pTask->nPMA, &iReadOff, &pRoot);
      }else{
        int i;
        int iSeq = 0;
        pRoot = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
        if( pRoot==0 ) rc = SQLITE_NOMEM;
        for(i=0; i<pTask->nPMA && rc==SQLITE_OK; i += SORTER_MAX_MERGE_COUNT){
          MergeEngine *pMerger = 0; /* New level-0 PMA merger */
          int nReader;              /* Number of level-0 PMAs to merge */

          nReader = MIN(pTask->nPMA - i, SORTER_MAX_MERGE_COUNT);
          rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger);
          if( rc==SQLITE_OK ){
            rc = vdbeSorterAddToTree(pTask, nDepth, iSeq++, pRoot, pMerger);
          }
        }
      }

      if( rc==SQLITE_OK ){
#if SQLITE_MAX_WORKER_THREADS>0
        if( pMain!=0 ){
          rc = vdbeIncrMergerNew(pTask, pRoot, &pMain->aReadr[iTask].pIncr);
        }else
#endif
        {
          assert( pMain==0 );
          pMain = pRoot;
        }
      }else{
        vdbeMergeEngineFree(pRoot);
      }
    }
  }

  if( rc!=SQLITE_OK ){
    vdbeMergeEngineFree(pMain);
    pMain = 0;
  }
  *ppOut = pMain;
  return rc;
}
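
/*
** Illustration only, not part of SQLite: a sketch of how the loop in
** vdbeSorterMergeTreeBuild() carves a task's level-0 PMAs into leaf mergers
** of at most SORTER_MAX_MERGE_COUNT readers each. The helper names and the
** fanout argument are assumptions made for this example.
*/
```c
#include <assert.h>

/* Hypothetical helper: number of PMAs read by the leaf merger that starts
** at PMA index i, mirroring MIN(nPMA - i, fanout). */
static int sketch_leaf_size(int nPMA, int i, int fanout){
  int nLeft = nPMA - i;
  assert( fanout>0 && i>=0 && i<nPMA );
  return nLeft<fanout ? nLeft : fanout;
}

/* Hypothetical helper: number of leaf mergers created for nPMA PMAs when
** the loop index advances by fanout on each iteration. */
static int sketch_leaf_count(int nPMA, int fanout){
  int i;
  int n = 0;
  assert( fanout>0 );
  for(i=0; i<nPMA; i += fanout){
    n++;
  }
  return n;
}
```
/*
** For example, 40 PMAs with a fanout of 16 give three leaf mergers that read
** 16, 16 and 8 PMAs respectively.
*/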

/*
** This function is called as part of an sqlite3VdbeSorterRewind() operation
** on a sorter that has written two or more PMAs to temporary files. It sets
** up either VdbeSorter.pMerger (for single threaded sorters) or pReader
** (for multi-threaded sorters) so that it can be used to iterate through
** all records stored in the sorter.
**
** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
*/
static int vdbeSorterSetupMerge(VdbeSorter *pSorter){
  int rc;                         /* Return code */
  SortSubtask *pTask0 = &pSorter->aTask[0];
  MergeEngine *pMain = 0;
#if SQLITE_MAX_WORKER_THREADS
  sqlite3 *db = pTask0->pSorter->db;
#endif

  rc = vdbeSorterMergeTreeBuild(pSorter, &pMain);
  if( rc==SQLITE_OK ){
#if SQLITE_MAX_WORKER_THREADS
    assert( pSorter->bUseThreads==0 || pSorter->nTask>1 );
    if( pSorter->bUseThreads ){
      int iTask;
      PmaReader *pReadr = 0;
      SortSubtask *pLast = &pSorter->aTask[pSorter->nTask-1];
      rc = vdbeSortAllocUnpacked(pLast);
      if( rc==SQLITE_OK ){
        pReadr = (PmaReader*)sqlite3DbMallocZero(db, sizeof(PmaReader));
        pSorter->pReader = pReadr;
        if( pReadr==0 ) rc = SQLITE_NOMEM;
      }
      if( rc==SQLITE_OK ){
        rc = vdbeIncrMergerNew(pLast, pMain, &pReadr->pIncr);
        if( rc==SQLITE_OK ){
          vdbeIncrMergerSetThreads(pReadr->pIncr);
          for(iTask=0; iTask<(pSorter->nTask-1); iTask++){
            IncrMerger *pIncr;
            if( (pIncr = pMain->aReadr[iTask].pIncr) ){
              vdbeIncrMergerSetThreads(pIncr);
              assert( pIncr->pTask!=pLast );
            }
          }
          for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
            PmaReader *p = &pMain->aReadr[iTask];
            assert( p->pIncr==0 || p->pIncr->pTask==&pSorter->aTask[iTask] );
            if( p->pIncr ){
              if( iTask==pSorter->nTask-1 ){
                rc = vdbePmaReaderIncrMergeInit(p, INCRINIT_TASK);
              }else{
                rc = vdbePmaReaderBgIncrInit(p);
              }
            }
          }
        }
        pMain = 0;
      }
      if( rc==SQLITE_OK ){
        rc = vdbePmaReaderIncrMergeInit(pReadr, INCRINIT_ROOT);
      }
    }else
#endif
    {
      rc = vdbeMergeEngineInit(pTask0, pMain, INCRINIT_NORMAL);
      pSorter->pMerger = pMain;
      pMain = 0;
    }
  }

  if( rc!=SQLITE_OK ){
    vdbeMergeEngineFree(pMain);
  }
  return rc;
}

/*
** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite,
** this function is called to prepare for iterating through the records
** in sorted order.
*/
int sqlite3VdbeSorterRewind(const VdbeCursor *pCsr, int *pbEof){
  VdbeSorter *pSorter = pCsr->pSorter;
  int rc = SQLITE_OK;             /* Return code */

  assert( pSorter );

  /* If no data has been written to disk, then do not do so now. Instead,
  ** sort the in-memory VdbeSorter.list. The vdbe layer will read data
  ** directly from the in-memory list. */
  if( pSorter->bUsePMA==0 ){
    if( pSorter->list.pList ){
      *pbEof = 0;
      rc = vdbeSorterSort(&pSorter->aTask[0], &pSorter->list);
    }else{
      *pbEof = 1;
    }
    return rc;
  }

  /* Write the current in-memory list to a PMA. Whenever the
  ** VdbeSorterWrite() function flushes the contents of memory to disk, it
  ** creates a new list consisting of a single key immediately afterwards.
  ** So the list is never empty at this point. */
  assert( pSorter->list.pList );
  rc = vdbeSorterFlushPMA(pSorter);

  /* Join all threads */
  rc = vdbeSorterJoinAll(pSorter, rc);

  vdbeSorterRewindDebug("rewind");

  /* Assuming no errors have occurred, set up a merger structure to
  ** incrementally read and merge all remaining PMAs. */
  assert( pSorter->pReader==0 );
  if( rc==SQLITE_OK ){
    rc = vdbeSorterSetupMerge(pSorter);
    *pbEof = 0;
  }

  vdbeSorterRewindDebug("rewinddone");
  return rc;
}

/*
** Advance to the next element in the sorter.
*/
int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){
  VdbeSorter *pSorter = pCsr->pSorter;
  int rc;                         /* Return code */

  assert( pSorter->bUsePMA || (pSorter->pReader==0 && pSorter->pMerger==0) );
  if( pSorter->bUsePMA ){
    assert( pSorter->pReader==0 || pSorter->pMerger==0 );
    assert( pSorter->bUseThreads==0 || pSorter->pReader );
    assert( pSorter->bUseThreads==1 || pSorter->pMerger );
#if SQLITE_MAX_WORKER_THREADS>0
    if( pSorter->bUseThreads ){
      rc = vdbePmaReaderNext(pSorter->pReader);
      *pbEof = (pSorter->pReader->pFd==0);
    }else
#endif
    /*if( !pSorter->bUseThreads )*/ {
      assert( pSorter->pMerger->pTask==(&pSorter->aTask[0]) );
      rc = vdbeMergeEngineStep(pSorter->pMerger, pbEof);
    }
  }else{
    SorterRecord *pFree = pSorter->list.pList;
    pSorter->list.pList = pFree->u.pNext;
    pFree->u.pNext = 0;
    if( pSorter->list.aMemory==0 ) vdbeSorterRecordFree(db, pFree);
    *pbEof = !pSorter->list.pList;
    rc = SQLITE_OK;
  }
  return rc;
}

/*
** Return a pointer to a buffer owned by the sorter that contains the
** current key.
*/
static void *vdbeSorterRowkey(
  const VdbeSorter *pSorter,      /* Sorter object */
  int *pnKey                      /* OUT: Size of current key in bytes */
){
  void *pKey;
  if( pSorter->bUsePMA ){
    PmaReader *pReader;
#if SQLITE_MAX_WORKER_THREADS>0
    if( pSorter->bUseThreads ){
      pReader = pSorter->pReader;
    }else
#endif
    /*if( !pSorter->bUseThreads )*/{
      pReader = &pSorter->pMerger->aReadr[pSorter->pMerger->aTree[1]];
    }
    *pnKey = pReader->nKey;
    pKey = pReader->aKey;
  }else{
    *pnKey = pSorter->list.pList->nVal;
    pKey = SRVAL(pSorter->list.pList);
  }
  return pKey;
}

/*
** Copy the current sorter key into the memory cell pOut.
*/
int sqlite3VdbeSorterRowkey(const VdbeCursor *pCsr, Mem *pOut){
  VdbeSorter *pSorter = pCsr->pSorter;
  void *pKey; int nKey;           /* Sorter key to copy into pOut */

  pKey = vdbeSorterRowkey(pSorter, &nKey);
  if( sqlite3VdbeMemClearAndResize(pOut, nKey) ){
    return SQLITE_NOMEM;
  }
  pOut->n = nKey;
  MemSetTypeFlag(pOut, MEM_Blob);
  memcpy(pOut->z, pKey, nKey);

  return SQLITE_OK;
}

/*
** Compare the key in memory cell pVal with the key that the sorter cursor
** passed as the first argument currently points to. For the purposes of
** the comparison, ignore the rowid field at the end of each record.
**
** If the sorter cursor key contains any NULL values, consider it to be
** less than pVal, even if pVal also contains NULL values.
**
** If an error occurs, return an SQLite error code (i.e. SQLITE_NOMEM).
** Otherwise, set *pRes to a negative, zero or positive value if the
** key in pVal is smaller than, equal to or larger than the current sorter
** key.
**
** This routine forms the core of the OP_SorterCompare opcode, which in
** turn is used to verify uniqueness when constructing a UNIQUE INDEX.
*/
int sqlite3VdbeSorterCompare(
  const VdbeCursor *pCsr,         /* Sorter cursor */
  Mem *pVal,                      /* Value to compare to current sorter key */
  int nKeyCol,                    /* Compare this many columns */
  int *pRes                       /* OUT: Result of comparison */
){
  VdbeSorter *pSorter = pCsr->pSorter;
  UnpackedRecord *r2 = pSorter->pUnpacked;
  KeyInfo *pKeyInfo = pCsr->pKeyInfo;
  int i;
  void *pKey; int nKey;           /* Sorter key to compare pVal with */

  if( r2==0 ){
    char *p;
    r2 = pSorter->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pKeyInfo,0,0,&p);
    assert( pSorter->pUnpacked==(UnpackedRecord*)p );
    if( r2==0 ) return SQLITE_NOMEM;
    r2->nField = nKeyCol;
  }
  assert( r2->nField==nKeyCol );

  pKey = vdbeSorterRowkey(pSorter, &nKey);
  sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2);
  for(i=0; i<nKeyCol; i++){
    if( r2->aMem[i].flags & MEM_Null ){
      *pRes = -1;
      return SQLITE_OK;
    }
  }

  *pRes = sqlite3VdbeRecordCompare(pVal->n, pVal->z, r2);
  return SQLITE_OK;
}