4 ** The author disclaims copyright to this source code. In place of
5 ** a legal notice, here is a blessing:
7 ** May you do good and not evil.
8 ** May you find forgiveness for yourself and forgive others.
9 ** May you share freely, never taking more than you give.
11 *************************************************************************
13 ** This file contains the implementation of an in-memory tree structure.
15 ** Technically the tree is a B-tree of order 4 (in the Knuth sense - each
16 ** node may have up to 4 children). Keys are stored within B-tree nodes by
17 ** reference. This may be slightly slower than a conventional red-black
18 ** tree, but it is simpler. It is also an easier structure to modify to
19 ** create a version that supports nested transaction rollback.
21 ** This tree does not currently support a delete operation. One is not
22 ** required. When LSM deletes a key from a database, it inserts a DELETE
23 ** marker into the data structure. As a result, although the value associated
24 ** with a key stored in the in-memory tree structure may be modified, no
25 ** keys are ever removed.
31 ** The in-memory tree structure supports SQLite-style MVCC. This means
32 ** that while one client is writing to the tree structure, other clients
33 ** may still be querying an older snapshot of the tree.
35 ** One way to implement this is to use an append-only b-tree. In this
36 ** case instead of modifying nodes in-place, a copy of the node is made
37 ** and the required modifications made to the copy. The parent of the
38 ** node is then modified (to update the pointer so that it points to
39 ** the new copy), which causes a copy of the parent to be made, and so on.
40 ** This means that each time the tree is written to a new root node is
41 ** created. A snapshot is identified by the root node that it uses.
43 ** The problem with the above is that each time the tree is written to,
44 ** a copy of the node structure modified and all of its ancestor nodes
45 ** is made. This may prove excessive with large tree structures.
47 ** To reduce this overhead, the data structure used for a tree node is
48 ** designed so that it may be edited in place exactly once without
49 ** affecting existing users. In other words, the node structure is capable
50 ** of storing two separate versions of the node at the same time.
51 ** When a node is to be edited, if the node structure already contains
52 ** two versions, a copy is made as in the append-only approach. Or, if
53 ** it only contains a single version, it is edited in place.
55 ** This reduces the overhead so that, roughly, one new node structure
56 ** must be allocated for each write (on top of those allocations that
57 ** would have been required by a non-MVCC tree). Logic: Assume that at
58 ** any time, 50% of nodes in the tree already contain 2 versions. When
59 ** a new entry is written to a node, there is a 50% chance that a copy
60 ** of the node will be required. And a 25% chance that a copy of its
61 ** parent is required. And so on.
65 ** The in-memory tree also supports transaction and sub-transaction
66 ** rollback. In order to rollback to point in time X, the following is
69 ** 1. All memory allocated since X must be freed, and
70 ** 2. All "v2" data adding to nodes that existed at X should be zeroed.
71 ** 3. The root node must be restored to its X value.
73 ** The Mempool object used to allocate memory for the tree supports
74 ** operation (1) - see the lsmPoolMark() and lsmPoolRevert() functions.
76 ** To support (2), all nodes that have v2 data are part of a singly linked
77 ** list, sorted by the age of the v2 data (nodes that have had data added
78 ** most recently are at the end of the list). So to zero all v2 data added
79 ** since X, the linked list is traversed from the first node added following
92 typedef struct TreeKey TreeKey
;
93 typedef struct TreeNode TreeNode
;
94 typedef struct TreeLeaf TreeLeaf
;
95 typedef struct NodeVersion NodeVersion
;
98 u32 iShmid
; /* Last shared-memory chunk in use by old */
99 u32 iRoot
; /* Offset of root node in shm file */
100 u32 nHeight
; /* Height of tree structure */
105 ** assert() that a TreeKey.flags value is sane. Usage:
107 ** assert( lsmAssertFlagsOk(pTreeKey->flags) );
109 static int lsmAssertFlagsOk(u8 keyflags
){
110 /* At least one flag must be set. Otherwise, what is this key doing? */
111 assert( keyflags
!=0 );
113 /* The POINT_DELETE and INSERT flags cannot both be set. */
114 assert( (keyflags
& LSM_POINT_DELETE
)==0 || (keyflags
& LSM_INSERT
)==0 );
116 /* If both the START_DELETE and END_DELETE flags are set, then the INSERT
117 ** flag must also be set. In other words - the three DELETE flags cannot
119 assert( (keyflags
& LSM_END_DELETE
)==0
120 || (keyflags
& LSM_START_DELETE
)==0
121 || (keyflags
& LSM_POINT_DELETE
)==0
127 static int assert_delete_ranges_match(lsm_db
*);
128 static int treeCountEntries(lsm_db
*db
);
131 ** Container for a key-value pair. Within the *-shm file, each key/value
132 ** pair is stored in a single allocation (which may not actually be
133 ** contiguous in memory). Layout is the TreeKey structure, followed by
134 ** the nKey bytes of key blob, followed by the nValue bytes of value blob
135 ** (if nValue is non-negative).
138 int nKey
; /* Size of pKey in bytes */
139 int nValue
; /* Size of pValue. Or negative. */
140 u8 flags
; /* Various LSM_XXX flags */
143 #define TKV_KEY(p) ((void *)&(p)[1])
144 #define TKV_VAL(p) ((void *)(((u8 *)&(p)[1]) + (p)->nKey))
147 ** A single tree node. A node structure may contain up to 3 key/value
148 ** pairs. Internal (non-leaf) nodes have up to 4 children.
150 ** TODO: Update the format of this to be more compact. Get it working
154 u32 aiKeyPtr
[3]; /* Array of pointers to TreeKey objects */
156 /* The following fields are present for interior nodes only, not leaves. */
157 u32 aiChildPtr
[4]; /* Array of pointers to child nodes */
159 /* The extra child pointer slot. */
160 u32 iV2
; /* Transaction number of v2 */
161 u8 iV2Child
; /* apChild[] entry replaced by pV2Ptr */
162 u32 iV2Ptr
; /* Substitute pointer */
166 u32 aiKeyPtr
[3]; /* Array of pointers to TreeKey objects */
169 typedef struct TreeBlob TreeBlob
;
176 ** Cursor for searching a tree structure.
178 ** If a cursor does not point to any element (a.k.a. EOF), then the
179 ** TreeCursor.iNode variable is set to a negative value. Otherwise, the
180 ** cursor currently points to key aiCell[iNode] on node apTreeNode[iNode].
182 ** Entries in the apTreeNode[] and aiCell[] arrays contain the node and
183 ** index of the TreeNode.apChild[] pointer followed to descend to the
184 ** current element. Hence apTreeNode[0] always contains the root node of
188 lsm_db
*pDb
; /* Database handle for this cursor */
189 TreeRoot
*pRoot
; /* Root node and height of tree to access */
190 int iNode
; /* Cursor points at apTreeNode[iNode] */
191 TreeNode
*apTreeNode
[MAX_DEPTH
];/* Current position in tree */
192 u8 aiCell
[MAX_DEPTH
]; /* Current position in tree */
193 TreeKey
*pSave
; /* Saved key */
194 TreeBlob blob
; /* Dynamic storage for a key */
198 ** A value guaranteed to be larger than the largest possible transaction
199 ** id (TreeHeader.iTransId).
201 #define WORKING_VERSION (1<<30)
203 static int tblobGrow(lsm_db
*pDb
, TreeBlob
*p
, int n
, int *pRc
){
205 lsmFree(pDb
->pEnv
, p
->a
);
206 p
->a
= lsmMallocRc(pDb
->pEnv
, n
, pRc
);
211 static void tblobFree(lsm_db
*pDb
, TreeBlob
*p
){
212 lsmFree(pDb
->pEnv
, p
->a
);
216 /***********************************************************************
217 ** Start of IntArray methods. */
219 ** Append value iVal to the contents of IntArray *p. Return LSM_OK if
220 ** successful, or LSM_NOMEM if an OOM condition is encountered.
222 static int intArrayAppend(lsm_env
*pEnv
, IntArray
*p
, u32 iVal
){
223 assert( p
->nArray
<=p
->nAlloc
);
224 if( p
->nArray
>=p
->nAlloc
){
226 int nNew
= p
->nArray
? p
->nArray
*2 : 128;
227 aNew
= lsmRealloc(pEnv
, p
->aArray
, nNew
*sizeof(u32
));
228 if( !aNew
) return LSM_NOMEM_BKPT
;
233 p
->aArray
[p
->nArray
++] = iVal
;
238 ** Zero the IntArray object.
240 static void intArrayFree(lsm_env
*pEnv
, IntArray
*p
){
245 ** Return the number of entries currently in the int-array object.
247 static int intArraySize(IntArray
*p
){
252 ** Return a copy of the iIdx'th entry in the int-array.
254 static u32
intArrayEntry(IntArray
*p
, int iIdx
){
255 return p
->aArray
[iIdx
];
259 ** Truncate the int-array so that all but the first nVal values are
262 static void intArrayTruncate(IntArray
*p
, int nVal
){
265 /* End of IntArray methods.
266 ***********************************************************************/
268 static int treeKeycmp(void *p1
, int n1
, void *p2
, int n2
){
270 res
= memcmp(p1
, p2
, LSM_MIN(n1
, n2
));
271 if( res
==0 ) res
= (n1
-n2
);
276 ** The pointer passed as the first argument points to an interior node,
277 ** not a leaf. This function returns the offset of the iCell'th child
278 ** sub-tree of the node.
280 static u32
getChildPtr(TreeNode
*p
, int iVersion
, int iCell
){
281 assert( iVersion
>=0 );
282 assert( iCell
>=0 && iCell
<=array_size(p
->aiChildPtr
) );
283 if( p
->iV2
&& p
->iV2
<=(u32
)iVersion
&& iCell
==p
->iV2Child
) return p
->iV2Ptr
;
284 return p
->aiChildPtr
[iCell
];
288 ** Given an offset within the *-shm file, return the associated chunk number.
290 static int treeOffsetToChunk(u32 iOff
){
291 assert( LSM_SHM_CHUNK_SIZE
==(1<<15) );
292 return (int)(iOff
>>15);
295 #define treeShmptrUnsafe(pDb, iPtr) \
296 (&((u8*)((pDb)->apShm[(iPtr)>>15]))[(iPtr) & (LSM_SHM_CHUNK_SIZE-1)])
299 ** Return a pointer to the mapped memory location associated with *-shm
302 static void *treeShmptr(lsm_db
*pDb
, u32 iPtr
){
304 assert( (iPtr
>>15)<(u32
)pDb
->nShm
);
305 assert( pDb
->apShm
[iPtr
>>15] );
307 return iPtr
? treeShmptrUnsafe(pDb
, iPtr
) : 0;
310 static ShmChunk
* treeShmChunk(lsm_db
*pDb
, int iChunk
){
311 return (ShmChunk
*)(pDb
->apShm
[iChunk
]);
314 static ShmChunk
* treeShmChunkRc(lsm_db
*pDb
, int iChunk
, int *pRc
){
315 assert( *pRc
==LSM_OK
);
316 if( iChunk
<pDb
->nShm
|| LSM_OK
==(*pRc
= lsmShmCacheChunks(pDb
, iChunk
+1)) ){
317 return (ShmChunk
*)(pDb
->apShm
[iChunk
]);
324 static void assertIsWorkingChild(
331 u32 iPtr
= getChildPtr(pParent
, WORKING_VERSION
, iCell
);
332 p
= treeShmptr(db
, iPtr
);
336 # define assertIsWorkingChild(w,x,y,z)
339 /* Values for the third argument to treeShmkey(). */
340 #define TKV_LOADKEY 1
341 #define TKV_LOADVAL 2
343 static TreeKey
*treeShmkey(
344 lsm_db
*pDb
, /* Database handle */
345 u32 iPtr
, /* Shmptr to TreeKey struct */
346 int eLoad
, /* Either zero or a TREEKEY_LOADXXX value */
347 TreeBlob
*pBlob
, /* Used if dynamic memory is required */
348 int *pRc
/* IN/OUT: Error code */
352 assert( eLoad
==TKV_LOADKEY
|| eLoad
==TKV_LOADVAL
);
353 pRet
= (TreeKey
*)treeShmptr(pDb
, iPtr
);
355 int nReq
; /* Bytes of space required at pRet */
356 int nAvail
; /* Bytes of space available at pRet */
358 nReq
= sizeof(TreeKey
) + pRet
->nKey
;
359 if( eLoad
==TKV_LOADVAL
&& pRet
->nValue
>0 ){
360 nReq
+= pRet
->nValue
;
362 assert( LSM_SHM_CHUNK_SIZE
==(1<<15) );
363 nAvail
= LSM_SHM_CHUNK_SIZE
- (iPtr
& (LSM_SHM_CHUNK_SIZE
-1));
366 if( tblobGrow(pDb
, pBlob
, nReq
, pRc
)==0 ){
368 while( *pRc
==LSM_OK
){
370 void *p
= treeShmptr(pDb
, iPtr
);
371 int n
= LSM_MIN(nAvail
, nReq
-nLoad
);
373 memcpy(&pBlob
->a
[nLoad
], p
, n
);
375 if( nLoad
==nReq
) break;
377 pChunk
= treeShmChunk(pDb
, treeOffsetToChunk(iPtr
));
379 iPtr
= (pChunk
->iNext
* LSM_SHM_CHUNK_SIZE
) + LSM_SHM_CHUNK_HDR
;
380 nAvail
= LSM_SHM_CHUNK_SIZE
- LSM_SHM_CHUNK_HDR
;
383 pRet
= (TreeKey
*)(pBlob
->a
);
390 #if defined(LSM_DEBUG) && defined(LSM_EXPENSIVE_ASSERT)
391 void assert_leaf_looks_ok(TreeNode
*pNode
){
392 assert( pNode
->apKey
[1] );
395 void assert_node_looks_ok(TreeNode
*pNode
, int nHeight
){
397 assert( pNode
->apKey
[1] );
400 assert( getChildPtr(pNode
, WORKING_VERSION
, 1) );
401 assert( getChildPtr(pNode
, WORKING_VERSION
, 2) );
403 assert_node_looks_ok(getChildPtr(pNode
, WORKING_VERSION
, i
), nHeight
-1);
410 ** Run various assert() statements to check that the working-version of the
411 ** tree is correct in the following respects:
415 void assert_tree_looks_ok(int rc
, Tree
*pTree
){
418 # define assert_tree_looks_ok(x,y)
421 void lsmFlagsToString(int flags
, char *zFlags
){
423 zFlags
[0] = (flags
& LSM_END_DELETE
) ? ']' : '.';
425 /* Only one of LSM_POINT_DELETE, LSM_INSERT and LSM_SEPARATOR should ever
426 ** be set. If this is not true, write a '?' to the output. */
427 switch( flags
& (LSM_POINT_DELETE
|LSM_INSERT
|LSM_SEPARATOR
) ){
428 case 0: zFlags
[1] = '.'; break;
429 case LSM_POINT_DELETE
: zFlags
[1] = '-'; break;
430 case LSM_INSERT
: zFlags
[1] = '+'; break;
431 case LSM_SEPARATOR
: zFlags
[1] = '^'; break;
432 default: zFlags
[1] = '?'; break;
435 zFlags
[2] = (flags
& LSM_SYSTEMKEY
) ? '*' : '.';
436 zFlags
[3] = (flags
& LSM_START_DELETE
) ? '[' : '.';
443 ** Pointer pBlob points to a buffer containing a blob of binary data
444 ** nBlob bytes long. Append the contents of this blob to *pStr, with
445 ** each octet represented by a 2-digit hexadecimal number. For example,
446 ** if the input blob is three bytes in size and contains {0x01, 0x44, 0xFF},
447 ** then "0144ff" is appended to *pStr.
449 static void lsmAppendStrBlob(LsmString
*pStr
, void *pBlob
, int nBlob
){
451 lsmStringExtend(pStr
, nBlob
*2);
452 if( pStr
->nAlloc
==0 ) return;
453 for(i
=0; i
<nBlob
; i
++){
454 u8 c
= ((u8
*)pBlob
)[i
];
455 if( c
>='a' && c
<='z' ){
456 pStr
->z
[pStr
->n
++] = c
;
457 }else if( c
!=0 || nBlob
==1 || i
!=(nBlob
-1) ){
458 pStr
->z
[pStr
->n
++] = "0123456789abcdef"[(c
>>4)&0xf];
459 pStr
->z
[pStr
->n
++] = "0123456789abcdef"[c
&0xf];
462 pStr
->z
[pStr
->n
] = 0;
467 ** Append nIndent space (0x20) characters to string *pStr.
469 static void lsmAppendIndent(LsmString
*pStr
, int nIndent
){
471 lsmStringExtend(pStr
, nIndent
);
472 for(i
=0; i
<nIndent
; i
++) lsmStringAppend(pStr
, " ", 1);
476 static void strAppendFlags(LsmString
*pStr
, u8 flags
){
479 lsmFlagsToString(flags
, zFlags
);
482 lsmStringAppend(pStr
, zFlags
, 5);
485 void dump_node_contents(
487 u32 iNode
, /* Print out the contents of this node */
488 char *zPath
, /* Path from root to this node */
489 int nPath
, /* Number of bytes in zPath */
490 int nHeight
/* Height: (0==leaf) (1==parent-of-leaf) */
492 const char *zSpace
= " ";
499 pNode
= (TreeNode
*)treeShmptr(pDb
, iNode
);
502 /* Append the nIndent bytes of space to string s. */
503 lsmStringInit(&s
, pDb
->pEnv
);
505 /* Append each key to string s. */
507 u32 iPtr
= pNode
->aiKeyPtr
[i
];
509 TreeKey
*pKey
= treeShmkey(pDb
, pNode
->aiKeyPtr
[i
],TKV_LOADKEY
, &b
,&rc
);
510 strAppendFlags(&s
, pKey
->flags
);
511 lsmAppendStrBlob(&s
, TKV_KEY(pKey
), pKey
->nKey
);
512 lsmStringAppend(&s
, " ", -1);
516 printf("% 6d %.*sleaf%.*s: %s\n",
517 iNode
, nPath
, zPath
, 20-nPath
-4, zSpace
, s
.z
521 for(i
=0; i
<4 && nHeight
>0; i
++){
522 u32 iPtr
= getChildPtr(pNode
, pDb
->treehdr
.root
.iTransId
, i
);
523 zPath
[nPath
] = (char)(i
+'0');
524 zPath
[nPath
+1] = '/';
527 dump_node_contents(pDb
, iPtr
, zPath
, nPath
+2, nHeight
-1);
529 if( i
!=3 && pNode
->aiKeyPtr
[i
] ){
530 TreeKey
*pKey
= treeShmkey(pDb
, pNode
->aiKeyPtr
[i
], TKV_LOADKEY
,&b
,&rc
);
531 lsmStringInit(&s
, pDb
->pEnv
);
532 strAppendFlags(&s
, pKey
->flags
);
533 lsmAppendStrBlob(&s
, TKV_KEY(pKey
), pKey
->nKey
);
534 printf("% 6d %.*s%.*s: %s\n",
535 iNode
, nPath
+1, zPath
, 20-nPath
-1, zSpace
, s
.z
);
544 void dump_tree_contents(lsm_db
*pDb
, const char *zCaption
){
546 TreeRoot
*p
= &pDb
->treehdr
.root
;
547 printf("\n%s\n", zCaption
);
550 dump_node_contents(pDb
, p
->iRoot
, zPath
, 1, p
->nHeight
-1);
558 ** Initialize a cursor object, the space for which has already been
561 static void treeCursorInit(lsm_db
*pDb
, int bOld
, TreeCursor
*pCsr
){
562 memset(pCsr
, 0, sizeof(TreeCursor
));
565 pCsr
->pRoot
= &pDb
->treehdr
.oldroot
;
567 pCsr
->pRoot
= &pDb
->treehdr
.root
;
573 ** Return a pointer to the mapping of the TreeKey object that the cursor
576 static TreeKey
*csrGetKey(TreeCursor
*pCsr
, TreeBlob
*pBlob
, int *pRc
){
578 lsm_db
*pDb
= pCsr
->pDb
;
579 u32 iPtr
= pCsr
->apTreeNode
[pCsr
->iNode
]->aiKeyPtr
[pCsr
->aiCell
[pCsr
->iNode
]];
582 pRet
= (TreeKey
*)treeShmptrUnsafe(pDb
, iPtr
);
583 if( !(pRet
->flags
& LSM_CONTIGUOUS
) ){
584 pRet
= treeShmkey(pDb
, iPtr
, TKV_LOADVAL
, pBlob
, pRc
);
591 ** Save the current position of tree cursor pCsr.
593 int lsmTreeCursorSave(TreeCursor
*pCsr
){
595 if( pCsr
&& pCsr
->pSave
==0 ){
596 int iNode
= pCsr
->iNode
;
598 pCsr
->pSave
= csrGetKey(pCsr
, &pCsr
->blob
, &rc
);
606 ** Restore the position of a saved tree cursor.
608 static int treeCursorRestore(TreeCursor
*pCsr
, int *pRes
){
611 TreeKey
*pKey
= pCsr
->pSave
;
614 rc
= lsmTreeCursorSeek(pCsr
, TKV_KEY(pKey
), pKey
->nKey
, pRes
);
621 ** Allocate nByte bytes of space within the *-shm file. If successful,
622 ** return LSM_OK and set *piPtr to the offset within the file at which
623 ** the allocated space is located.
625 static u32
treeShmalloc(lsm_db
*pDb
, int bAlign
, int nByte
, int *pRc
){
628 const static int CHUNK_SIZE
= LSM_SHM_CHUNK_SIZE
;
629 const static int CHUNK_HDR
= LSM_SHM_CHUNK_HDR
;
630 u32 iWrite
; /* Current write offset */
631 u32 iEof
; /* End of current chunk */
632 int iChunk
; /* Current chunk */
634 assert( nByte
<= (CHUNK_SIZE
-CHUNK_HDR
) );
636 /* Check if there is enough space on the current chunk to fit the
637 ** new allocation. If not, link in a new chunk and put the new
638 ** allocation at the start of it. */
639 iWrite
= pDb
->treehdr
.iWrite
;
641 iWrite
= (iWrite
+ 3) & ~0x0003;
642 assert( (iWrite
% 4)==0 );
646 iChunk
= treeOffsetToChunk(iWrite
-1);
647 iEof
= (iChunk
+1) * CHUNK_SIZE
;
648 assert( iEof
>=iWrite
&& (iEof
-iWrite
)<(u32
)CHUNK_SIZE
);
649 if( (iWrite
+nByte
)>iEof
){
650 ShmChunk
*pHdr
; /* Header of chunk just finished (iChunk) */
651 ShmChunk
*pFirst
; /* Header of chunk treehdr.iFirst */
652 ShmChunk
*pNext
; /* Header of new chunk */
653 int iNext
= 0; /* Next chunk */
656 pFirst
= treeShmChunk(pDb
, pDb
->treehdr
.iFirst
);
658 assert( shm_sequence_ge(pDb
->treehdr
.iUsedShmid
, pFirst
->iShmid
) );
659 assert( (pDb
->treehdr
.iNextShmid
+1-pDb
->treehdr
.nChunk
)==pFirst
->iShmid
);
661 /* Check if the chunk at the start of the linked list is still in
662 ** use. If not, reuse it. If so, allocate a new chunk by appending
663 ** to the *-shm file. */
664 if( pDb
->treehdr
.iUsedShmid
!=pFirst
->iShmid
){
666 rc
= lsmTreeInUse(pDb
, pFirst
->iShmid
, &bInUse
);
672 iNext
= pDb
->treehdr
.iFirst
;
673 pDb
->treehdr
.iFirst
= pFirst
->iNext
;
674 assert( pDb
->treehdr
.iFirst
);
677 if( iNext
==0 ) iNext
= pDb
->treehdr
.nChunk
++;
679 /* Set the header values for the new chunk */
680 pNext
= treeShmChunkRc(pDb
, iNext
, &rc
);
683 pNext
->iShmid
= (pDb
->treehdr
.iNextShmid
++);
689 /* Set the header values for the chunk just finished */
690 pHdr
= (ShmChunk
*)treeShmptr(pDb
, iChunk
*CHUNK_SIZE
);
693 /* Advance to the next chunk */
694 iWrite
= iNext
* CHUNK_SIZE
+ CHUNK_HDR
;
697 /* Allocate space at iWrite. */
699 pDb
->treehdr
.iWrite
= iWrite
+ nByte
;
700 pDb
->treehdr
.root
.nByte
+= nByte
;
706 ** Allocate and zero nByte bytes of space within the *-shm file.
708 static void *treeShmallocZero(lsm_db
*pDb
, int nByte
, u32
*piPtr
, int *pRc
){
711 iPtr
= treeShmalloc(pDb
, 1, nByte
, pRc
);
712 p
= treeShmptr(pDb
, iPtr
);
714 assert( *pRc
==LSM_OK
);
721 static TreeNode
*newTreeNode(lsm_db
*pDb
, u32
*piPtr
, int *pRc
){
722 return treeShmallocZero(pDb
, sizeof(TreeNode
), piPtr
, pRc
);
725 static TreeLeaf
*newTreeLeaf(lsm_db
*pDb
, u32
*piPtr
, int *pRc
){
726 return treeShmallocZero(pDb
, sizeof(TreeLeaf
), piPtr
, pRc
);
729 static TreeKey
*newTreeKey(
732 void *pKey
, int nKey
, /* Key data */
733 void *pVal
, int nVal
, /* Value data (or nVal<0 for delete) */
743 /* Allocate space for the TreeKey structure itself */
744 *piPtr
= iPtr
= treeShmalloc(pDb
, 1, sizeof(TreeKey
), pRc
);
745 p
= treeShmptr(pDb
, iPtr
);
750 /* Allocate and populate the space required for the key and value. */
759 iWrite
= (pDb
->treehdr
.iWrite
& (LSM_SHM_CHUNK_SIZE
-1));
760 iWrite
= LSM_MAX(iWrite
, LSM_SHM_CHUNK_HDR
);
761 nAlloc
= LSM_MIN((LSM_SHM_CHUNK_SIZE
-iWrite
), (u32
)nRem
);
763 aAlloc
= treeShmptr(pDb
, treeShmalloc(pDb
, 0, nAlloc
, pRc
));
764 if( aAlloc
==0 ) break;
765 memcpy(aAlloc
, &a
[n
-nRem
], nAlloc
);
773 iEnd
= iPtr
+ sizeof(TreeKey
) + nKey
+ LSM_MAX(0, nVal
);
774 if( (iPtr
& ~(LSM_SHM_CHUNK_SIZE
-1))!=(iEnd
& ~(LSM_SHM_CHUNK_SIZE
-1)) ){
777 p
->flags
= LSM_CONTIGUOUS
;
782 printf("store: %d %s\n", (int)iPtr
, (char *)pKey
);
787 static TreeNode
*copyTreeNode(
795 pNew
= newTreeNode(pDb
, piNew
, pRc
);
797 memcpy(pNew
->aiKeyPtr
, pOld
->aiKeyPtr
, sizeof(pNew
->aiKeyPtr
));
798 memcpy(pNew
->aiChildPtr
, pOld
->aiChildPtr
, sizeof(pNew
->aiChildPtr
));
799 if( pOld
->iV2
) pNew
->aiChildPtr
[pOld
->iV2Child
] = pOld
->iV2Ptr
;
804 static TreeNode
*copyTreeLeaf(
811 pNew
= newTreeLeaf(pDb
, piNew
, pRc
);
813 memcpy(pNew
, pOld
, sizeof(TreeLeaf
));
815 return (TreeNode
*)pNew
;
819 ** The tree cursor passed as the second argument currently points to an
820 ** internal node (not a leaf). Specifically, to a sub-tree pointer. This
821 ** function replaces the sub-tree that the cursor currently points to
822 ** with sub-tree pNew.
824 ** The sub-tree may be replaced either by writing the "v2 data" on the
825 ** internal node, or by allocating a new TreeNode structure and then
826 ** calling this function on the parent of the internal node.
828 static int treeUpdatePtr(lsm_db
*pDb
, TreeCursor
*pCsr
, u32 iNew
){
831 /* iNew is the new root node */
832 pDb
->treehdr
.root
.iRoot
= iNew
;
834 /* If this node already has version 2 content, allocate a copy and
835 ** update the copy with the new pointer value. Otherwise, store the
836 ** new pointer as v2 data within the current node structure. */
838 TreeNode
*p
; /* The node to be modified */
839 int iChildPtr
; /* apChild[] entry to modify */
841 p
= pCsr
->apTreeNode
[pCsr
->iNode
];
842 iChildPtr
= pCsr
->aiCell
[pCsr
->iNode
];
845 /* The "allocate new TreeNode" option */
848 pCopy
= copyTreeNode(pDb
, p
, &iCopy
, &rc
);
850 assert( rc
==LSM_OK
);
851 pCopy
->aiChildPtr
[iChildPtr
] = iNew
;
853 rc
= treeUpdatePtr(pDb
, pCsr
, iCopy
);
856 /* The "v2 data" option */
858 assert( pDb
->treehdr
.root
.iTransId
>0 );
862 pCsr
->apTreeNode
[pCsr
->iNode
-1],
863 pDb
->treehdr
.root
.iTransId
, pCsr
->aiCell
[pCsr
->iNode
-1]
866 iPtr
= pDb
->treehdr
.root
.iRoot
;
868 rc
= intArrayAppend(pDb
->pEnv
, &pDb
->rollback
, iPtr
);
871 p
->iV2
= pDb
->treehdr
.root
.iTransId
;
872 p
->iV2Child
= (u8
)iChildPtr
;
882 ** Cursor pCsr points at a node that is part of pTree. This function
883 ** inserts a new key and optionally child node pointer into that node.
885 ** The position into which the new key and pointer are inserted is
886 ** determined by the iSlot parameter. The new key will be inserted to
887 ** the left of the key currently stored in apKey[iSlot]. Or, if iSlot is
888 ** greater than the index of the rightmost key in the node.
890 ** Pointer pLeftPtr points to a child tree that contains keys that are
891 ** smaller than pTreeKey.
893 static int treeInsert(
894 lsm_db
*pDb
, /* Database handle */
895 TreeCursor
*pCsr
, /* Cursor indicating path to insert at */
896 u32 iLeftPtr
, /* Left child pointer */
897 u32 iTreeKey
, /* Location of key to insert */
898 u32 iRightPtr
, /* Right child pointer */
899 int iSlot
/* Position to insert key into */
902 TreeNode
*pNode
= pCsr
->apTreeNode
[pCsr
->iNode
];
904 /* Check if the node is currently full. If so, split pNode in two and
905 ** call this function recursively to add a key to the parent. Otherwise,
906 ** insert the new key directly into pNode. */
907 assert( pNode
->aiKeyPtr
[1] );
908 if( pNode
->aiKeyPtr
[0] && pNode
->aiKeyPtr
[2] ){
909 u32 iLeft
; TreeNode
*pLeft
; /* New left-hand sibling node */
910 u32 iRight
; TreeNode
*pRight
; /* New right-hand sibling node */
912 pLeft
= newTreeNode(pDb
, &iLeft
, &rc
);
913 pRight
= newTreeNode(pDb
, &iRight
, &rc
);
916 pLeft
->aiChildPtr
[1] = getChildPtr(pNode
, WORKING_VERSION
, 0);
917 pLeft
->aiKeyPtr
[1] = pNode
->aiKeyPtr
[0];
918 pLeft
->aiChildPtr
[2] = getChildPtr(pNode
, WORKING_VERSION
, 1);
920 pRight
->aiChildPtr
[1] = getChildPtr(pNode
, WORKING_VERSION
, 2);
921 pRight
->aiKeyPtr
[1] = pNode
->aiKeyPtr
[2];
922 pRight
->aiChildPtr
[2] = getChildPtr(pNode
, WORKING_VERSION
, 3);
924 if( pCsr
->iNode
==0 ){
925 /* pNode is the root of the tree. Grow the tree by one level. */
926 u32 iRoot
; TreeNode
*pRoot
; /* New root node */
928 pRoot
= newTreeNode(pDb
, &iRoot
, &rc
);
929 pRoot
->aiKeyPtr
[1] = pNode
->aiKeyPtr
[1];
930 pRoot
->aiChildPtr
[1] = iLeft
;
931 pRoot
->aiChildPtr
[2] = iRight
;
933 pDb
->treehdr
.root
.iRoot
= iRoot
;
934 pDb
->treehdr
.root
.nHeight
++;
938 rc
= treeInsert(pDb
, pCsr
,
939 iLeft
, pNode
->aiKeyPtr
[1], iRight
, pCsr
->aiCell
[pCsr
->iNode
]
943 assert( pLeft
->iV2
==0 );
944 assert( pRight
->iV2
==0 );
947 pLeft
->aiKeyPtr
[0] = iTreeKey
;
948 pLeft
->aiChildPtr
[0] = iLeftPtr
;
949 if( iRightPtr
) pLeft
->aiChildPtr
[1] = iRightPtr
;
952 pLeft
->aiChildPtr
[3] = (iRightPtr
? iRightPtr
: pLeft
->aiChildPtr
[2]);
953 pLeft
->aiKeyPtr
[2] = iTreeKey
;
954 pLeft
->aiChildPtr
[2] = iLeftPtr
;
957 pRight
->aiKeyPtr
[0] = iTreeKey
;
958 pRight
->aiChildPtr
[0] = iLeftPtr
;
959 if( iRightPtr
) pRight
->aiChildPtr
[1] = iRightPtr
;
962 pRight
->aiChildPtr
[3] = (iRightPtr
? iRightPtr
: pRight
->aiChildPtr
[2]);
963 pRight
->aiKeyPtr
[2] = iTreeKey
;
964 pRight
->aiChildPtr
[2] = iLeftPtr
;
976 /* Allocate a new version of node pNode. */
977 pNew
= newTreeNode(pDb
, &iNew
, &rc
);
980 piKey
= pNew
->aiKeyPtr
;
981 piChild
= pNew
->aiChildPtr
;
983 for(i
=0; i
<iSlot
; i
++){
984 if( pNode
->aiKeyPtr
[i
] ){
985 *(piKey
++) = pNode
->aiKeyPtr
[i
];
986 *(piChild
++) = getChildPtr(pNode
, WORKING_VERSION
, i
);
991 *piChild
++ = iLeftPtr
;
994 for(i
=iSlot
; i
<3; i
++){
995 if( pNode
->aiKeyPtr
[i
] ){
996 *(piKey
++) = pNode
->aiKeyPtr
[i
];
997 *(piChild
++) = iStore
? iStore
: getChildPtr(pNode
, WORKING_VERSION
, i
);
1005 *piChild
= getChildPtr(pNode
, WORKING_VERSION
,
1006 (pNode
->aiKeyPtr
[2] ? 3 : 2)
1010 rc
= treeUpdatePtr(pDb
, pCsr
, iNew
);
1016 static int treeInsertLeaf(
1017 lsm_db
*pDb
, /* Database handle */
1018 TreeCursor
*pCsr
, /* Cursor structure */
1019 u32 iTreeKey
, /* Key pointer to insert */
1020 int iSlot
/* Insert key to the left of this */
1022 int rc
= LSM_OK
; /* Return code */
1023 TreeNode
*pLeaf
= pCsr
->apTreeNode
[pCsr
->iNode
];
1027 assert( iSlot
>=0 && iSlot
<=4 );
1028 assert( pCsr
->iNode
>0 );
1029 assert( pLeaf
->aiKeyPtr
[1] );
1033 pNew
= newTreeLeaf(pDb
, &iNew
, &rc
);
1035 if( pLeaf
->aiKeyPtr
[0] && pLeaf
->aiKeyPtr
[2] ){
1036 /* The leaf is full. Split it in two. */
1039 pRight
= newTreeLeaf(pDb
, &iRight
, &rc
);
1041 assert( rc
==LSM_OK
);
1042 pNew
->aiKeyPtr
[1] = pLeaf
->aiKeyPtr
[0];
1043 pRight
->aiKeyPtr
[1] = pLeaf
->aiKeyPtr
[2];
1045 case 0: pNew
->aiKeyPtr
[0] = iTreeKey
; break;
1046 case 1: pNew
->aiKeyPtr
[2] = iTreeKey
; break;
1047 case 2: pRight
->aiKeyPtr
[0] = iTreeKey
; break;
1048 case 3: pRight
->aiKeyPtr
[2] = iTreeKey
; break;
1051 rc
= treeInsert(pDb
, pCsr
, iNew
, pLeaf
->aiKeyPtr
[1], iRight
,
1052 pCsr
->aiCell
[pCsr
->iNode
]
1059 if( i
==iSlot
) pNew
->aiKeyPtr
[iOut
++] = iTreeKey
;
1060 if( i
<3 && pLeaf
->aiKeyPtr
[i
] ){
1061 pNew
->aiKeyPtr
[iOut
++] = pLeaf
->aiKeyPtr
[i
];
1064 rc
= treeUpdatePtr(pDb
, pCsr
, iNew
);
1071 void lsmTreeMakeOld(lsm_db
*pDb
){
1073 /* A write transaction must be open. Otherwise the code below that
1074 ** assumes (pDb->pClient->iLogOff) is current may malfunction.
1076 ** Update: currently this assert fails due to lsm_flush(), which does
1077 ** not set nTransOpen.
1079 assert( /* pDb->nTransOpen>0 && */ pDb
->iReader
>=0 );
1081 if( pDb
->treehdr
.iOldShmid
==0 ){
1082 pDb
->treehdr
.iOldLog
= (pDb
->treehdr
.log
.aRegion
[2].iEnd
<< 1);
1083 pDb
->treehdr
.iOldLog
|= (~(pDb
->pClient
->iLogOff
) & (i64
)0x0001);
1085 pDb
->treehdr
.oldcksum0
= pDb
->treehdr
.log
.cksum0
;
1086 pDb
->treehdr
.oldcksum1
= pDb
->treehdr
.log
.cksum1
;
1087 pDb
->treehdr
.iOldShmid
= pDb
->treehdr
.iNextShmid
-1;
1088 memcpy(&pDb
->treehdr
.oldroot
, &pDb
->treehdr
.root
, sizeof(TreeRoot
));
1090 pDb
->treehdr
.root
.iTransId
= 1;
1091 pDb
->treehdr
.root
.iRoot
= 0;
1092 pDb
->treehdr
.root
.nHeight
= 0;
1093 pDb
->treehdr
.root
.nByte
= 0;
1097 void lsmTreeDiscardOld(lsm_db
*pDb
){
1098 assert( lsmShmAssertLock(pDb
, LSM_LOCK_WRITER
, LSM_LOCK_EXCL
)
1099 || lsmShmAssertLock(pDb
, LSM_LOCK_DMS2
, LSM_LOCK_EXCL
)
1101 pDb
->treehdr
.iUsedShmid
= pDb
->treehdr
.iOldShmid
;
1102 pDb
->treehdr
.iOldShmid
= 0;
1105 int lsmTreeHasOld(lsm_db
*pDb
){
1106 return pDb
->treehdr
.iOldShmid
!=0;
1110 ** This function is called during recovery to initialize the
1111 ** tree header. Only the database connections private copy of the tree-header
1112 ** is initialized here - it will be copied into shared memory if log file
1113 ** recovery is successful.
1115 int lsmTreeInit(lsm_db
*pDb
){
1119 memset(&pDb
->treehdr
, 0, sizeof(TreeHeader
));
1120 pDb
->treehdr
.root
.iTransId
= 1;
1121 pDb
->treehdr
.iFirst
= 1;
1122 pDb
->treehdr
.nChunk
= 2;
1123 pDb
->treehdr
.iWrite
= LSM_SHM_CHUNK_SIZE
+ LSM_SHM_CHUNK_HDR
;
1124 pDb
->treehdr
.iNextShmid
= 2;
1125 pDb
->treehdr
.iUsedShmid
= 1;
1127 pOne
= treeShmChunkRc(pDb
, 1, &rc
);
1135 static void treeHeaderChecksum(
1139 u32 cksum1
= 0x12345678;
1140 u32 cksum2
= 0x9ABCDEF0;
1141 u32
*a
= (u32
*)pHdr
;
1144 assert( (offsetof(TreeHeader
, aCksum
) + sizeof(u32
)*2)==sizeof(TreeHeader
) );
1145 assert( (sizeof(TreeHeader
) % (sizeof(u32
)*2))==0 );
1147 for(i
=0; i
<(offsetof(TreeHeader
, aCksum
) / sizeof(u32
)); i
+=2){
1149 cksum2
+= (cksum1
+ a
[i
+1]);
1156 ** Return true if the checksum stored in TreeHeader object *pHdr is
1157 ** consistent with the contents of its other fields.
1159 static int treeHeaderChecksumOk(TreeHeader
*pHdr
){
1161 treeHeaderChecksum(pHdr
, aCksum
);
1162 return (0==memcmp(aCksum
, pHdr
->aCksum
, sizeof(aCksum
)));
1166 ** This type is used by functions lsmTreeRepair() and treeSortByShmid() to
1167 ** make relinking the linked list of shared-memory chunks easier.
1169 typedef struct ShmChunkLoc ShmChunkLoc
;
1170 struct ShmChunkLoc
{
1176 ** This function checks that the linked list of shared memory chunks
1177 ** that starts at chunk db->treehdr.iFirst:
1179 ** 1) Includes all chunks in the shared-memory region, and
1180 ** 2) Links them together in order of ascending shm-id.
1182 ** If no error occurs and the conditions above are met, LSM_OK is returned.
1184 ** If either of the conditions are untrue, LSM_CORRUPT is returned. Or, if
1185 ** an error is encountered before the checks are completed, another LSM error
1186 ** code (i.e. LSM_IOERR or LSM_NOMEM) may be returned.
1188 static int treeCheckLinkedList(lsm_db
*db
){
1193 p
= treeShmChunkRc(db
, db
->treehdr
.iFirst
, &rc
);
1194 while( rc
==LSM_OK
&& p
){
1196 if( p
->iNext
>=db
->treehdr
.nChunk
){
1197 rc
= LSM_CORRUPT_BKPT
;
1199 ShmChunk
*pNext
= treeShmChunkRc(db
, p
->iNext
, &rc
);
1201 if( pNext
->iShmid
!=p
->iShmid
+1 ){
1202 rc
= LSM_CORRUPT_BKPT
;
1213 if( rc
==LSM_OK
&& (u32
)nVisit
!=db
->treehdr
.nChunk
-1 ){
1214 rc
= LSM_CORRUPT_BKPT
;
1220 ** Iterate through the current in-memory tree. If there are any v2-pointers
1221 ** with transaction ids larger than db->treehdr.iTransId, zero them.
1223 static int treeRepairPtrs(lsm_db
*db
){
1226 if( db
->treehdr
.root
.nHeight
>1 ){
1227 TreeCursor csr
; /* Cursor used to iterate through tree */
1228 u32 iTransId
= db
->treehdr
.root
.iTransId
;
1230 /* Initialize the cursor structure. Also decrement the nHeight variable
1231 ** in the tree-header. This will prevent the cursor from visiting any
1233 db
->treehdr
.root
.nHeight
--;
1234 treeCursorInit(db
, 0, &csr
);
1236 rc
= lsmTreeCursorEnd(&csr
, 0);
1237 while( rc
==LSM_OK
&& lsmTreeCursorValid(&csr
) ){
1238 TreeNode
*pNode
= csr
.apTreeNode
[csr
.iNode
];
1239 if( pNode
->iV2
>iTransId
){
1240 pNode
->iV2Child
= 0;
1244 rc
= lsmTreeCursorNext(&csr
);
1246 tblobFree(csr
.pDb
, &csr
.blob
);
1248 db
->treehdr
.root
.nHeight
++;
1254 static int treeRepairList(lsm_db
*db
){
1261 /* Iterate through all shm chunks. Find the smallest shm-id present in
1262 ** the shared-memory region. */
1263 for(i
=1; rc
==LSM_OK
&& (u32
)i
<db
->treehdr
.nChunk
; i
++){
1264 p
= treeShmChunkRc(db
, i
, &rc
);
1265 if( p
&& (pMin
==0 || shm_sequence_ge(pMin
->iShmid
, p
->iShmid
)) ){
1271 /* Fix the shm-id values on any chunks with a shm-id greater than or
1272 ** equal to treehdr.iNextShmid. Then do a merge-sort of all chunks to
1273 ** fix the ShmChunk.iNext pointers.
1281 /* Allocate space for a merge sort. */
1283 while( (u32
)nSort
< (db
->treehdr
.nChunk
-1) ) nSort
= nSort
* 2;
1284 nByte
= sizeof(ShmChunkLoc
) * nSort
* 2;
1285 aSort
= lsmMallocZeroRc(db
->pEnv
, nByte
, &rc
);
1286 iPrevShmid
= pMin
->iShmid
;
1288 /* Fix all shm-ids, if required. */
1290 iPrevShmid
= pMin
->iShmid
-1;
1291 for(i
=1; (u32
)i
<db
->treehdr
.nChunk
; i
++){
1292 p
= treeShmChunk(db
, i
);
1293 aSort
[i
-1].pShm
= p
;
1294 aSort
[i
-1].iLoc
= i
;
1295 if( (u32
)i
!=db
->treehdr
.iFirst
){
1296 if( shm_sequence_ge(p
->iShmid
, db
->treehdr
.iNextShmid
) ){
1297 p
->iShmid
= iPrevShmid
--;
1301 if( iMin
!=db
->treehdr
.iFirst
){
1302 p
= treeShmChunk(db
, db
->treehdr
.iFirst
);
1303 p
->iShmid
= iPrevShmid
;
1308 ShmChunkLoc
*aSpace
= &aSort
[nSort
];
1309 for(i
=0; i
<nSort
; i
++){
1310 if( aSort
[i
].pShm
){
1311 assert( shm_sequence_ge(aSort
[i
].pShm
->iShmid
, iPrevShmid
) );
1312 assert( aSpace
[aSort
[i
].pShm
->iShmid
- iPrevShmid
].pShm
==0 );
1313 aSpace
[aSort
[i
].pShm
->iShmid
- iPrevShmid
] = aSort
[i
];
1317 if( aSpace
[nSort
-1].pShm
) aSpace
[nSort
-1].pShm
->iNext
= 0;
1318 for(i
=0; i
<nSort
-1; i
++){
1319 if( aSpace
[i
].pShm
){
1320 aSpace
[i
].pShm
->iNext
= aSpace
[i
+1].iLoc
;
1324 rc
= treeCheckLinkedList(db
);
1325 lsmFree(db
->pEnv
, aSort
);
1333 ** This function is called as part of opening a write-transaction if the
1334 ** writer-flag is already set - indicating that the previous writer
1335 ** failed before ending its transaction.
1337 int lsmTreeRepair(lsm_db
*db
){
1340 ShmHeader
*pHdr
= db
->pShmhdr
;
1342 /* Ensure that the two tree-headers are consistent. Copy one over the other
1343 ** if necessary. Prefer the data from a tree-header for which the checksum
1344 ** computes. Or, if they both compute, prefer tree-header-1. */
1345 if( memcmp(&pHdr
->hdr1
, &pHdr
->hdr2
, sizeof(TreeHeader
)) ){
1346 if( treeHeaderChecksumOk(&pHdr
->hdr1
) ){
1347 memcpy(&pHdr
->hdr2
, &pHdr
->hdr1
, sizeof(TreeHeader
));
1349 memcpy(&pHdr
->hdr1
, &pHdr
->hdr2
, sizeof(TreeHeader
));
1353 /* Save the connections current copy of the tree-header. It will be
1354 ** restored before returning. */
1355 memcpy(&hdr
, &db
->treehdr
, sizeof(TreeHeader
));
1357 /* Walk the tree. Zero any v2 pointers with a transaction-id greater than
1358 ** the transaction-id currently in the tree-headers. */
1359 rc
= treeRepairPtrs(db
);
1361 /* Repair the linked list of shared-memory chunks. */
1363 rc
= treeRepairList(db
);
1366 memcpy(&db
->treehdr
, &hdr
, sizeof(TreeHeader
));
1370 static void treeOverwriteKey(lsm_db
*db
, TreeCursor
*pCsr
, u32 iKey
, int *pRc
){
1372 TreeRoot
*p
= &db
->treehdr
.root
;
1375 TreeNode
*pNode
= pCsr
->apTreeNode
[pCsr
->iNode
];
1376 int iCell
= pCsr
->aiCell
[pCsr
->iNode
];
1378 /* Create a copy of this node */
1379 if( (pCsr
->iNode
>0 && (u32
)pCsr
->iNode
==(p
->nHeight
-1)) ){
1380 pNew
= copyTreeLeaf(db
, (TreeLeaf
*)pNode
, &iNew
, pRc
);
1382 pNew
= copyTreeNode(db
, pNode
, &iNew
, pRc
);
1386 /* Modify the value in the new version */
1387 pNew
->aiKeyPtr
[iCell
] = iKey
;
1389 /* Change the pointer in the parent (if any) to point at the new
1392 treeUpdatePtr(db
, pCsr
, iNew
);
1397 static int treeNextIsEndDelete(lsm_db
*db
, TreeCursor
*pCsr
){
1398 int iNode
= pCsr
->iNode
;
1399 int iCell
= pCsr
->aiCell
[iNode
]+1;
1401 /* Cursor currently points to a leaf node. */
1402 assert( (u32
)pCsr
->iNode
==(db
->treehdr
.root
.nHeight
-1) );
1405 TreeNode
*pNode
= pCsr
->apTreeNode
[iNode
];
1406 if( iCell
<3 && pNode
->aiKeyPtr
[iCell
] ){
1408 TreeKey
*pKey
= treeShmptr(db
, pNode
->aiKeyPtr
[iCell
]);
1409 assert( rc
==LSM_OK
);
1410 return ((pKey
->flags
& LSM_END_DELETE
) ? 1 : 0);
1413 iCell
= pCsr
->aiCell
[iNode
];
1419 static int treePrevIsStartDelete(lsm_db
*db
, TreeCursor
*pCsr
){
1420 int iNode
= pCsr
->iNode
;
1422 /* Cursor currently points to a leaf node. */
1423 assert( (u32
)pCsr
->iNode
==(db
->treehdr
.root
.nHeight
-1) );
1426 TreeNode
*pNode
= pCsr
->apTreeNode
[iNode
];
1427 int iCell
= pCsr
->aiCell
[iNode
]-1;
1428 if( iCell
>=0 && pNode
->aiKeyPtr
[iCell
] ){
1430 TreeKey
*pKey
= treeShmptr(db
, pNode
->aiKeyPtr
[iCell
]);
1431 assert( rc
==LSM_OK
);
1432 return ((pKey
->flags
& LSM_START_DELETE
) ? 1 : 0);
1441 static int treeInsertEntry(
1442 lsm_db
*pDb
, /* Database handle */
1443 int flags
, /* Flags associated with entry */
1444 void *pKey
, /* Pointer to key data */
1445 int nKey
, /* Size of key data in bytes */
1446 void *pVal
, /* Pointer to value data (or NULL) */
1447 int nVal
/* Bytes in value data (or -ve for delete) */
1449 int rc
= LSM_OK
; /* Return Code */
1450 TreeKey
*pTreeKey
; /* New key-value being inserted */
1452 TreeRoot
*p
= &pDb
->treehdr
.root
;
1453 TreeCursor csr
; /* Cursor to seek to pKey/nKey */
1454 int res
= 0; /* Result of seek operation on csr */
1456 assert( nVal
>=0 || pVal
==0 );
1457 assert_tree_looks_ok(LSM_OK
, pTree
);
1458 assert( flags
==LSM_INSERT
|| flags
==LSM_POINT_DELETE
1459 || flags
==LSM_START_DELETE
|| flags
==LSM_END_DELETE
1461 assert( (flags
& LSM_CONTIGUOUS
)==0 );
1463 dump_tree_contents(pDb
, "before");
1467 TreeKey
*pRes
; /* Key at end of seek operation */
1468 treeCursorInit(pDb
, 0, &csr
);
1470 /* Seek to the leaf (or internal node) that the new key belongs on */
1471 rc
= lsmTreeCursorSeek(&csr
, pKey
, nKey
, &res
);
1472 pRes
= csrGetKey(&csr
, &csr
.blob
, &rc
);
1473 if( rc
!=LSM_OK
) return rc
;
1476 if( flags
==LSM_START_DELETE
){
1477 /* When inserting a start-delete-range entry, if the key that
1478 ** occurs immediately before the new entry is already a START_DELETE,
1479 ** then the new entry is not required. */
1480 if( (res
<=0 && (pRes
->flags
& LSM_START_DELETE
))
1481 || (res
>0 && treePrevIsStartDelete(pDb
, &csr
))
1483 goto insert_entry_out
;
1485 }else if( flags
==LSM_END_DELETE
){
1486 /* When inserting an start-delete-range entry, if the key that
1487 ** occurs immediately after the new entry is already an END_DELETE,
1488 ** then the new entry is not required. */
1489 if( (res
<0 && treeNextIsEndDelete(pDb
, &csr
))
1490 || (res
>=0 && (pRes
->flags
& LSM_END_DELETE
))
1492 goto insert_entry_out
;
1496 if( res
==0 && (flags
& (LSM_END_DELETE
|LSM_START_DELETE
)) ){
1497 if( pRes
->flags
& LSM_INSERT
){
1498 nVal
= pRes
->nValue
;
1499 pVal
= TKV_VAL(pRes
);
1501 flags
= flags
| pRes
->flags
;
1504 if( flags
& (LSM_INSERT
|LSM_POINT_DELETE
) ){
1505 if( (res
<0 && (pRes
->flags
& LSM_START_DELETE
))
1506 || (res
>0 && (pRes
->flags
& LSM_END_DELETE
))
1508 flags
= flags
| (LSM_END_DELETE
|LSM_START_DELETE
);
1510 flags
= flags
| (pRes
->flags
& (LSM_END_DELETE
|LSM_START_DELETE
));
1514 memset(&csr
, 0, sizeof(TreeCursor
));
1517 /* Allocate and populate a new key-value pair structure */
1518 pTreeKey
= newTreeKey(pDb
, &iTreeKey
, pKey
, nKey
, pVal
, nVal
, &rc
);
1519 if( rc
!=LSM_OK
) return rc
;
1520 assert( pTreeKey
->flags
==0 || pTreeKey
->flags
==LSM_CONTIGUOUS
);
1521 pTreeKey
->flags
|= flags
;
1524 /* The tree is completely empty. Add a new root node and install
1525 ** (pKey/nKey) as the middle entry. Even though it is a leaf at the
1526 ** moment, use newTreeNode() to allocate the node (i.e. allocate enough
1527 ** space for the fields used by interior nodes). This is because the
1528 ** treeInsert() routine may convert this node to an interior node. */
1529 TreeNode
*pRoot
= newTreeNode(pDb
, &p
->iRoot
, &rc
);
1531 assert( p
->nHeight
==0 );
1532 pRoot
->aiKeyPtr
[1] = iTreeKey
;
1537 /* The search found a match within the tree. */
1538 treeOverwriteKey(pDb
, &csr
, iTreeKey
, &rc
);
1540 /* The cursor now points to the leaf node into which the new entry should
1541 ** be inserted. There may or may not be a free slot within the leaf for
1542 ** the new key-value pair.
1544 ** iSlot is set to the index of the key within pLeaf that the new key
1545 ** should be inserted to the left of (or to a value 1 greater than the
1546 ** index of the rightmost key if the new key is larger than all keys
1547 ** currently stored in the node).
1549 int iSlot
= csr
.aiCell
[csr
.iNode
] + (res
<0);
1551 rc
= treeInsert(pDb
, &csr
, 0, iTreeKey
, 0, iSlot
);
1553 rc
= treeInsertLeaf(pDb
, &csr
, iTreeKey
, iSlot
);
1559 dump_tree_contents(pDb
, "after");
1562 tblobFree(pDb
, &csr
.blob
);
1563 assert_tree_looks_ok(rc
, pTree
);
1568 ** Insert a new entry into the in-memory tree.
1570 ** If the value of the 5th parameter, nVal, is negative, then a delete-marker
1571 ** is inserted into the tree. In this case the value pointer, pVal, must be
1575 lsm_db
*pDb
, /* Database handle */
1576 void *pKey
, /* Pointer to key data */
1577 int nKey
, /* Size of key data in bytes */
1578 void *pVal
, /* Pointer to value data (or NULL) */
1579 int nVal
/* Bytes in value data (or -ve for delete) */
1583 flags
= LSM_POINT_DELETE
;
1588 return treeInsertEntry(pDb
, flags
, pKey
, nKey
, pVal
, nVal
);
1591 static int treeDeleteEntry(lsm_db
*db
, TreeCursor
*pCsr
, u32 iNewptr
){
1592 TreeRoot
*p
= &db
->treehdr
.root
;
1593 TreeNode
*pNode
= pCsr
->apTreeNode
[pCsr
->iNode
];
1594 int iSlot
= pCsr
->aiCell
[pCsr
->iNode
];
1598 assert( pNode
->aiKeyPtr
[1] );
1599 assert( pNode
->aiKeyPtr
[iSlot
] );
1600 assert( iSlot
==0 || iSlot
==1 || iSlot
==2 );
1601 assert( ((u32
)pCsr
->iNode
==(db
->treehdr
.root
.nHeight
-1))==(iNewptr
==0) );
1603 bLeaf
= ((u32
)pCsr
->iNode
==(p
->nHeight
-1) && p
->nHeight
>1);
1605 if( pNode
->aiKeyPtr
[0] || pNode
->aiKeyPtr
[2] ){
1606 /* There are currently at least 2 keys on this node. So just create
1607 ** a new copy of the node with one of the keys removed. If the node
1608 ** happens to be the root node of the tree, allocate an entire
1609 ** TreeNode structure instead of just a TreeLeaf. */
1614 pNew
= (TreeNode
*)newTreeLeaf(db
, &iNew
, &rc
);
1616 pNew
= newTreeNode(db
, &iNew
, &rc
);
1624 if( bLeaf
==0 ) pNew
->aiChildPtr
[iOut
] = iNewptr
;
1625 if( i
<3 ) pNew
->aiKeyPtr
[iOut
] = pNode
->aiKeyPtr
[i
];
1627 }else if( bLeaf
|| p
->nHeight
==1 ){
1628 if( i
<3 && pNode
->aiKeyPtr
[i
] ){
1629 pNew
->aiKeyPtr
[iOut
++] = pNode
->aiKeyPtr
[i
];
1632 if( getChildPtr(pNode
, WORKING_VERSION
, i
) ){
1633 pNew
->aiChildPtr
[iOut
] = getChildPtr(pNode
, WORKING_VERSION
, i
);
1634 if( i
<3 ) pNew
->aiKeyPtr
[iOut
] = pNode
->aiKeyPtr
[i
];
1640 assert( bLeaf
|| pNew
->aiChildPtr
[0]==0 );
1642 rc
= treeUpdatePtr(db
, pCsr
, iNew
);
1645 }else if( pCsr
->iNode
==0 ){
1646 /* Removing the only key in the root node. iNewptr is the new root. */
1648 db
->treehdr
.root
.iRoot
= iNewptr
;
1649 db
->treehdr
.root
.nHeight
--;
1652 /* There is only one key on this node and the node is not the root
1653 ** node. Find a peer for this node. Then redistribute the contents of
1654 ** the peer and the parent cell between the parent and either one or
1655 ** two new nodes. */
1656 TreeNode
*pParent
; /* Parent tree node */
1658 u32 iPeer
; /* Pointer to peer leaf node */
1660 TreeNode
*pPeer
; /* The peer leaf node */
1661 TreeNode
*pNew1
; u32 iNew1
; /* First new leaf node */
1665 pParent
= pCsr
->apTreeNode
[pCsr
->iNode
-1];
1666 iPSlot
= pCsr
->aiCell
[pCsr
->iNode
-1];
1668 if( iPSlot
>0 && getChildPtr(pParent
, WORKING_VERSION
, iPSlot
-1) ){
1673 iPeer
= getChildPtr(pParent
, WORKING_VERSION
, iPSlot
+iDir
);
1674 pPeer
= (TreeNode
*)treeShmptr(db
, iPeer
);
1675 assertIsWorkingChild(db
, pNode
, pParent
, iPSlot
);
1677 /* Allocate the first new leaf node. This is always required. */
1679 pNew1
= (TreeNode
*)newTreeLeaf(db
, &iNew1
, &rc
);
1681 pNew1
= (TreeNode
*)newTreeNode(db
, &iNew1
, &rc
);
1684 if( pPeer
->aiKeyPtr
[0] && pPeer
->aiKeyPtr
[2] ){
1685 /* Peer node is completely full. This means that two new leaf nodes
1686 ** and a new parent node are required. */
1688 TreeNode
*pNew2
; u32 iNew2
; /* Second new leaf node */
1689 TreeNode
*pNewP
; u32 iNewP
; /* New parent node */
1692 pNew2
= (TreeNode
*)newTreeLeaf(db
, &iNew2
, &rc
);
1694 pNew2
= (TreeNode
*)newTreeNode(db
, &iNew2
, &rc
);
1696 pNewP
= copyTreeNode(db
, pParent
, &iNewP
, &rc
);
1699 pNew1
->aiKeyPtr
[1] = pPeer
->aiKeyPtr
[0];
1701 pNew1
->aiChildPtr
[1] = getChildPtr(pPeer
, WORKING_VERSION
, 0);
1702 pNew1
->aiChildPtr
[2] = getChildPtr(pPeer
, WORKING_VERSION
, 1);
1705 pNewP
->aiChildPtr
[iPSlot
-1] = iNew1
;
1706 pNewP
->aiKeyPtr
[iPSlot
-1] = pPeer
->aiKeyPtr
[1];
1707 pNewP
->aiChildPtr
[iPSlot
] = iNew2
;
1709 pNew2
->aiKeyPtr
[0] = pPeer
->aiKeyPtr
[2];
1710 pNew2
->aiKeyPtr
[1] = pParent
->aiKeyPtr
[iPSlot
-1];
1712 pNew2
->aiChildPtr
[0] = getChildPtr(pPeer
, WORKING_VERSION
, 2);
1713 pNew2
->aiChildPtr
[1] = getChildPtr(pPeer
, WORKING_VERSION
, 3);
1714 pNew2
->aiChildPtr
[2] = iNewptr
;
1717 pNew1
->aiKeyPtr
[1] = pParent
->aiKeyPtr
[iPSlot
];
1719 pNew1
->aiChildPtr
[1] = iNewptr
;
1720 pNew1
->aiChildPtr
[2] = getChildPtr(pPeer
, WORKING_VERSION
, 0);
1723 pNewP
->aiChildPtr
[iPSlot
] = iNew1
;
1724 pNewP
->aiKeyPtr
[iPSlot
] = pPeer
->aiKeyPtr
[0];
1725 pNewP
->aiChildPtr
[iPSlot
+1] = iNew2
;
1727 pNew2
->aiKeyPtr
[0] = pPeer
->aiKeyPtr
[1];
1728 pNew2
->aiKeyPtr
[1] = pPeer
->aiKeyPtr
[2];
1730 pNew2
->aiChildPtr
[0] = getChildPtr(pPeer
, WORKING_VERSION
, 1);
1731 pNew2
->aiChildPtr
[1] = getChildPtr(pPeer
, WORKING_VERSION
, 2);
1732 pNew2
->aiChildPtr
[2] = getChildPtr(pPeer
, WORKING_VERSION
, 3);
1735 assert( pCsr
->iNode
>=1 );
1738 assert( pNew1
->aiKeyPtr
[1] && pNew2
->aiKeyPtr
[1] );
1739 rc
= treeUpdatePtr(db
, pCsr
, iNewP
);
1749 pNew1
->aiKeyPtr
[iKOut
++] = pParent
->aiKeyPtr
[iPSlot
];
1750 if( bLeaf
==0 ) pNew1
->aiChildPtr
[iPOut
++] = iNewptr
;
1753 if( pPeer
->aiKeyPtr
[i
] ){
1754 pNew1
->aiKeyPtr
[iKOut
++] = pPeer
->aiKeyPtr
[i
];
1759 if( getChildPtr(pPeer
, WORKING_VERSION
, i
) ){
1760 pNew1
->aiChildPtr
[iPOut
++] = getChildPtr(pPeer
, WORKING_VERSION
, i
);
1766 pNew1
->aiKeyPtr
[iKOut
++] = pParent
->aiKeyPtr
[iPSlot
];
1767 if( bLeaf
==0 ) pNew1
->aiChildPtr
[iPOut
++] = iNewptr
;
1768 pCsr
->aiCell
[pCsr
->iNode
] = (u8
)iPSlot
;
1771 rc
= treeDeleteEntry(db
, pCsr
, iNew1
);
1779 ** Delete a range of keys from the tree structure (i.e. the lsm_delete_range()
1780 ** function, not lsm_delete()).
1782 ** This is a two step process:
1784 ** 1) Remove all entries currently stored in the tree that have keys
1785 ** that fall into the deleted range.
1787 ** TODO: There are surely good ways to optimize this step - removing
1788 ** a range of keys from a b-tree. But for now, this function removes
1789 ** them one at a time using the usual approach.
1791 ** 2) Unless the largest key smaller than or equal to (pKey1/nKey1) is
1792 ** already marked as START_DELETE, insert a START_DELETE key.
1793 ** Similarly, unless the smallest key greater than or equal to
1794 ** (pKey2/nKey2) is already START_END, insert a START_END key.
1798 void *pKey1
, int nKey1
, /* Start of range */
1799 void *pKey2
, int nKey2
/* End of range */
1803 TreeRoot
*p
= &db
->treehdr
.root
;
1804 TreeBlob blob
= {0, 0};
1806 /* The range must be sensible - that (key1 < key2). */
1807 assert( treeKeycmp(pKey1
, nKey1
, pKey2
, nKey2
)<0 );
1808 assert( assert_delete_ranges_match(db
) );
1811 static int nCall
= 0;
1814 printf("%d delete %s .. %s\n", nCall
, (char *)pKey1
, (char *)pKey2
);
1815 dump_tree_contents(db
, "before delete");
1818 /* Step 1. This loop runs until the tree contains no keys within the
1819 ** range being deleted. Or until an error occurs. */
1820 while( bDone
==0 && rc
==LSM_OK
){
1822 TreeCursor csr
; /* Cursor to seek to first key in range */
1823 void *pDel
; int nDel
; /* Key to (possibly) delete this iteration */
1825 int nEntry
= treeCountEntries(db
);
1828 /* Seek the cursor to the first entry in the tree greater than pKey1. */
1829 treeCursorInit(db
, 0, &csr
);
1830 lsmTreeCursorSeek(&csr
, pKey1
, nKey1
, &res
);
1831 if( res
<=0 && lsmTreeCursorValid(&csr
) ) lsmTreeCursorNext(&csr
);
1833 /* If there is no such entry, or if it is greater than pKey2, then the
1834 ** tree now contains no keys in the range being deleted. In this case
1835 ** break out of the loop. */
1837 if( lsmTreeCursorValid(&csr
) ){
1838 lsmTreeCursorKey(&csr
, 0, &pDel
, &nDel
);
1839 if( treeKeycmp(pDel
, nDel
, pKey2
, nKey2
)<0 ) bDone
= 0;
1843 if( (u32
)csr
.iNode
==(p
->nHeight
-1) ){
1844 /* The element to delete already lies on a leaf node */
1845 rc
= treeDeleteEntry(db
, &csr
, 0);
1847 /* 1. Overwrite the current key with a copy of the next key in the
1850 ** 2. Seek to key N (cursor will stop at the internal node copy of
1851 ** N). Move to the next key (original copy of N). Delete
1856 int iNode
= csr
.iNode
;
1857 lsmTreeCursorNext(&csr
);
1858 assert( (u32
)csr
.iNode
==(p
->nHeight
-1) );
1860 iKey
= csr
.apTreeNode
[csr
.iNode
]->aiKeyPtr
[csr
.aiCell
[csr
.iNode
]];
1861 lsmTreeCursorPrev(&csr
);
1863 treeOverwriteKey(db
, &csr
, iKey
, &rc
);
1864 pKey
= treeShmkey(db
, iKey
, TKV_LOADKEY
, &blob
, &rc
);
1866 rc
= lsmTreeCursorSeek(&csr
, TKV_KEY(pKey
), pKey
->nKey
, &res
);
1869 assert( res
==0 && csr
.iNode
==iNode
);
1870 rc
= lsmTreeCursorNext(&csr
);
1872 rc
= treeDeleteEntry(db
, &csr
, 0);
1878 /* Clean up any memory allocated by the cursor. */
1879 tblobFree(db
, &csr
.blob
);
1881 dump_tree_contents(db
, "ddd delete");
1883 assert( bDone
|| treeCountEntries(db
)==(nEntry
-1) );
1887 dump_tree_contents(db
, "during delete");
1890 /* Now insert the START_DELETE and END_DELETE keys. */
1892 rc
= treeInsertEntry(db
, LSM_START_DELETE
, pKey1
, nKey1
, 0, -1);
1895 dump_tree_contents(db
, "during delete 2");
1898 rc
= treeInsertEntry(db
, LSM_END_DELETE
, pKey2
, nKey2
, 0, -1);
1902 dump_tree_contents(db
, "after delete");
1905 tblobFree(db
, &blob
);
1906 assert( assert_delete_ranges_match(db
) );
1911 ** Return, in bytes, the amount of memory currently used by the tree
1914 int lsmTreeSize(lsm_db
*pDb
){
1915 return pDb
->treehdr
.root
.nByte
;
1919 ** Open a cursor on the in-memory tree pTree.
1921 int lsmTreeCursorNew(lsm_db
*pDb
, int bOld
, TreeCursor
**ppCsr
){
1923 *ppCsr
= pCsr
= lsmMalloc(pDb
->pEnv
, sizeof(TreeCursor
));
1925 treeCursorInit(pDb
, bOld
, pCsr
);
1928 return LSM_NOMEM_BKPT
;
1932 ** Close an in-memory tree cursor.
1934 void lsmTreeCursorDestroy(TreeCursor
*pCsr
){
1936 tblobFree(pCsr
->pDb
, &pCsr
->blob
);
1937 lsmFree(pCsr
->pDb
->pEnv
, pCsr
);
1941 void lsmTreeCursorReset(TreeCursor
*pCsr
){
1949 static int treeCsrCompare(TreeCursor
*pCsr
, void *pKey
, int nKey
, int *pRc
){
1952 assert( pCsr
->iNode
>=0 );
1953 p
= csrGetKey(pCsr
, &pCsr
->blob
, pRc
);
1955 cmp
= treeKeycmp(TKV_KEY(p
), p
->nKey
, pKey
, nKey
);
1963 ** Attempt to seek the cursor passed as the first argument to key (pKey/nKey)
1964 ** in the tree structure. If an exact match for the key is found, leave the
1965 ** cursor pointing to it and set *pRes to zero before returning. If an
1966 ** exact match cannot be found, do one of the following:
1968 ** * Leave the cursor pointing to the smallest element in the tree that
1969 ** is larger than the key and set *pRes to +1, or
1971 ** * Leave the cursor pointing to the largest element in the tree that
1972 ** is smaller than the key and set *pRes to -1, or
1974 ** * If the tree is empty, leave the cursor at EOF and set *pRes to -1.
1976 int lsmTreeCursorSeek(TreeCursor
*pCsr
, void *pKey
, int nKey
, int *pRes
){
1977 int rc
= LSM_OK
; /* Return code */
1978 lsm_db
*pDb
= pCsr
->pDb
;
1979 TreeRoot
*pRoot
= pCsr
->pRoot
;
1980 u32 iNodePtr
; /* Location of current node in search */
1982 /* Discard any saved position data */
1983 treeCursorRestore(pCsr
, 0);
1985 iNodePtr
= pRoot
->iRoot
;
1987 /* Either an error occurred or the tree is completely empty. */
1988 assert( rc
!=LSM_OK
|| pRoot
->iRoot
==0 );
1992 TreeBlob b
= {0, 0};
1993 int res
= 0; /* Result of comparison function */
1996 TreeNode
*pNode
; /* Node at location iNodePtr */
1997 int iTest
; /* Index of second key to test (0 or 2) */
1999 TreeKey
*pTreeKey
; /* Key to compare against */
2001 pNode
= (TreeNode
*)treeShmptrUnsafe(pDb
, iNodePtr
);
2003 pCsr
->apTreeNode
[iNode
] = pNode
;
2005 /* Compare (pKey/nKey) with the key in the middle slot of B-tree node
2006 ** pNode. The middle slot is never empty. If the comparison is a match,
2007 ** then the search is finished. Break out of the loop. */
2008 pTreeKey
= (TreeKey
*)treeShmptrUnsafe(pDb
, pNode
->aiKeyPtr
[1]);
2009 if( !(pTreeKey
->flags
& LSM_CONTIGUOUS
) ){
2010 pTreeKey
= treeShmkey(pDb
, pNode
->aiKeyPtr
[1], TKV_LOADKEY
, &b
, &rc
);
2011 if( rc
!=LSM_OK
) break;
2013 res
= treeKeycmp((void *)&pTreeKey
[1], pTreeKey
->nKey
, pKey
, nKey
);
2015 pCsr
->aiCell
[iNode
] = 1;
2019 /* Based on the results of the previous comparison, compare (pKey/nKey)
2020 ** to either the left or right key of the B-tree node, if such a key
2022 iTest
= (res
>0 ? 0 : 2);
2023 iTreeKey
= pNode
->aiKeyPtr
[iTest
];
2025 pTreeKey
= (TreeKey
*)treeShmptrUnsafe(pDb
, iTreeKey
);
2026 if( !(pTreeKey
->flags
& LSM_CONTIGUOUS
) ){
2027 pTreeKey
= treeShmkey(pDb
, iTreeKey
, TKV_LOADKEY
, &b
, &rc
);
2030 res
= treeKeycmp((void *)&pTreeKey
[1], pTreeKey
->nKey
, pKey
, nKey
);
2032 pCsr
->aiCell
[iNode
] = (u8
)iTest
;
2039 if( (u32
)iNode
<(pRoot
->nHeight
-1) ){
2040 iNodePtr
= getChildPtr(pNode
, pRoot
->iTransId
, iTest
+ (res
<0));
2044 pCsr
->aiCell
[iNode
] = (u8
)(iTest
+ (iNodePtr
&& (res
<0)));
2048 pCsr
->iNode
= iNode
;
2052 /* assert() that *pRes has been set properly */
2054 if( rc
==LSM_OK
&& lsmTreeCursorValid(pCsr
) ){
2055 int cmp
= treeCsrCompare(pCsr
, pKey
, nKey
, &rc
);
2056 assert( rc
!=LSM_OK
|| *pRes
==cmp
|| (*pRes
^ cmp
)>0 );
2063 int lsmTreeCursorNext(TreeCursor
*pCsr
){
2066 TreeBlob key1
= {0, 0};
2068 lsm_db
*pDb
= pCsr
->pDb
;
2069 TreeRoot
*pRoot
= pCsr
->pRoot
;
2070 const int iLeaf
= pRoot
->nHeight
-1;
2075 /* Restore the cursor position, if required */
2077 treeCursorRestore(pCsr
, &iRestore
);
2078 if( iRestore
>0 ) return LSM_OK
;
2080 /* Save a pointer to the current key. This is used in an assert() at the
2081 ** end of this function - to check that the 'next' key really is larger
2082 ** than the current key. */
2084 pK1
= csrGetKey(pCsr
, &key1
, &rc
);
2085 if( rc
!=LSM_OK
) return rc
;
2088 assert( lsmTreeCursorValid(pCsr
) );
2089 assert( pCsr
->aiCell
[pCsr
->iNode
]<3 );
2091 pNode
= pCsr
->apTreeNode
[pCsr
->iNode
];
2092 iCell
= ++pCsr
->aiCell
[pCsr
->iNode
];
2094 /* If the current node is not a leaf, and the current cell has sub-tree
2095 ** associated with it, descend to the left-most key on the left-most
2096 ** leaf of the sub-tree. */
2097 if( pCsr
->iNode
<iLeaf
&& getChildPtr(pNode
, pRoot
->iTransId
, iCell
) ){
2101 iNodePtr
= getChildPtr(pNode
, pRoot
->iTransId
, iCell
);
2102 pNode
= (TreeNode
*)treeShmptr(pDb
, iNodePtr
);
2103 pCsr
->apTreeNode
[pCsr
->iNode
] = pNode
;
2104 iCell
= pCsr
->aiCell
[pCsr
->iNode
] = (pNode
->aiKeyPtr
[0]==0);
2105 }while( pCsr
->iNode
< iLeaf
);
2108 /* Otherwise, the next key is found by following pointer up the tree
2109 ** until there is a key immediately to the right of the pointer followed
2110 ** to reach the sub-tree containing the current key. */
2111 else if( iCell
>=3 || pNode
->aiKeyPtr
[iCell
]==0 ){
2112 while( (--pCsr
->iNode
)>=0 ){
2113 iCell
= pCsr
->aiCell
[pCsr
->iNode
];
2114 if( iCell
<3 && pCsr
->apTreeNode
[pCsr
->iNode
]->aiKeyPtr
[iCell
] ) break;
2119 if( pCsr
->iNode
>=0 ){
2120 TreeKey
*pK2
= csrGetKey(pCsr
, &pCsr
->blob
, &rc
);
2121 assert( rc
||treeKeycmp(TKV_KEY(pK2
),pK2
->nKey
,TKV_KEY(pK1
),pK1
->nKey
)>=0 );
2123 tblobFree(pDb
, &key1
);
2129 int lsmTreeCursorPrev(TreeCursor
*pCsr
){
2132 TreeBlob key1
= {0, 0};
2134 lsm_db
*pDb
= pCsr
->pDb
;
2135 TreeRoot
*pRoot
= pCsr
->pRoot
;
2136 const int iLeaf
= pRoot
->nHeight
-1;
2141 /* Restore the cursor position, if required */
2143 treeCursorRestore(pCsr
, &iRestore
);
2144 if( iRestore
<0 ) return LSM_OK
;
2146 /* Save a pointer to the current key. This is used in an assert() at the
2147 ** end of this function - to check that the 'next' key really is smaller
2148 ** than the current key. */
2150 pK1
= csrGetKey(pCsr
, &key1
, &rc
);
2151 if( rc
!=LSM_OK
) return rc
;
2154 assert( lsmTreeCursorValid(pCsr
) );
2155 pNode
= pCsr
->apTreeNode
[pCsr
->iNode
];
2156 iCell
= pCsr
->aiCell
[pCsr
->iNode
];
2157 assert( iCell
>=0 && iCell
<3 );
2159 /* If the current node is not a leaf, and the current cell has sub-tree
2160 ** associated with it, descend to the right-most key on the right-most
2161 ** leaf of the sub-tree. */
2162 if( pCsr
->iNode
<iLeaf
&& getChildPtr(pNode
, pRoot
->iTransId
, iCell
) ){
2166 iNodePtr
= getChildPtr(pNode
, pRoot
->iTransId
, iCell
);
2167 pNode
= (TreeNode
*)treeShmptr(pDb
, iNodePtr
);
2168 if( rc
!=LSM_OK
) break;
2169 pCsr
->apTreeNode
[pCsr
->iNode
] = pNode
;
2170 iCell
= 1 + (pNode
->aiKeyPtr
[2]!=0) + (pCsr
->iNode
< iLeaf
);
2171 pCsr
->aiCell
[pCsr
->iNode
] = (u8
)iCell
;
2172 }while( pCsr
->iNode
< iLeaf
);
2175 /* Otherwise, the next key is found by following pointer up the tree until
2176 ** there is a key immediately to the left of the pointer followed to reach
2177 ** the sub-tree containing the current key. */
2180 iCell
= pCsr
->aiCell
[pCsr
->iNode
]-1;
2181 if( iCell
>=0 && pCsr
->apTreeNode
[pCsr
->iNode
]->aiKeyPtr
[iCell
] ) break;
2182 }while( (--pCsr
->iNode
)>=0 );
2183 pCsr
->aiCell
[pCsr
->iNode
] = (u8
)iCell
;
2187 if( pCsr
->iNode
>=0 ){
2188 TreeKey
*pK2
= csrGetKey(pCsr
, &pCsr
->blob
, &rc
);
2189 assert( rc
|| treeKeycmp(TKV_KEY(pK2
),pK2
->nKey
,TKV_KEY(pK1
),pK1
->nKey
)<0 );
2191 tblobFree(pDb
, &key1
);
2198 ** Move the cursor to the first (bLast==0) or last (bLast!=0) entry in the
2201 int lsmTreeCursorEnd(TreeCursor
*pCsr
, int bLast
){
2202 lsm_db
*pDb
= pCsr
->pDb
;
2203 TreeRoot
*pRoot
= pCsr
->pRoot
;
2209 /* Discard any saved position data */
2210 treeCursorRestore(pCsr
, 0);
2212 iNodePtr
= pRoot
->iRoot
;
2217 pNode
= (TreeNode
*)treeShmptr(pDb
, iNodePtr
);
2221 iCell
= ((pNode
->aiKeyPtr
[2]==0) ? 2 : 3);
2223 iCell
= ((pNode
->aiKeyPtr
[0]==0) ? 1 : 0);
2226 pCsr
->apTreeNode
[pCsr
->iNode
] = pNode
;
2228 if( (u32
)pCsr
->iNode
<pRoot
->nHeight
-1 ){
2229 iNodePtr
= getChildPtr(pNode
, pRoot
->iTransId
, iCell
);
2233 pCsr
->aiCell
[pCsr
->iNode
] = (u8
)(iCell
- (iNodePtr
==0 && bLast
));
2239 int lsmTreeCursorFlags(TreeCursor
*pCsr
){
2241 if( pCsr
&& pCsr
->iNode
>=0 ){
2243 TreeKey
*pKey
= (TreeKey
*)treeShmptrUnsafe(pCsr
->pDb
,
2244 pCsr
->apTreeNode
[pCsr
->iNode
]->aiKeyPtr
[pCsr
->aiCell
[pCsr
->iNode
]]
2246 assert( rc
==LSM_OK
);
2247 flags
= (pKey
->flags
& ~LSM_CONTIGUOUS
);
2252 int lsmTreeCursorKey(TreeCursor
*pCsr
, int *pFlags
, void **ppKey
, int *pnKey
){
2256 assert( lsmTreeCursorValid(pCsr
) );
2258 pTreeKey
= pCsr
->pSave
;
2260 pTreeKey
= csrGetKey(pCsr
, &pCsr
->blob
, &rc
);
2263 *pnKey
= pTreeKey
->nKey
;
2264 if( pFlags
) *pFlags
= pTreeKey
->flags
;
2265 *ppKey
= (void *)&pTreeKey
[1];
2271 int lsmTreeCursorValue(TreeCursor
*pCsr
, void **ppVal
, int *pnVal
){
2275 rc
= treeCursorRestore(pCsr
, &res
);
2277 TreeKey
*pTreeKey
= csrGetKey(pCsr
, &pCsr
->blob
, &rc
);
2279 if( pTreeKey
->flags
& LSM_INSERT
){
2280 *pnVal
= pTreeKey
->nValue
;
2281 *ppVal
= TKV_VAL(pTreeKey
);
2296 ** Return true if the cursor currently points to a valid entry.
2298 int lsmTreeCursorValid(TreeCursor
*pCsr
){
2299 return (pCsr
&& (pCsr
->pSave
|| pCsr
->iNode
>=0));
2303 ** Store a mark in *pMark. Later on, a call to lsmTreeRollback() with a
2304 ** pointer to the same TreeMark structure may be used to roll the tree
2305 ** contents back to their current state.
2307 void lsmTreeMark(lsm_db
*pDb
, TreeMark
*pMark
){
2308 pMark
->iRoot
= pDb
->treehdr
.root
.iRoot
;
2309 pMark
->nHeight
= pDb
->treehdr
.root
.nHeight
;
2310 pMark
->iWrite
= pDb
->treehdr
.iWrite
;
2311 pMark
->nChunk
= pDb
->treehdr
.nChunk
;
2312 pMark
->iNextShmid
= pDb
->treehdr
.iNextShmid
;
2313 pMark
->iRollback
= intArraySize(&pDb
->rollback
);
2317 ** Roll back to mark pMark. Structure *pMark should have been previously
2318 ** populated by a call to lsmTreeMark().
2320 void lsmTreeRollback(lsm_db
*pDb
, TreeMark
*pMark
){
2328 /* Revert all required v2 pointers. */
2329 nIdx
= intArraySize(&pDb
->rollback
);
2330 for(iIdx
= pMark
->iRollback
; iIdx
<nIdx
; iIdx
++){
2332 pNode
= treeShmptr(pDb
, intArrayEntry(&pDb
->rollback
, iIdx
));
2335 pNode
->iV2Child
= 0;
2338 intArrayTruncate(&pDb
->rollback
, pMark
->iRollback
);
2340 /* Restore the free-chunk list. */
2341 assert( pMark
->iWrite
!=0 );
2342 iChunk
= treeOffsetToChunk(pMark
->iWrite
-1);
2343 pChunk
= treeShmChunk(pDb
, iChunk
);
2344 iNext
= pChunk
->iNext
;
2347 pChunk
= treeShmChunk(pDb
, pDb
->treehdr
.iFirst
);
2348 iShmid
= pChunk
->iShmid
-1;
2351 u32 iFree
= iNext
; /* Current chunk being rollback-freed */
2352 ShmChunk
*pFree
; /* Pointer to chunk iFree */
2354 pFree
= treeShmChunk(pDb
, iFree
);
2355 iNext
= pFree
->iNext
;
2357 if( iFree
<pMark
->nChunk
){
2358 pFree
->iNext
= pDb
->treehdr
.iFirst
;
2359 pFree
->iShmid
= iShmid
--;
2360 pDb
->treehdr
.iFirst
= iFree
;
2364 /* Restore the tree-header fields */
2365 pDb
->treehdr
.root
.iRoot
= pMark
->iRoot
;
2366 pDb
->treehdr
.root
.nHeight
= pMark
->nHeight
;
2367 pDb
->treehdr
.iWrite
= pMark
->iWrite
;
2368 pDb
->treehdr
.nChunk
= pMark
->nChunk
;
2369 pDb
->treehdr
.iNextShmid
= pMark
->iNextShmid
;
2373 ** Load the in-memory tree header from shared-memory into pDb->treehdr.
2374 ** If the header cannot be loaded, return LSM_PROTOCOL.
2376 ** If the header is successfully loaded and parameter piRead is not NULL,
2377 ** is is set to 1 if the header was loaded from ShmHeader.hdr1, or 2 if
2378 ** the header was loaded from ShmHeader.hdr2.
2380 int lsmTreeLoadHeader(lsm_db
*pDb
, int *piRead
){
2381 int nRem
= LSM_ATTEMPTS_BEFORE_PROTOCOL
;
2382 while( (nRem
--)>0 ){
2383 ShmHeader
*pShm
= pDb
->pShmhdr
;
2385 memcpy(&pDb
->treehdr
, &pShm
->hdr1
, sizeof(TreeHeader
));
2386 if( treeHeaderChecksumOk(&pDb
->treehdr
) ){
2387 if( piRead
) *piRead
= 1;
2390 memcpy(&pDb
->treehdr
, &pShm
->hdr2
, sizeof(TreeHeader
));
2391 if( treeHeaderChecksumOk(&pDb
->treehdr
) ){
2392 if( piRead
) *piRead
= 2;
2398 return LSM_PROTOCOL_BKPT
;
2401 int lsmTreeLoadHeaderOk(lsm_db
*pDb
, int iRead
){
2402 TreeHeader
*p
= (iRead
==1) ? &pDb
->pShmhdr
->hdr1
: &pDb
->pShmhdr
->hdr2
;
2403 assert( iRead
==1 || iRead
==2 );
2404 return (0==memcmp(pDb
->treehdr
.aCksum
, p
->aCksum
, sizeof(u32
)*2));
2408 ** This function is called to conclude a transaction. If argument bCommit
2409 ** is true, the transaction is committed. Otherwise it is rolled back.
2411 int lsmTreeEndTransaction(lsm_db
*pDb
, int bCommit
){
2412 ShmHeader
*pShm
= pDb
->pShmhdr
;
2414 treeHeaderChecksum(&pDb
->treehdr
, pDb
->treehdr
.aCksum
);
2415 memcpy(&pShm
->hdr2
, &pDb
->treehdr
, sizeof(TreeHeader
));
2417 memcpy(&pShm
->hdr1
, &pDb
->treehdr
, sizeof(TreeHeader
));
2419 intArrayFree(pDb
->pEnv
, &pDb
->rollback
);
2425 static int assert_delete_ranges_match(lsm_db
*db
){
2427 TreeBlob blob
= {0, 0};
2428 TreeCursor csr
; /* Cursor used to iterate through tree */
2431 treeCursorInit(db
, 0, &csr
);
2432 for( rc
= lsmTreeCursorEnd(&csr
, 0);
2433 rc
==LSM_OK
&& lsmTreeCursorValid(&csr
);
2434 rc
= lsmTreeCursorNext(&csr
)
2436 TreeKey
*pKey
= csrGetKey(&csr
, &blob
, &rc
);
2437 if( rc
!=LSM_OK
) break;
2438 assert( ((prev
&LSM_START_DELETE
)==0)==((pKey
->flags
&LSM_END_DELETE
)==0) );
2442 tblobFree(csr
.pDb
, &csr
.blob
);
2443 tblobFree(csr
.pDb
, &blob
);
2448 static int treeCountEntries(lsm_db
*db
){
2449 TreeCursor csr
; /* Cursor used to iterate through tree */
2453 treeCursorInit(db
, 0, &csr
);
2454 for( rc
= lsmTreeCursorEnd(&csr
, 0);
2455 rc
==LSM_OK
&& lsmTreeCursorValid(&csr
);
2456 rc
= lsmTreeCursorNext(&csr
)
2461 tblobFree(csr
.pDb
, &csr
.blob
);