4 ** The author disclaims copyright to this source code. In place of
5 ** a legal notice, here is a blessing:
7 ** May you do good and not evil.
8 ** May you find forgiveness for yourself and forgive others.
9 ** May you share freely, never taking more than you give.
11 *************************************************************************
13 ** Utilities used to help multiple LSM clients to coexist within the
14 ** same process space.
19 ** Global data. All global variables used by code in this file are grouped
20 ** into the following structure instance.
23 ** Linked list of all Database objects allocated within this process.
24 ** This list may not be traversed without holding the global mutex (see
25 ** functions enterGlobalMutex() and leaveGlobalMutex()).
27 static struct SharedData
{
28 Database
*pDatabase
; /* Linked list of all Database objects */
32 ** Database structure. There is one such structure for each distinct
33 ** database accessed by this process. They are stored in the singly linked
34 ** list starting at global variable gShared.pDatabase. Database objects are
35 ** reference counted. Once the number of connections to the associated
36 ** database drops to zero, they are removed from the linked list and deleted.
39 ** In multi-process mode, this file descriptor is used to obtain locks
40 ** and to access shared-memory. In single process mode, its only job is
41 ** to hold the exclusive lock on the file.
45 /* Protected by the global mutex (enterGlobalMutex/leaveGlobalMutex): */
46 char *zName
; /* Canonical path to database file */
47 int nName
; /* strlen(zName) */
48 int nDbRef
; /* Number of associated lsm_db handles */
49 Database
*pDbNext
; /* Next Database structure in global list */
51 /* Protected by the local mutex (pClientMutex) */
52 int bReadonly
; /* True if Database.pFile is read-only */
53 int bMultiProc
; /* True if running in multi-process mode */
54 lsm_file
*pFile
; /* Used for locks/shm in multi-proc mode */
55 LsmFile
*pLsmFile
; /* List of deferred closes */
56 lsm_mutex
*pClientMutex
; /* Protects the apShmChunk[] and pConn */
57 int nShmChunk
; /* Number of entries in apShmChunk[] array */
58 void **apShmChunk
; /* Array of "shared" memory regions */
59 lsm_db
*pConn
; /* List of connections to this db. */
63 ** Functions to enter and leave the global mutex. This mutex is used
64 ** to protect the global linked-list headed at gShared.pDatabase.
66 static int enterGlobalMutex(lsm_env
*pEnv
){
68 int rc
= lsmMutexStatic(pEnv
, LSM_MUTEX_GLOBAL
, &p
);
69 if( rc
==LSM_OK
) lsmMutexEnter(pEnv
, p
);
72 static void leaveGlobalMutex(lsm_env
*pEnv
){
74 lsmMutexStatic(pEnv
, LSM_MUTEX_GLOBAL
, &p
);
75 lsmMutexLeave(pEnv
, p
);
79 static int holdingGlobalMutex(lsm_env
*pEnv
){
81 lsmMutexStatic(pEnv
, LSM_MUTEX_GLOBAL
, &p
);
82 return lsmMutexHeld(pEnv
, p
);
87 static void assertNotInFreelist(Freelist
*p
, int iBlk
){
89 for(i
=0; i
<p
->nEntry
; i
++){
90 assert( p
->aEntry
[i
].iBlk
!=iBlk
);
94 # define assertNotInFreelist(x,y)
98 ** Append an entry to the free-list. If (iId==-1), this is a delete.
100 int freelistAppend(lsm_db
*db
, u32 iBlk
, i64 iId
){
101 lsm_env
*pEnv
= db
->pEnv
;
105 assert( iId
==-1 || iId
>=0 );
106 p
= db
->bUseFreelist
? db
->pFreelist
: &db
->pWorker
->freelist
;
108 /* Extend the space allocated for the freelist, if required */
109 assert( p
->nAlloc
>=p
->nEntry
);
110 if( p
->nAlloc
==p
->nEntry
){
115 nNew
= (p
->nAlloc
==0 ? 4 : p
->nAlloc
*2);
116 nByte
= sizeof(FreelistEntry
) * nNew
;
117 aNew
= (FreelistEntry
*)lsmRealloc(pEnv
, p
->aEntry
, nByte
);
118 if( !aNew
) return LSM_NOMEM_BKPT
;
123 for(i
=0; i
<p
->nEntry
; i
++){
124 assert( i
==0 || p
->aEntry
[i
].iBlk
> p
->aEntry
[i
-1].iBlk
);
125 if( p
->aEntry
[i
].iBlk
>=iBlk
) break;
128 if( i
<p
->nEntry
&& p
->aEntry
[i
].iBlk
==iBlk
){
129 /* Clobber an existing entry */
130 p
->aEntry
[i
].iId
= iId
;
132 /* Insert a new entry into the list */
133 int nByte
= sizeof(FreelistEntry
)*(p
->nEntry
-i
);
134 memmove(&p
->aEntry
[i
+1], &p
->aEntry
[i
], nByte
);
135 p
->aEntry
[i
].iBlk
= iBlk
;
136 p
->aEntry
[i
].iId
= iId
;
144 ** This function frees all resources held by the Database structure passed
145 ** as the only argument.
147 static void freeDatabase(lsm_env
*pEnv
, Database
*p
){
148 assert( holdingGlobalMutex(pEnv
) );
150 /* Free the mutexes */
151 lsmMutexDel(pEnv
, p
->pClientMutex
);
154 lsmEnvClose(pEnv
, p
->pFile
);
157 /* Free the array of shm pointers */
158 lsmFree(pEnv
, p
->apShmChunk
);
160 /* Free the memory allocated for the Database struct itself */
165 typedef struct DbTruncateCtx DbTruncateCtx
;
166 struct DbTruncateCtx
{
171 static int dbTruncateCb(void *pCtx
, int iBlk
, i64 iSnapshot
){
172 DbTruncateCtx
*p
= (DbTruncateCtx
*)pCtx
;
173 if( iBlk
!=p
->nBlock
|| (p
->iInUse
>=0 && iSnapshot
>=p
->iInUse
) ) return 1;
178 static int dbTruncate(lsm_db
*pDb
, i64 iInUse
){
184 assert( pDb
->pWorker
);
185 ctx
.nBlock
= pDb
->pWorker
->nBlock
;
188 rc
= lsmWalkFreelist(pDb
, 1, dbTruncateCb
, (void *)&ctx
);
189 for(i
=ctx
.nBlock
+1; rc
==LSM_OK
&& i
<=pDb
->pWorker
->nBlock
; i
++){
190 rc
= freelistAppend(pDb
, i
, -1);
194 #ifdef LSM_LOG_FREELIST
195 if( ctx
.nBlock
!=pDb
->pWorker
->nBlock
){
196 lsmLogMessage(pDb
, 0,
197 "dbTruncate(): truncated db to %d blocks",ctx
.nBlock
201 pDb
->pWorker
->nBlock
= ctx
.nBlock
;
209 ** This function is called during database shutdown (when the number of
210 ** connections drops from one to zero). It truncates the database file
211 ** to as small a size as possible without truncating away any blocks that
214 static int dbTruncateFile(lsm_db
*pDb
){
217 assert( pDb
->pWorker
==0 );
218 assert( lsmShmAssertLock(pDb
, LSM_LOCK_DMS1
, LSM_LOCK_EXCL
) );
219 rc
= lsmCheckpointLoadWorker(pDb
);
224 /* Walk the database free-block-list in reverse order. Set ctx.nBlock
225 ** to the block number of the last block in the database that actually
227 ctx
.nBlock
= pDb
->pWorker
->nBlock
;
229 rc
= lsmWalkFreelist(pDb
, 1, dbTruncateCb
, (void *)&ctx
);
231 /* If the last block that contains data is not already the last block in
232 ** the database file, truncate the database file so that it is. */
234 rc
= lsmFsTruncateDb(
235 pDb
->pFS
, (i64
)ctx
.nBlock
*lsmFsBlockSize(pDb
->pFS
)
240 lsmFreeSnapshot(pDb
->pEnv
, pDb
->pWorker
);
245 static void doDbDisconnect(lsm_db
*pDb
){
248 if( pDb
->bReadonly
){
249 lsmShmLock(pDb
, LSM_LOCK_DMS3
, LSM_LOCK_UNLOCK
, 0);
251 /* Block for an exclusive lock on DMS1. This lock serializes all calls
252 ** to doDbConnect() and doDbDisconnect() across all processes. */
253 rc
= lsmShmLock(pDb
, LSM_LOCK_DMS1
, LSM_LOCK_EXCL
, 1);
256 lsmShmLock(pDb
, LSM_LOCK_DMS2
, LSM_LOCK_UNLOCK
, 0);
258 /* Try an exclusive lock on DMS2. If successful, this is the last
259 ** connection to the database. In this case flush the contents of the
260 ** in-memory tree to disk and write a checkpoint. */
261 rc
= lsmShmTestLock(pDb
, LSM_LOCK_DMS2
, 1, LSM_LOCK_EXCL
);
263 rc
= lsmShmTestLock(pDb
, LSM_LOCK_CHECKPOINTER
, 1, LSM_LOCK_EXCL
);
266 int bReadonly
= 0; /* True if there exist read-only conns. */
268 /* Flush the in-memory tree, if required. If there is data to flush,
269 ** this will create a new client snapshot in Database.pClient. The
270 ** checkpoint (serialization) of this snapshot may be written to disk
271 ** by the following block.
273 ** There is no need to take a WRITER lock here. That there are no
274 ** other locks on DMS2 guarantees that there are no other read-write
275 ** connections at this time (and the lock on DMS1 guarantees that
276 ** no new ones may appear).
278 rc
= lsmTreeLoadHeader(pDb
, 0);
279 if( rc
==LSM_OK
&& (lsmTreeHasOld(pDb
) || lsmTreeSize(pDb
)>0) ){
280 rc
= lsmFlushTreeToDisk(pDb
);
283 /* Now check if there are any read-only connections. If there are,
284 ** then do not truncate the db file or unlink the shared-memory
287 rc
= lsmShmTestLock(pDb
, LSM_LOCK_DMS3
, 1, LSM_LOCK_EXCL
);
294 /* Write a checkpoint to disk. */
296 rc
= lsmCheckpointWrite(pDb
, 0);
299 /* If the checkpoint was written successfully, delete the log file
300 ** and, if possible, truncate the database file. */
303 Database
*p
= pDb
->pDatabase
;
305 /* The log file may only be deleted if there are no
306 ** read-only clients running rotrans transactions. */
307 rc
= lsmDetectRoTrans(pDb
, &bRotrans
);
308 if( rc
==LSM_OK
&& bRotrans
==0 ){
309 lsmFsCloseAndDeleteLog(pDb
->pFS
);
312 /* The database may only be truncated if there exist no read-only
313 ** clients - either connected or running rotrans transactions. */
314 if( bReadonly
==0 && bRotrans
==0 ){
315 lsmFsUnmap(pDb
->pFS
);
317 if( p
->pFile
&& p
->bMultiProc
){
318 lsmEnvShmUnmap(pDb
->pEnv
, p
->pFile
, 1);
325 if( pDb
->iRwclient
>=0 ){
326 lsmShmLock(pDb
, LSM_LOCK_RWCLIENT(pDb
->iRwclient
), LSM_LOCK_UNLOCK
, 0);
330 lsmShmLock(pDb
, LSM_LOCK_DMS1
, LSM_LOCK_UNLOCK
, 0);
335 static int doDbConnect(lsm_db
*pDb
){
336 const int nUsMax
= 100000; /* Max value for nUs */
337 int nUs
= 1000; /* us to wait between DMS1 attempts */
340 /* Obtain a pointer to the shared-memory header */
341 assert( pDb
->pShmhdr
==0 );
342 assert( pDb
->bReadonly
==0 );
344 /* Block for an exclusive lock on DMS1. This lock serializes all calls
345 ** to doDbConnect() and doDbDisconnect() across all processes. */
347 rc
= lsmShmLock(pDb
, LSM_LOCK_DMS1
, LSM_LOCK_EXCL
, 1);
348 if( rc
!=LSM_BUSY
) break;
349 lsmEnvSleep(pDb
->pEnv
, nUs
);
351 if( nUs
>nUsMax
) nUs
= nUsMax
;
354 rc
= lsmShmCacheChunks(pDb
, 1);
356 if( rc
!=LSM_OK
) return rc
;
357 pDb
->pShmhdr
= (ShmHeader
*)pDb
->apShm
[0];
359 /* Try an exclusive lock on DMS2/DMS3. If successful, this is the first
360 ** and only connection to the database. In this case initialize the
361 ** shared-memory and run log file recovery. */
362 assert( LSM_LOCK_DMS3
==1+LSM_LOCK_DMS2
);
363 rc
= lsmShmTestLock(pDb
, LSM_LOCK_DMS2
, 2, LSM_LOCK_EXCL
);
365 memset(pDb
->pShmhdr
, 0, sizeof(ShmHeader
));
366 rc
= lsmCheckpointRecover(pDb
);
368 rc
= lsmLogRecover(pDb
);
371 ShmHeader
*pShm
= pDb
->pShmhdr
;
372 pShm
->aReader
[0].iLsmId
= lsmCheckpointId(pShm
->aSnap1
, 0);
373 pShm
->aReader
[0].iTreeId
= pDb
->treehdr
.iUsedShmid
;
375 }else if( rc
==LSM_BUSY
){
379 /* Take a shared lock on DMS2. In multi-process mode this lock "cannot"
380 ** fail, as connections may only hold an exclusive lock on DMS2 if they
381 ** first hold an exclusive lock on DMS1. And this connection is currently
382 ** holding the exclusive lock on DMS1.
384 ** However, if some other connection has the database open in single-process
385 ** mode, this operation will fail. In this case, return the error to the
386 ** caller - the attempt to connect to the db has failed.
389 rc
= lsmShmLock(pDb
, LSM_LOCK_DMS2
, LSM_LOCK_SHARED
, 0);
392 /* If anything went wrong, unlock DMS2. Otherwise, try to take an exclusive
393 ** lock on one of the LSM_LOCK_RWCLIENT() locks. Unlock DMS1 in any case. */
398 for(i
=0; i
<LSM_LOCK_NRWCLIENT
; i
++){
399 int rc2
= lsmShmLock(pDb
, LSM_LOCK_RWCLIENT(i
), LSM_LOCK_EXCL
, 0);
400 if( rc2
==LSM_OK
) pDb
->iRwclient
= i
;
407 lsmShmLock(pDb
, LSM_LOCK_DMS1
, LSM_LOCK_UNLOCK
, 0);
412 static int dbOpenSharedFd(lsm_env
*pEnv
, Database
*p
, int bRoOk
){
415 rc
= lsmEnvOpen(pEnv
, p
->zName
, 0, &p
->pFile
);
416 if( rc
==LSM_IOERR
&& bRoOk
){
417 rc
= lsmEnvOpen(pEnv
, p
->zName
, LSM_OPEN_READONLY
, &p
->pFile
);
425 ** Return a reference to the shared Database handle for the database
426 ** identified by canonical path zName. If this is the first connection to
427 ** the named database, a new Database object is allocated. Otherwise, a
428 ** pointer to an existing object is returned.
430 ** If successful, *ppDatabase is set to point to the shared Database
431 ** structure and LSM_OK returned. Otherwise, *ppDatabase is set to NULL
432 ** and an LSM error code returned.
434 ** Each successful call to this function should be (eventually) matched
435 ** by a call to lsmDbDatabaseRelease().
437 int lsmDbDatabaseConnect(
438 lsm_db
*pDb
, /* Database handle */
439 const char *zName
/* Full-path to db file */
441 lsm_env
*pEnv
= pDb
->pEnv
;
442 int rc
; /* Return code */
443 Database
*p
= 0; /* Pointer returned via *ppDatabase */
444 int nName
= lsmStrlen(zName
);
446 assert( pDb
->pDatabase
==0 );
447 rc
= enterGlobalMutex(pEnv
);
450 /* Search the global list for an existing object. TODO: Need something
451 ** better than the memcmp() below to figure out if a given Database
452 ** object represents the requested file. */
453 for(p
=gShared
.pDatabase
; p
; p
=p
->pDbNext
){
454 if( nName
==p
->nName
&& 0==memcmp(zName
, p
->zName
, nName
) ) break;
457 /* If no suitable Database object was found, allocate a new one. */
459 p
= (Database
*)lsmMallocZeroRc(pEnv
, sizeof(Database
)+nName
+1, &rc
);
461 /* If the allocation was successful, fill in other fields and
462 ** allocate the client mutex. */
464 p
->bMultiProc
= pDb
->bMultiProc
;
465 p
->zName
= (char *)&p
[1];
467 memcpy((void *)p
->zName
, zName
, nName
+1);
468 rc
= lsmMutexNew(pEnv
, &p
->pClientMutex
);
471 /* If nothing has gone wrong so far, open the shared fd. And if that
472 ** succeeds and this connection requested single-process mode,
473 ** attempt to take the exclusive lock on DMS2. */
475 int bReadonly
= (pDb
->bReadonly
&& pDb
->bMultiProc
);
476 rc
= dbOpenSharedFd(pDb
->pEnv
, p
, bReadonly
);
479 if( rc
==LSM_OK
&& p
->bMultiProc
==0 ){
480 /* Hold an exclusive lock DMS1 while grabbing DMS2. This ensures
481 ** that any ongoing call to doDbDisconnect() (even one in another
482 ** process) is finished before proceeding. */
483 assert( p
->bReadonly
==0 );
484 rc
= lsmEnvLock(pDb
->pEnv
, p
->pFile
, LSM_LOCK_DMS1
, LSM_LOCK_EXCL
);
486 rc
= lsmEnvLock(pDb
->pEnv
, p
->pFile
, LSM_LOCK_DMS2
, LSM_LOCK_EXCL
);
487 lsmEnvLock(pDb
->pEnv
, p
->pFile
, LSM_LOCK_DMS1
, LSM_LOCK_UNLOCK
);
492 p
->pDbNext
= gShared
.pDatabase
;
493 gShared
.pDatabase
= p
;
495 freeDatabase(pEnv
, p
);
503 leaveGlobalMutex(pEnv
);
506 lsmMutexEnter(pDb
->pEnv
, p
->pClientMutex
);
507 pDb
->pNext
= p
->pConn
;
509 lsmMutexLeave(pDb
->pEnv
, p
->pClientMutex
);
516 rc
= lsmFsOpen(pDb
, zName
, p
->bReadonly
);
519 /* If the db handle is read-write, then connect to the system now. Run
520 ** recovery as necessary. Or, if this is a read-only database handle,
521 ** defer attempting to connect to the system until a read-transaction
524 rc
= lsmFsConfigure(pDb
);
526 if( rc
==LSM_OK
&& pDb
->bReadonly
==0 ){
527 rc
= doDbConnect(pDb
);
533 static void dbDeferClose(lsm_db
*pDb
){
536 Database
*p
= pDb
->pDatabase
;
537 pLsmFile
= lsmFsDeferClose(pDb
->pFS
);
538 pLsmFile
->pNext
= p
->pLsmFile
;
539 p
->pLsmFile
= pLsmFile
;
543 LsmFile
*lsmDbRecycleFd(lsm_db
*db
){
545 Database
*p
= db
->pDatabase
;
546 lsmMutexEnter(db
->pEnv
, p
->pClientMutex
);
547 if( (pRet
= p
->pLsmFile
)!=0 ){
548 p
->pLsmFile
= pRet
->pNext
;
550 lsmMutexLeave(db
->pEnv
, p
->pClientMutex
);
555 ** Release a reference to a Database object obtained from
556 ** lsmDbDatabaseConnect(). There should be exactly one call to this function
557 ** for each successful call to lsmDbDatabaseConnect().
559 void lsmDbDatabaseRelease(lsm_db
*pDb
){
560 Database
*p
= pDb
->pDatabase
;
568 lsmFsUnmap(pDb
->pFS
);
569 lsmMutexEnter(pDb
->pEnv
, p
->pClientMutex
);
570 for(ppDb
=&p
->pConn
; *ppDb
!=pDb
; ppDb
=&((*ppDb
)->pNext
));
573 lsmMutexLeave(pDb
->pEnv
, p
->pClientMutex
);
575 enterGlobalMutex(pDb
->pEnv
);
582 /* Remove the Database structure from the linked list. */
583 for(pp
=&gShared
.pDatabase
; *pp
!=p
; pp
=&((*pp
)->pDbNext
));
586 /* If they were allocated from the heap, free the shared memory chunks */
587 if( p
->bMultiProc
==0 ){
589 for(i
=0; i
<p
->nShmChunk
; i
++){
590 lsmFree(pDb
->pEnv
, p
->apShmChunk
[i
]);
594 /* Close any outstanding file descriptors */
595 for(pIter
=p
->pLsmFile
; pIter
; pIter
=pNext
){
596 pNext
= pIter
->pNext
;
597 lsmEnvClose(pDb
->pEnv
, pIter
->pFile
);
598 lsmFree(pDb
->pEnv
, pIter
);
600 freeDatabase(pDb
->pEnv
, p
);
602 leaveGlobalMutex(pDb
->pEnv
);
606 Level
*lsmDbSnapshotLevel(Snapshot
*pSnapshot
){
607 return pSnapshot
->pLevel
;
610 void lsmDbSnapshotSetLevel(Snapshot
*pSnap
, Level
*pLevel
){
611 pSnap
->pLevel
= pLevel
;
614 /* TODO: Shuffle things around to get rid of this */
615 static int firstSnapshotInUse(lsm_db
*, i64
*);
618 ** Context object used by the lsmWalkFreelist() utility.
620 typedef struct WalkFreelistCtx WalkFreelistCtx
;
621 struct WalkFreelistCtx
{
626 int (*xUsr
)(void *, int, i64
); /* User callback function */
627 void *pUsrctx
; /* User callback context */
628 int bDone
; /* Set to true after xUsr() returns true */
632 ** Callback used by lsmWalkFreelist().
634 static int walkFreelistCb(void *pCtx
, int iBlk
, i64 iSnapshot
){
635 WalkFreelistCtx
*p
= (WalkFreelistCtx
*)pCtx
;
636 const int iDir
= (p
->bReverse
? -1 : 1);
637 Freelist
*pFree
= p
->pFreelist
;
639 assert( p
->bDone
==0 );
642 while( (p
->iFree
< pFree
->nEntry
) && p
->iFree
>=0 ){
643 FreelistEntry
*pEntry
= &pFree
->aEntry
[p
->iFree
];
644 if( (p
->bReverse
==0 && pEntry
->iBlk
>(u32
)iBlk
)
645 || (p
->bReverse
!=0 && pEntry
->iBlk
<(u32
)iBlk
)
651 && p
->xUsr(p
->pUsrctx
, pEntry
->iBlk
, pEntry
->iId
)
656 if( pEntry
->iBlk
==(u32
)iBlk
) return 0;
661 if( p
->xUsr(p
->pUsrctx
, iBlk
, iSnapshot
) ){
669 ** The database handle passed as the first argument must be the worker
670 ** connection. This function iterates through the contents of the current
671 ** free block list, invoking the supplied callback once for each list
674 ** The difference between this function and lsmSortedWalkFreelist() is
675 ** that lsmSortedWalkFreelist() only considers those free-list elements
676 ** stored within the LSM. This function also merges in any in-memory
680 lsm_db
*pDb
, /* Database handle (must be worker) */
681 int bReverse
, /* True to iterate from largest to smallest */
682 int (*x
)(void *, int, i64
), /* Callback function */
683 void *pCtx
/* First argument to pass to callback */
685 const int iDir
= (bReverse
? -1 : 1);
689 WalkFreelistCtx ctx
[2];
692 ctx
[0].bReverse
= bReverse
;
693 ctx
[0].pFreelist
= &pDb
->pWorker
->freelist
;
694 if( ctx
[0].pFreelist
&& bReverse
){
695 ctx
[0].iFree
= ctx
[0].pFreelist
->nEntry
-1;
699 ctx
[0].xUsr
= walkFreelistCb
;
700 ctx
[0].pUsrctx
= (void *)&ctx
[1];
704 ctx
[1].bReverse
= bReverse
;
705 ctx
[1].pFreelist
= pDb
->pFreelist
;
706 if( ctx
[1].pFreelist
&& bReverse
){
707 ctx
[1].iFree
= ctx
[1].pFreelist
->nEntry
-1;
712 ctx
[1].pUsrctx
= pCtx
;
715 rc
= lsmSortedWalkFreelist(pDb
, bReverse
, walkFreelistCb
, (void *)&ctx
[0]);
717 if( ctx
[0].bDone
==0 ){
718 for(iCtx
=0; iCtx
<2; iCtx
++){
720 WalkFreelistCtx
*p
= &ctx
[iCtx
];
722 p
->pFreelist
&& rc
==LSM_OK
&& i
<p
->pFreelist
->nEntry
&& i
>=0;
725 FreelistEntry
*pEntry
= &p
->pFreelist
->aEntry
[i
];
726 if( pEntry
->iId
>=0 && p
->xUsr(p
->pUsrctx
, pEntry
->iBlk
, pEntry
->iId
) ){
737 typedef struct FindFreeblockCtx FindFreeblockCtx
;
738 struct FindFreeblockCtx
{
744 static int findFreeblockCb(void *pCtx
, int iBlk
, i64 iSnapshot
){
745 FindFreeblockCtx
*p
= (FindFreeblockCtx
*)pCtx
;
746 if( iSnapshot
<p
->iInUse
&& (iBlk
!=1 || p
->bNotOne
==0) ){
753 static int findFreeblock(lsm_db
*pDb
, i64 iInUse
, int bNotOne
, int *piRet
){
754 int rc
; /* Return code */
755 FindFreeblockCtx ctx
; /* Context object */
759 ctx
.bNotOne
= bNotOne
;
760 rc
= lsmWalkFreelist(pDb
, 0, findFreeblockCb
, (void *)&ctx
);
767 ** Allocate a new database file block to write data to, either by extending
768 ** the database file or by recycling a free-list entry. The worker snapshot
769 ** must be held in order to call this function.
771 ** If successful, *piBlk is set to the block number allocated and LSM_OK is
772 ** returned. Otherwise, *piBlk is zeroed and an lsm error code returned.
774 int lsmBlockAllocate(lsm_db
*pDb
, int iBefore
, int *piBlk
){
775 Snapshot
*p
= pDb
->pWorker
;
776 int iRet
= 0; /* Block number of allocated block */
778 i64 iInUse
= 0; /* Snapshot id still in use */
779 i64 iSynced
= 0; /* Snapshot id synced to disk */
783 #ifdef LSM_LOG_FREELIST
785 static int nCall
= 0;
788 rc
= lsmInfoFreelist(pDb
, &zFree
);
789 if( rc
!=LSM_OK
) return rc
;
790 lsmLogMessage(pDb
, 0, "lsmBlockAllocate(): %d freelist: %s", nCall
, zFree
);
791 lsmFree(pDb
->pEnv
, zFree
);
795 /* Set iInUse to the smallest snapshot id that is either:
797 ** * Currently in use by a database client,
798 ** * May be used by a database client in the future, or
799 ** * Is the most recently checkpointed snapshot (i.e. the one that will
800 ** be used following recovery if a failure occurs at this point).
802 rc
= lsmCheckpointSynced(pDb
, &iSynced
, 0, 0);
803 if( rc
==LSM_OK
&& iSynced
==0 ) iSynced
= p
->iId
;
805 if( rc
==LSM_OK
&& pDb
->iReader
>=0 ){
806 assert( pDb
->pClient
);
807 iInUse
= LSM_MIN(iInUse
, pDb
->pClient
->iId
);
809 if( rc
==LSM_OK
) rc
= firstSnapshotInUse(pDb
, &iInUse
);
811 #ifdef LSM_LOG_FREELIST
813 lsmLogMessage(pDb
, 0, "lsmBlockAllocate(): "
814 "snapshot-in-use: %lld (iSynced=%lld) (client-id=%lld)",
815 iInUse
, iSynced
, (pDb
->iReader
>=0 ? pDb
->pClient
->iId
: 0)
821 /* Unless there exists a read-only transaction (which prevents us from
822 ** recycling any blocks regardless), query the free block list for a
823 ** suitable block to reuse.
825 ** It might seem more natural to check for a read-only transaction at
826 ** the start of this function. However, it is better to wait until after
827 ** the call to lsmCheckpointSynced() to do so.
831 rc
= lsmDetectRoTrans(pDb
, &bRotrans
);
833 if( rc
==LSM_OK
&& bRotrans
==0 ){
834 rc
= findFreeblock(pDb
, iInUse
, (iBefore
>0), &iRet
);
838 if( iBefore
>0 && (iRet
<=0 || iRet
>=iBefore
) ){
841 }else if( rc
==LSM_OK
){
842 /* If a block was found in the free block list, use it and remove it from
843 ** the list. Otherwise, if no suitable block was found, allocate one from
844 ** the end of the file. */
846 #ifdef LSM_LOG_FREELIST
847 lsmLogMessage(pDb
, 0,
848 "reusing block %d (snapshot-in-use=%lld)", iRet
, iInUse
);
850 rc
= freelistAppend(pDb
, iRet
, -1);
852 rc
= dbTruncate(pDb
, iInUse
);
855 iRet
= ++(p
->nBlock
);
856 #ifdef LSM_LOG_FREELIST
857 lsmLogMessage(pDb
, 0, "extending file to %d blocks", iRet
);
862 assert( iBefore
>0 || iRet
>0 || rc
!=LSM_OK
);
868 ** Free a database block. The worker snapshot must be held in order to call
871 ** If successful, LSM_OK is returned. Otherwise, an lsm error code (e.g.
874 int lsmBlockFree(lsm_db
*pDb
, int iBlk
){
875 Snapshot
*p
= pDb
->pWorker
;
876 assert( lsmShmAssertWorker(pDb
) );
878 #ifdef LSM_LOG_FREELIST
879 lsmLogMessage(pDb
, LSM_OK
, "lsmBlockFree(): Free block %d", iBlk
);
882 return freelistAppend(pDb
, iBlk
, p
->iId
);
886 ** Refree a database block. The worker snapshot must be held in order to call
889 ** Refreeing is required when a block is allocated using lsmBlockAllocate()
890 ** but then not used. This function is used to push the block back onto
891 ** the freelist. Refreeing a block is different from freeing is, as a refreed
892 ** block may be reused immediately. Whereas a freed block can not be reused
893 ** until (at least) after the next checkpoint.
895 int lsmBlockRefree(lsm_db
*pDb
, int iBlk
){
896 int rc
= LSM_OK
; /* Return code */
898 #ifdef LSM_LOG_FREELIST
899 lsmLogMessage(pDb
, LSM_OK
, "lsmBlockRefree(): Refree block %d", iBlk
);
902 rc
= freelistAppend(pDb
, iBlk
, 0);
907 ** If required, copy a database checkpoint from shared memory into the
910 ** The WORKER lock must not be held when this is called. This is because
911 ** this function may indirectly call fsync(). And the WORKER lock should
912 ** not be held that long (in case it is required by a client flushing an
913 ** in-memory tree to disk).
915 int lsmCheckpointWrite(lsm_db
*pDb
, u32
*pnWrite
){
916 int rc
; /* Return Code */
919 assert( pDb
->pWorker
==0 );
920 assert( 1 || pDb
->pClient
==0 );
921 assert( lsmShmAssertLock(pDb
, LSM_LOCK_WORKER
, LSM_LOCK_UNLOCK
) );
923 rc
= lsmShmLock(pDb
, LSM_LOCK_CHECKPOINTER
, LSM_LOCK_EXCL
, 0);
924 if( rc
!=LSM_OK
) return rc
;
926 rc
= lsmCheckpointLoad(pDb
, 0);
928 int nBlock
= lsmCheckpointNBlock(pDb
->aSnapshot
);
929 ShmHeader
*pShm
= pDb
->pShmhdr
;
930 int bDone
= 0; /* True if checkpoint is already stored */
932 /* Check if this checkpoint has already been written to the database
933 ** file. If so, set variable bDone to true. */
934 if( pShm
->iMetaPage
){
935 MetaPage
*pPg
; /* Meta page */
936 u8
*aData
; /* Meta-page data buffer */
937 int nData
; /* Size of aData[] in bytes */
938 i64 iCkpt
; /* Id of checkpoint just loaded */
939 i64 iDisk
= 0; /* Id of checkpoint already stored in db */
940 iCkpt
= lsmCheckpointId(pDb
->aSnapshot
, 0);
941 rc
= lsmFsMetaPageGet(pDb
->pFS
, 0, pShm
->iMetaPage
, &pPg
);
943 aData
= lsmFsMetaPageData(pPg
, &nData
);
944 iDisk
= lsmCheckpointId((u32
*)aData
, 1);
945 nWrite
= lsmCheckpointNWrite((u32
*)aData
, 1);
946 lsmFsMetaPageRelease(pPg
);
948 bDone
= (iDisk
>=iCkpt
);
951 if( rc
==LSM_OK
&& bDone
==0 ){
952 int iMeta
= (pShm
->iMetaPage
% 2) + 1;
953 if( pDb
->eSafety
!=LSM_SAFETY_OFF
){
954 rc
= lsmFsSyncDb(pDb
->pFS
, nBlock
);
956 if( rc
==LSM_OK
) rc
= lsmCheckpointStore(pDb
, iMeta
);
957 if( rc
==LSM_OK
&& pDb
->eSafety
!=LSM_SAFETY_OFF
){
958 rc
= lsmFsSyncDb(pDb
->pFS
, 0);
961 pShm
->iMetaPage
= iMeta
;
962 nWrite
= lsmCheckpointNWrite(pDb
->aSnapshot
, 0) - nWrite
;
965 lsmLogMessage(pDb
, 0, "finish checkpoint %d",
966 (int)lsmCheckpointId(pDb
->aSnapshot
, 0)
972 lsmShmLock(pDb
, LSM_LOCK_CHECKPOINTER
, LSM_LOCK_UNLOCK
, 0);
973 if( pnWrite
&& rc
==LSM_OK
) *pnWrite
= nWrite
;
977 int lsmBeginWork(lsm_db
*pDb
){
980 /* Attempt to take the WORKER lock */
981 rc
= lsmShmLock(pDb
, LSM_LOCK_WORKER
, LSM_LOCK_EXCL
, 0);
983 /* Deserialize the current worker snapshot */
985 rc
= lsmCheckpointLoadWorker(pDb
);
990 void lsmFreeSnapshot(lsm_env
*pEnv
, Snapshot
*p
){
992 lsmSortedFreeLevel(pEnv
, p
->pLevel
);
993 lsmFree(pEnv
, p
->freelist
.aEntry
);
994 lsmFree(pEnv
, p
->redirect
.a
);
1000 ** Attempt to populate one of the read-lock slots to contain lock values
1001 ** iLsm/iShm. Or, if such a slot exists already, this function is a no-op.
1003 ** It is not an error if no slot can be populated because the write-lock
1004 ** cannot be obtained. If any other error occurs, return an LSM error code.
1005 ** Otherwise, LSM_OK.
1007 ** This function is called at various points to try to ensure that there
1008 ** always exists at least one read-lock slot that can be used by a read-only
1009 ** client. And so that, in the usual case, there is an "exact match" available
1010 ** whenever a read transaction is opened by any client. At present this
1011 ** function is called when:
1013 ** * A write transaction that called lsmTreeDiscardOld() is committed, and
1014 ** * Whenever the working snapshot is updated (i.e. lsmFinishWork()).
1016 static int dbSetReadLock(lsm_db
*db
, i64 iLsm
, u32 iShm
){
1018 ShmHeader
*pShm
= db
->pShmhdr
;
1021 /* Check if there is already a slot containing the required values. */
1022 for(i
=0; i
<LSM_LOCK_NREADER
; i
++){
1023 ShmReader
*p
= &pShm
->aReader
[i
];
1024 if( p
->iLsmId
==iLsm
&& p
->iTreeId
==iShm
) return LSM_OK
;
1027 /* Iterate through all read-lock slots, attempting to take a write-lock
1028 ** on each of them. If a write-lock succeeds, populate the locked slot
1029 ** with the required values and break out of the loop. */
1030 for(i
=0; rc
==LSM_OK
&& i
<LSM_LOCK_NREADER
; i
++){
1031 rc
= lsmShmLock(db
, LSM_LOCK_READER(i
), LSM_LOCK_EXCL
, 0);
1035 ShmReader
*p
= &pShm
->aReader
[i
];
1038 lsmShmLock(db
, LSM_LOCK_READER(i
), LSM_LOCK_UNLOCK
, 0);
1047 ** Release the read-lock currently held by connection db.
1049 int dbReleaseReadlock(lsm_db
*db
){
1051 if( db
->iReader
>=0 ){
1052 rc
= lsmShmLock(db
, LSM_LOCK_READER(db
->iReader
), LSM_LOCK_UNLOCK
, 0);
1061 ** Argument bFlush is true if the contents of the in-memory tree has just
1062 ** been flushed to disk. The significance of this is that once the snapshot
1063 ** created to hold the updated state of the database is synced to disk, log
1064 ** file space can be recycled.
1066 void lsmFinishWork(lsm_db
*pDb
, int bFlush
, int *pRc
){
1068 assert( rc
!=0 || pDb
->pWorker
);
1070 /* If no error has occurred, serialize the worker snapshot and write
1071 ** it to shared memory. */
1073 rc
= lsmSaveWorker(pDb
, bFlush
);
1076 /* Assuming no error has occurred, update a read lock slot with the
1077 ** new snapshot id (see comments above function dbSetReadLock()). */
1079 if( pDb
->iReader
<0 ){
1080 rc
= lsmTreeLoadHeader(pDb
, 0);
1083 rc
= dbSetReadLock(pDb
, pDb
->pWorker
->iId
, pDb
->treehdr
.iUsedShmid
);
1087 /* Free the snapshot object. */
1088 lsmFreeSnapshot(pDb
->pEnv
, pDb
->pWorker
);
1092 lsmShmLock(pDb
, LSM_LOCK_WORKER
, LSM_LOCK_UNLOCK
, 0);
1097 ** Called when recovery is finished.
1099 int lsmFinishRecovery(lsm_db
*pDb
){
1100 lsmTreeEndTransaction(pDb
, 1);
1105 ** Check if the currently configured compression functions
1106 ** (LSM_CONFIG_SET_COMPRESSION) are compatible with a database that has its
1107 ** compression id set to iReq. Compression routines are compatible if iReq
1108 ** is zero (indicating the database is empty), or if it is equal to the
1109 ** compression id of the configured compression routines.
1111 ** If the check shows that the current compression are incompatible and there
1112 ** is a compression factory registered, give it a chance to install new
1113 ** compression routines.
1115 ** If, after any registered factory is invoked, the compression functions
1116 ** are still incompatible, return LSM_MISMATCH. Otherwise, LSM_OK.
1118 int lsmCheckCompressionId(lsm_db
*pDb
, u32 iReq
){
1119 if( iReq
!=LSM_COMPRESSION_EMPTY
&& pDb
->compress
.iId
!=iReq
){
1120 if( pDb
->factory
.xFactory
){
1121 pDb
->bInFactory
= 1;
1122 pDb
->factory
.xFactory(pDb
->factory
.pCtx
, pDb
, iReq
);
1123 pDb
->bInFactory
= 0;
1125 if( pDb
->compress
.iId
!=iReq
){
1127 return LSM_MISMATCH
;
1135 ** Begin a read transaction. This function is a no-op if the connection
1136 ** passed as the only argument already has an open read transaction.
1138 int lsmBeginReadTrans(lsm_db
*pDb
){
1139 const int MAX_READLOCK_ATTEMPTS
= 10;
1140 const int nMaxAttempt
= (pDb
->bRoTrans
? 1 : MAX_READLOCK_ATTEMPTS
);
1142 int rc
= LSM_OK
; /* Return code */
1145 assert( pDb
->pWorker
==0 );
1147 while( rc
==LSM_OK
&& pDb
->iReader
<0 && (iAttempt
++)<nMaxAttempt
){
1150 assert( pDb
->pCsr
==0 && pDb
->nTransOpen
==0 );
1152 /* Load the in-memory tree header. */
1153 rc
= lsmTreeLoadHeader(pDb
, &iTreehdr
);
1155 /* Load the database snapshot */
1157 if( lsmCheckpointClientCacheOk(pDb
)==0 ){
1158 lsmFreeSnapshot(pDb
->pEnv
, pDb
->pClient
);
1160 lsmMCursorFreeCache(pDb
);
1161 lsmFsPurgeCache(pDb
->pFS
);
1162 rc
= lsmCheckpointLoad(pDb
, &iSnap
);
1168 /* Take a read-lock on the tree and snapshot just loaded. Then check
1169 ** that the shared-memory still contains the same values. If so, proceed.
1170 ** Otherwise, relinquish the read-lock and retry the whole procedure
1171 ** (starting with loading the in-memory tree header). */
1173 u32 iShmMax
= pDb
->treehdr
.iUsedShmid
;
1174 u32 iShmMin
= pDb
->treehdr
.iNextShmid
+1-LSM_MAX_SHMCHUNKS
;
1176 pDb
, lsmCheckpointId(pDb
->aSnapshot
, 0), iShmMin
, iShmMax
1179 if( lsmTreeLoadHeaderOk(pDb
, iTreehdr
)
1180 && lsmCheckpointLoadOk(pDb
, iSnap
)
1182 /* Read lock has been successfully obtained. Deserialize the
1183 ** checkpoint just loaded. TODO: This will be removed after
1184 ** lsm_sorted.c is changed to work directly from the serialized
1185 ** version of the snapshot. */
1186 if( pDb
->pClient
==0 ){
1187 rc
= lsmCheckpointDeserialize(pDb
, 0, pDb
->aSnapshot
,&pDb
->pClient
);
1189 assert( (rc
==LSM_OK
)==(pDb
->pClient
!=0) );
1190 assert( pDb
->iReader
>=0 );
1192 /* Check that the client has the right compression hooks loaded.
1193 ** If not, set rc to LSM_MISMATCH. */
1195 rc
= lsmCheckCompressionId(pDb
, pDb
->pClient
->iCmpId
);
1198 rc
= dbReleaseReadlock(pDb
);
1207 if( rc
==LSM_OK
&& pDb
->pClient
){
1209 "reading %p: snapshot:%d used-shmid:%d trans-id:%d iOldShmid=%d\n",
1211 (int)pDb
->pClient
->iId
, (int)pDb
->treehdr
.iUsedShmid
,
1212 (int)pDb
->treehdr
.root
.iTransId
,
1213 (int)pDb
->treehdr
.iOldShmid
1220 rc
= lsmShmCacheChunks(pDb
, pDb
->treehdr
.nChunk
);
1223 dbReleaseReadlock(pDb
);
1225 if( pDb
->pClient
==0 && rc
==LSM_OK
) rc
= LSM_BUSY
;
1230 ** This function is used by a read-write connection to determine if there
1231 ** are currently one or more read-only transactions open on the database
1232 ** (in this context a read-only transaction is one opened by a read-only
1233 ** connection on a non-live database).
1235 ** If no error occurs, LSM_OK is returned and *pbExists is set to true if
1236 ** some other connection has a read-only transaction open, or false
1237 ** otherwise. If an error occurs an LSM error code is returned and the final
1238 ** value of *pbExist is undefined.
1240 int lsmDetectRoTrans(lsm_db
*db
, int *pbExist
){
1243 /* Only a read-write connection may use this function. */
1244 assert( db
->bReadonly
==0 );
1246 rc
= lsmShmTestLock(db
, LSM_LOCK_ROTRANS
, 1, LSM_LOCK_EXCL
);
1258 ** db is a read-only database handle in the disconnected state. This function
1259 ** attempts to open a read-transaction on the database. This may involve
1260 ** connecting to the database system (opening shared memory etc.).
1262 int lsmBeginRoTrans(lsm_db
*db
){
1265 assert( db
->bReadonly
&& db
->pShmhdr
==0 );
1266 assert( db
->iReader
<0 );
1268 if( db
->bRoTrans
==0 ){
1270 /* Attempt a shared-lock on DMS1. */
1271 rc
= lsmShmLock(db
, LSM_LOCK_DMS1
, LSM_LOCK_SHARED
, 0);
1272 if( rc
!=LSM_OK
) return rc
;
1274 rc
= lsmShmTestLock(
1275 db
, LSM_LOCK_RWCLIENT(0), LSM_LOCK_NREADER
, LSM_LOCK_SHARED
1278 /* System is not live. Take a SHARED lock on the ROTRANS byte and
1279 ** release DMS1. Locking ROTRANS tells all read-write clients that they
1280 ** may not recycle any disk space from within the database or log files,
1281 ** as a read-only client may be using it. */
1282 rc
= lsmShmLock(db
, LSM_LOCK_ROTRANS
, LSM_LOCK_SHARED
, 0);
1283 lsmShmLock(db
, LSM_LOCK_DMS1
, LSM_LOCK_UNLOCK
, 0);
1287 rc
= lsmShmCacheChunks(db
, 1);
1289 db
->pShmhdr
= (ShmHeader
*)db
->apShm
[0];
1290 memset(db
->pShmhdr
, 0, sizeof(ShmHeader
));
1291 rc
= lsmCheckpointRecover(db
);
1293 rc
= lsmLogRecover(db
);
1297 }else if( rc
==LSM_BUSY
){
1298 /* System is live! */
1299 rc
= lsmShmLock(db
, LSM_LOCK_DMS3
, LSM_LOCK_SHARED
, 0);
1300 lsmShmLock(db
, LSM_LOCK_DMS1
, LSM_LOCK_UNLOCK
, 0);
1302 rc
= lsmShmCacheChunks(db
, 1);
1304 db
->pShmhdr
= (ShmHeader
*)db
->apShm
[0];
1309 /* In 'lsm_open()' we don't update the page and block sizes in the
1310 ** Filesystem for 'readonly' connection. Because member 'db->pShmhdr' is a
1311 ** nullpointer, this prevents loading a checkpoint. Now that the system is
1312 ** live this member should be set. So we can update both values in
1315 ** Configure the file-system connection with the page-size and block-size
1316 ** of this database. Even if the database file is zero bytes in size
1317 ** on disk, these values have been set in shared-memory by now, and so
1318 ** are guaranteed not to change during the lifetime of this connection. */
1320 && 0==lsmCheckpointClientCacheOk(db
)
1321 && LSM_OK
==(rc
=lsmCheckpointLoad(db
, 0))
1323 lsmFsSetPageSize(db
->pFS
, lsmCheckpointPgsz(db
->aSnapshot
));
1324 lsmFsSetBlockSize(db
->pFS
, lsmCheckpointBlksz(db
->aSnapshot
));
1328 rc
= lsmBeginReadTrans(db
);
1336 ** Close the currently open read transaction.
1338 void lsmFinishReadTrans(lsm_db
*pDb
){
1340 /* Worker connections should not be closing read transactions. And
1341 ** read transactions should only be closed after all cursors and write
1342 ** transactions have been closed. Finally pClient should be non-NULL
1343 ** only iff pDb->iReader>=0. */
1344 assert( pDb
->pWorker
==0 );
1345 assert( pDb
->pCsr
==0 && pDb
->nTransOpen
==0 );
1347 if( pDb
->bRoTrans
){
1349 for(i
=0; i
<pDb
->nShm
; i
++){
1350 lsmFree(pDb
->pEnv
, pDb
->apShm
[i
]);
1352 lsmFree(pDb
->pEnv
, pDb
->apShm
);
1357 lsmShmLock(pDb
, LSM_LOCK_ROTRANS
, LSM_LOCK_UNLOCK
, 0);
1359 dbReleaseReadlock(pDb
);
1363 ** Open a write transaction.
1365 int lsmBeginWriteTrans(lsm_db
*pDb
){
1366 int rc
= LSM_OK
; /* Return code */
1367 ShmHeader
*pShm
= pDb
->pShmhdr
; /* Shared memory header */
1369 assert( pDb
->nTransOpen
==0 );
1370 assert( pDb
->bDiscardOld
==0 );
1371 assert( pDb
->bReadonly
==0 );
1373 /* If there is no read-transaction open, open one now. */
1374 if( pDb
->iReader
<0 ){
1375 rc
= lsmBeginReadTrans(pDb
);
1378 /* Attempt to take the WRITER lock */
1380 rc
= lsmShmLock(pDb
, LSM_LOCK_WRITER
, LSM_LOCK_EXCL
, 0);
1383 /* If the previous writer failed mid-transaction, run emergency rollback. */
1384 if( rc
==LSM_OK
&& pShm
->bWriter
){
1385 rc
= lsmTreeRepair(pDb
);
1386 if( rc
==LSM_OK
) pShm
->bWriter
= 0;
1389 /* Check that this connection is currently reading from the most recent
1390 ** version of the database. If not, return LSM_BUSY. */
1391 if( rc
==LSM_OK
&& memcmp(&pShm
->hdr1
, &pDb
->treehdr
, sizeof(TreeHeader
)) ){
1396 rc
= lsmLogBegin(pDb
);
1399 /* If everything was successful, set the "transaction-in-progress" flag
1400 ** and return LSM_OK. Otherwise, if some error occurred, relinquish the
1401 ** WRITER lock and return an error code. */
1403 TreeHeader
*p
= &pDb
->treehdr
;
1406 if( lsmTreeHasOld(pDb
) && p
->iOldLog
==pDb
->pClient
->iLogOff
){
1407 lsmTreeDiscardOld(pDb
);
1408 pDb
->bDiscardOld
= 1;
1411 lsmShmLock(pDb
, LSM_LOCK_WRITER
, LSM_LOCK_UNLOCK
, 0);
1412 if( pDb
->pCsr
==0 ) lsmFinishReadTrans(pDb
);
1418 ** End the current write transaction. The connection is left with an open
1419 ** read transaction. It is an error to call this if there is no open write
1422 ** If the transaction was committed, then a commit record has already been
1423 ** written into the log file when this function is called. Or, if the
1424 ** transaction was rolled back, both the log file and in-memory tree
1425 ** structure have already been restored. In either case, this function
1426 ** merely releases locks and other resources held by the write-transaction.
1428 ** LSM_OK is returned if successful, or an LSM error code otherwise.
1430 int lsmFinishWriteTrans(lsm_db
*pDb
, int bCommit
){
1434 lsmLogEnd(pDb
, bCommit
);
1435 if( rc
==LSM_OK
&& bCommit
&& lsmTreeSize(pDb
)>pDb
->nTreeLimit
){
1437 lsmTreeMakeOld(pDb
);
1439 lsmTreeEndTransaction(pDb
, bCommit
);
1442 if( bFlush
&& pDb
->bAutowork
){
1443 rc
= lsmSortedAutoWork(pDb
, 1);
1444 }else if( bCommit
&& pDb
->bDiscardOld
){
1445 rc
= dbSetReadLock(pDb
, pDb
->pClient
->iId
, pDb
->treehdr
.iUsedShmid
);
1448 pDb
->bDiscardOld
= 0;
1449 lsmShmLock(pDb
, LSM_LOCK_WRITER
, LSM_LOCK_UNLOCK
, 0);
1451 if( bFlush
&& pDb
->bAutowork
==0 && pDb
->xWork
){
1452 pDb
->xWork(pDb
, pDb
->pWorkCtx
);
1459 ** Return non-zero if the caller is holding the client mutex.
1462 int lsmHoldingClientMutex(lsm_db
*pDb
){
1463 return lsmMutexHeld(pDb
->pEnv
, pDb
->pDatabase
->pClientMutex
);
1467 static int slotIsUsable(ShmReader
*p
, i64 iLsm
, u32 iShmMin
, u32 iShmMax
){
1469 p
->iLsmId
&& p
->iLsmId
<=iLsm
1470 && shm_sequence_ge(iShmMax
, p
->iTreeId
)
1471 && shm_sequence_ge(p
->iTreeId
, iShmMin
)
1476 ** Obtain a read-lock on database version identified by the combination
1477 ** of snapshot iLsm and tree iTree. Return LSM_OK if successful, or
1478 ** an LSM error code otherwise.
1480 int lsmReadlock(lsm_db
*db
, i64 iLsm
, u32 iShmMin
, u32 iShmMax
){
1482 ShmHeader
*pShm
= db
->pShmhdr
;
1485 assert( db
->iReader
<0 );
1486 assert( shm_sequence_ge(iShmMax
, iShmMin
) );
1488 /* This is a no-op if the read-only transaction flag is set. */
1494 /* Search for an exact match. */
1495 for(i
=0; db
->iReader
<0 && rc
==LSM_OK
&& i
<LSM_LOCK_NREADER
; i
++){
1496 ShmReader
*p
= &pShm
->aReader
[i
];
1497 if( p
->iLsmId
==iLsm
&& p
->iTreeId
==iShmMax
){
1498 rc
= lsmShmLock(db
, LSM_LOCK_READER(i
), LSM_LOCK_SHARED
, 0);
1499 if( rc
==LSM_OK
&& p
->iLsmId
==iLsm
&& p
->iTreeId
==iShmMax
){
1501 }else if( rc
==LSM_BUSY
){
1507 /* Try to obtain a write-lock on each slot, in order. If successful, set
1508 ** the slot values to iLsm/iTree. */
1509 for(i
=0; db
->iReader
<0 && rc
==LSM_OK
&& i
<LSM_LOCK_NREADER
; i
++){
1510 rc
= lsmShmLock(db
, LSM_LOCK_READER(i
), LSM_LOCK_EXCL
, 0);
1514 ShmReader
*p
= &pShm
->aReader
[i
];
1516 p
->iTreeId
= iShmMax
;
1517 rc
= lsmShmLock(db
, LSM_LOCK_READER(i
), LSM_LOCK_SHARED
, 0);
1518 assert( rc
!=LSM_BUSY
);
1519 if( rc
==LSM_OK
) db
->iReader
= i
;
1523 /* Search for any usable slot */
1524 for(i
=0; db
->iReader
<0 && rc
==LSM_OK
&& i
<LSM_LOCK_NREADER
; i
++){
1525 ShmReader
*p
= &pShm
->aReader
[i
];
1526 if( slotIsUsable(p
, iLsm
, iShmMin
, iShmMax
) ){
1527 rc
= lsmShmLock(db
, LSM_LOCK_READER(i
), LSM_LOCK_SHARED
, 0);
1528 if( rc
==LSM_OK
&& slotIsUsable(p
, iLsm
, iShmMin
, iShmMax
) ){
1530 }else if( rc
==LSM_BUSY
){
1536 if( rc
==LSM_OK
&& db
->iReader
<0 ){
1543 ** This is used to check if there exists a read-lock locking a particular
1544 ** version of either the in-memory tree or database file.
1546 ** If iLsmId is non-zero, then it is a snapshot id. If there exists a
1547 ** read-lock using this snapshot or newer, set *pbInUse to true. Or,
1548 ** if there is no such read-lock, set it to false.
1550 ** Or, if iLsmId is zero, then iShmid is a shared-memory sequence id.
1551 ** Search for a read-lock using this sequence id or newer. etc.
1553 static int isInUse(lsm_db
*db
, i64 iLsmId
, u32 iShmid
, int *pbInUse
){
1554 ShmHeader
*pShm
= db
->pShmhdr
;
1558 for(i
=0; rc
==LSM_OK
&& i
<LSM_LOCK_NREADER
; i
++){
1559 ShmReader
*p
= &pShm
->aReader
[i
];
1561 if( (iLsmId
!=0 && p
->iLsmId
!=0 && iLsmId
>=p
->iLsmId
)
1562 || (iLsmId
==0 && shm_sequence_ge(p
->iTreeId
, iShmid
))
1564 rc
= lsmShmLock(db
, LSM_LOCK_READER(i
), LSM_LOCK_EXCL
, 0);
1567 lsmShmLock(db
, LSM_LOCK_READER(i
), LSM_LOCK_UNLOCK
, 0);
1582 ** This function is called by worker connections to determine the smallest
1583 ** snapshot id that is currently in use by a database client. The worker
1584 ** connection uses this result to determine whether or not it is safe to
1585 ** recycle a database block.
1587 static int firstSnapshotInUse(
1588 lsm_db
*db
, /* Database handle */
1589 i64
*piInUse
/* IN/OUT: Smallest snapshot id in use */
1591 ShmHeader
*pShm
= db
->pShmhdr
;
1592 i64 iInUse
= *piInUse
;
1596 for(i
=0; i
<LSM_LOCK_NREADER
; i
++){
1597 ShmReader
*p
= &pShm
->aReader
[i
];
1599 i64 iThis
= p
->iLsmId
;
1600 if( iThis
!=0 && iInUse
>iThis
){
1601 int rc
= lsmShmLock(db
, LSM_LOCK_READER(i
), LSM_LOCK_EXCL
, 0);
1604 lsmShmLock(db
, LSM_LOCK_READER(i
), LSM_LOCK_UNLOCK
, 0);
1605 }else if( rc
==LSM_BUSY
){
1608 /* Some error other than LSM_BUSY. Return the error code to
1609 ** the caller in this case. */
1620 int lsmTreeInUse(lsm_db
*db
, u32 iShmid
, int *pbInUse
){
1621 if( db
->treehdr
.iUsedShmid
==iShmid
){
1625 return isInUse(db
, 0, iShmid
, pbInUse
);
1628 int lsmLsmInUse(lsm_db
*db
, i64 iLsmId
, int *pbInUse
){
1629 if( db
->pClient
&& db
->pClient
->iId
<=iLsmId
){
1633 return isInUse(db
, iLsmId
, 0, pbInUse
);
1637 ** This function may only be called after a successful call to
1638 ** lsmDbDatabaseConnect(). It returns true if the connection is in
1639 ** multi-process mode, or false otherwise.
1641 int lsmDbMultiProc(lsm_db
*pDb
){
1642 return pDb
->pDatabase
&& pDb
->pDatabase
->bMultiProc
;
1646 /*************************************************************************
1647 **************************************************************************
1648 **************************************************************************
1649 **************************************************************************
1650 **************************************************************************
1651 *************************************************************************/
1654 ** Ensure that database connection db has cached pointers to at least the
1655 ** first nChunk chunks of shared memory.
1657 int lsmShmCacheChunks(lsm_db
*db
, int nChunk
){
1659 if( nChunk
>db
->nShm
){
1660 static const int NINCR
= 16;
1661 Database
*p
= db
->pDatabase
;
1662 lsm_env
*pEnv
= db
->pEnv
;
1666 /* Ensure that the db->apShm[] array is large enough. If an attempt to
1667 ** allocate memory fails, return LSM_NOMEM immediately. The apShm[] array
1668 ** is always extended in multiples of 16 entries - so the actual allocated
1669 ** size can be inferred from nShm. */
1670 nAlloc
= ((db
->nShm
+ NINCR
- 1) / NINCR
) * NINCR
;
1671 while( nChunk
>=nAlloc
){
1674 apShm
= lsmRealloc(pEnv
, db
->apShm
, sizeof(void*)*nAlloc
);
1675 if( !apShm
) return LSM_NOMEM_BKPT
;
1680 for(i
=db
->nShm
; rc
==LSM_OK
&& i
<nChunk
; i
++){
1681 db
->apShm
[i
] = lsmMallocZeroRc(pEnv
, LSM_SHM_CHUNK_SIZE
, &rc
);
1687 /* Enter the client mutex */
1688 lsmMutexEnter(pEnv
, p
->pClientMutex
);
1690 /* Extend the Database objects apShmChunk[] array if necessary. Using the
1691 ** same pattern as for the lsm_db.apShm[] array above. */
1692 nAlloc
= ((p
->nShmChunk
+ NINCR
- 1) / NINCR
) * NINCR
;
1693 while( nChunk
>=nAlloc
){
1696 apShm
= lsmRealloc(pEnv
, p
->apShmChunk
, sizeof(void*)*nAlloc
);
1698 rc
= LSM_NOMEM_BKPT
;
1701 p
->apShmChunk
= apShm
;
1704 for(i
=db
->nShm
; rc
==LSM_OK
&& i
<nChunk
; i
++){
1705 if( i
>=p
->nShmChunk
){
1707 if( p
->bMultiProc
==0 ){
1708 /* Single process mode */
1709 pChunk
= lsmMallocZeroRc(pEnv
, LSM_SHM_CHUNK_SIZE
, &rc
);
1711 /* Multi-process mode */
1712 rc
= lsmEnvShmMap(pEnv
, p
->pFile
, i
, LSM_SHM_CHUNK_SIZE
, &pChunk
);
1715 p
->apShmChunk
[i
] = pChunk
;
1720 db
->apShm
[i
] = p
->apShmChunk
[i
];
1725 /* Release the client mutex */
1726 lsmMutexLeave(pEnv
, p
->pClientMutex
);
1733 static int lockSharedFile(lsm_env
*pEnv
, Database
*p
, int iLock
, int eOp
){
1735 if( p
->bMultiProc
){
1736 rc
= lsmEnvLock(pEnv
, p
->pFile
, iLock
, eOp
);
1742 ** Test if it would be possible for connection db to obtain a lock of type
1743 ** eType on the nLock locks starting at iLock. If so, return LSM_OK. If it
1744 ** would not be possible to obtain the lock due to a lock held by another
1745 ** connection, return LSM_BUSY. If an IO or other error occurs (i.e. in the
1746 ** lsm_env.xTestLock function), return some other LSM error code.
1748 ** Note that this function never actually locks the database - it merely
1749 ** queries the system to see if there exists a lock that would prevent
1750 ** it from doing so.
1760 Database
*p
= db
->pDatabase
;
1764 for(i
=iLock
; i
<(iLock
+nLock
); i
++){
1765 mask
|= ((u64
)1 << (iLock
-1));
1766 if( eOp
==LSM_LOCK_EXCL
) mask
|= ((u64
)1 << (iLock
+32-1));
1769 lsmMutexEnter(db
->pEnv
, p
->pClientMutex
);
1770 for(pIter
=p
->pConn
; pIter
; pIter
=pIter
->pNext
){
1771 if( pIter
!=db
&& (pIter
->mLock
& mask
) ){
1772 assert( pIter
!=db
);
1779 }else if( p
->bMultiProc
){
1780 rc
= lsmEnvTestLock(db
->pEnv
, p
->pFile
, iLock
, nLock
, eOp
);
1783 lsmMutexLeave(db
->pEnv
, p
->pClientMutex
);
1788 ** Attempt to obtain the lock identified by the iLock and bExcl parameters.
1789 ** If successful, return LSM_OK. If the lock cannot be obtained because
1790 ** there exists some other conflicting lock, return LSM_BUSY. If some other
1791 ** error occurs, return an LSM error code.
1793 ** Parameter iLock must be one of LSM_LOCK_WRITER, WORKER or CHECKPOINTER,
1794 ** or else a value returned by the LSM_LOCK_READER macro.
1799 int eOp
, /* One of LSM_LOCK_UNLOCK, SHARED or EXCL */
1800 int bBlock
/* True for a blocking lock */
1803 const u64 me
= ((u64
)1 << (iLock
-1));
1804 const u64 ms
= ((u64
)1 << (iLock
+32-1));
1806 Database
*p
= db
->pDatabase
;
1808 assert( eOp
!=LSM_LOCK_EXCL
|| p
->bReadonly
==0 );
1809 assert( iLock
>=1 && iLock
<=LSM_LOCK_RWCLIENT(LSM_LOCK_NRWCLIENT
-1) );
1810 assert( LSM_LOCK_RWCLIENT(LSM_LOCK_NRWCLIENT
-1)<=32 );
1811 assert( eOp
==LSM_LOCK_UNLOCK
|| eOp
==LSM_LOCK_SHARED
|| eOp
==LSM_LOCK_EXCL
);
1813 /* Check for a no-op. Proceed only if this is not one of those. */
1814 if( (eOp
==LSM_LOCK_UNLOCK
&& (db
->mLock
& (me
|ms
))!=0)
1815 || (eOp
==LSM_LOCK_SHARED
&& (db
->mLock
& (me
|ms
))!=ms
)
1816 || (eOp
==LSM_LOCK_EXCL
&& (db
->mLock
& me
)==0)
1818 int nExcl
= 0; /* Number of connections holding EXCLUSIVE */
1819 int nShared
= 0; /* Number of connections holding SHARED */
1820 lsmMutexEnter(db
->pEnv
, p
->pClientMutex
);
1822 /* Figure out the locks currently held by this process on iLock, not
1823 ** including any held by connection db. */
1824 for(pIter
=p
->pConn
; pIter
; pIter
=pIter
->pNext
){
1825 assert( (pIter
->mLock
& me
)==0 || (pIter
->mLock
& ms
)!=0 );
1827 if( pIter
->mLock
& me
){
1829 }else if( pIter
->mLock
& ms
){
1834 assert( nExcl
==0 || nExcl
==1 );
1835 assert( nExcl
==0 || nShared
==0 );
1836 assert( nExcl
==0 || (db
->mLock
& (me
|ms
))==0 );
1839 case LSM_LOCK_UNLOCK
:
1841 lockSharedFile(db
->pEnv
, p
, iLock
, LSM_LOCK_UNLOCK
);
1843 db
->mLock
&= ~(me
|ms
);
1846 case LSM_LOCK_SHARED
:
1851 rc
= lockSharedFile(db
->pEnv
, p
, iLock
, LSM_LOCK_SHARED
);
1861 assert( eOp
==LSM_LOCK_EXCL
);
1862 if( nExcl
|| nShared
){
1865 rc
= lockSharedFile(db
->pEnv
, p
, iLock
, LSM_LOCK_EXCL
);
1867 db
->mLock
|= (me
|ms
);
1873 lsmMutexLeave(db
->pEnv
, p
->pClientMutex
);
1881 int shmLockType(lsm_db
*db
, int iLock
){
1882 const u64 me
= ((u64
)1 << (iLock
-1));
1883 const u64 ms
= ((u64
)1 << (iLock
+32-1));
1885 if( db
->mLock
& me
) return LSM_LOCK_EXCL
;
1886 if( db
->mLock
& ms
) return LSM_LOCK_SHARED
;
1887 return LSM_LOCK_UNLOCK
;
1891 ** The arguments passed to this function are similar to those passed to
1892 ** the lsmShmLock() function. However, instead of obtaining a new lock
1893 ** this function returns true if the specified connection already holds
1894 ** (or does not hold) such a lock, depending on the value of eOp. As
1897 ** (eOp==LSM_LOCK_UNLOCK) -> true if db has no lock on iLock
1898 ** (eOp==LSM_LOCK_SHARED) -> true if db has at least a SHARED lock on iLock.
1899 ** (eOp==LSM_LOCK_EXCL) -> true if db has an EXCLUSIVE lock on iLock.
1901 int lsmShmAssertLock(lsm_db
*db
, int iLock
, int eOp
){
1905 assert( iLock
>=1 && iLock
<=LSM_LOCK_READER(LSM_LOCK_NREADER
-1) );
1906 assert( iLock
<=16 );
1907 assert( eOp
==LSM_LOCK_UNLOCK
|| eOp
==LSM_LOCK_SHARED
|| eOp
==LSM_LOCK_EXCL
);
1909 eHave
= shmLockType(db
, iLock
);
1912 case LSM_LOCK_UNLOCK
:
1913 ret
= (eHave
==LSM_LOCK_UNLOCK
);
1915 case LSM_LOCK_SHARED
:
1916 ret
= (eHave
!=LSM_LOCK_UNLOCK
);
1919 ret
= (eHave
==LSM_LOCK_EXCL
);
1922 assert( !"bad eOp value passed to lsmShmAssertLock()" );
1929 int lsmShmAssertWorker(lsm_db
*db
){
1930 return lsmShmAssertLock(db
, LSM_LOCK_WORKER
, LSM_LOCK_EXCL
) && db
->pWorker
;
1934 ** This function does not contribute to library functionality, and is not
1935 ** included in release builds. It is intended to be called from within
1936 ** an interactive debugger.
1938 ** When called, this function prints a single line of human readable output
1939 ** to stdout describing the locks currently held by the connection. For
1942 ** (gdb) call print_db_locks(pDb)
1943 ** (shared on dms2) (exclusive on writer)
1945 void print_db_locks(lsm_db
*db
){
1947 for(iLock
=0; iLock
<16; iLock
++){
1949 const char *azLock
[] = {0, "shared", "exclusive"};
1950 const char *azName
[] = {
1951 0, "dms1", "dms2", "writer", "worker", "checkpointer",
1952 "reader0", "reader1", "reader2", "reader3", "reader4", "reader5"
1954 int eHave
= shmLockType(db
, iLock
);
1955 if( azLock
[eHave
] ){
1956 printf("%s(%s on %s)", (bOne
?" ":""), azLock
[eHave
], azName
[iLock
]);
1962 void print_all_db_locks(lsm_db
*db
){
1964 for(p
=db
->pDatabase
->pConn
; p
; p
=p
->pNext
){
1965 printf("%s connection %p ", ((p
==db
)?"*":""), p
);
1971 void lsmShmBarrier(lsm_db
*db
){
1972 lsmEnvShmBarrier(db
->pEnv
);
1975 int lsm_checkpoint(lsm_db
*pDb
, int *pnKB
){
1976 int rc
; /* Return code */
1977 u32 nWrite
= 0; /* Number of pages checkpointed */
1979 /* Attempt the checkpoint. If successful, nWrite is set to the number of
1980 ** pages written between this and the previous checkpoint. */
1981 rc
= lsmCheckpointWrite(pDb
, &nWrite
);
1983 /* If required, calculate the output variable (KB of data checkpointed).
1984 ** Set it to zero if an error occured. */
1987 if( rc
==LSM_OK
&& nWrite
){
1988 nKB
= (((i64
)nWrite
* lsmFsPageSize(pDb
->pFS
)) + 1023) / 1024;