/*
** The author disclaims copyright to this source code.  In place of
** a legal notice, here is a blessing:
**
**    May you do good and not evil.
**    May you find forgiveness for yourself and forgive others.
**    May you share freely, never taking more than you give.
**
*************************************************************************
**
** This file contains the implementation of LSM database logging. Logging
** has one purpose in LSM - to make transactions durable.
**
** When data is written to an LSM database, it is initially stored in an
** in-memory tree structure. Since this structure is in volatile memory,
** if a power failure or application crash occurs it may be lost. To
** prevent loss of data in this case, each time a record is written to the
** in-memory tree an equivalent record is appended to the log on disk.
** If a power failure or application crash does occur, data can be recovered
** by reading the log.
**
** A log file consists of the following types of records representing data
** written into the database:
**
**   LOG_WRITE:  A key-value pair written to the database.
**   LOG_DELETE: A delete key issued to the database.
**   LOG_COMMIT: A transaction commit.
**
** And the following types of records for ancillary purposes:
**
**   LOG_EOF:    A record indicating the end of a log file.
**   LOG_PAD1:   A single byte padding record.
**   LOG_PAD2:   An N byte padding record (N>1).
**   LOG_JUMP:   A pointer to another offset within the log file.
**
** Each transaction written to the log contains one or more LOG_WRITE and/or
** LOG_DELETE records, followed by a LOG_COMMIT record. The LOG_COMMIT record
** contains an 8-byte checksum based on all previous data written to the
** log file.
**
** LOG CHECKSUMS & RECOVERY
**
**   Checksums are found in two types of log records: LOG_COMMIT and
**   LOG_CKSUM records. In order to recover content from a log, a client
**   reads each record from the start of the log, calculating a checksum as
**   it does. Each time a LOG_COMMIT or LOG_CKSUM is encountered, the
**   recovery process verifies that the checksum stored in the log
**   matches the calculated checksum. If it does not, the recovery process
**   can stop reading the log.
**
**   If a recovery process reads records (other than COMMIT or CKSUM)
**   consisting of at least LSM_CKSUM_MAXDATA bytes, then the next record in
**   the log must be either a LOG_CKSUM or LOG_COMMIT record. If it is
**   not, the recovery process also stops reading the log.
**
**   To recover the log file, it must be read twice: the first time to
**   determine the location of the last valid commit record, and the second
**   time to load data into the in-memory tree.
**
**   Todo: Surely there is a better way...
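**
**   A minimal sketch of the two-pass idea described above, under loud
**   assumptions: the log is modelled as an array of hypothetical record
**   kinds (REC_DATA, REC_COMMIT_OK, REC_COMMIT_BAD) and checksum
**   verification is reduced to a flag. None of these names exist in LSM;
**   the real reader works on bytes and running checksums.

```c
#include <assert.h>
#include <stddef.h>

// Hypothetical record kinds, used by this sketch only.
enum { REC_DATA, REC_COMMIT_OK, REC_COMMIT_BAD };

// Example log: two committed transactions, then a transaction whose
// COMMIT checksum does not verify.
static const int aDemoLog[] = {
  REC_DATA, REC_DATA, REC_COMMIT_OK,
  REC_DATA, REC_COMMIT_OK,
  REC_DATA, REC_DATA, REC_COMMIT_BAD
};

// Pass 0: count the commits that verify, stopping at the first failure.
static int countValidCommits(const int *aRec, size_t nRec){
  int nCommit = 0;
  size_t i;
  for(i=0; i<nRec; i++){
    if( aRec[i]==REC_COMMIT_BAD ) break;
    if( aRec[i]==REC_COMMIT_OK ) nCommit++;
  }
  return nCommit;
}

// Pass 1: apply data records until nCommit commits have been seen.
// Returns the number of data records applied. Records after the last
// valid commit are never reached.
static int applyCommitted(const int *aRec, size_t nRec, int nCommit){
  int nApplied = 0;
  size_t i;
  for(i=0; i<nRec && nCommit>0; i++){
    if( aRec[i]==REC_DATA ){
      nApplied++;
    }else{
      nCommit--;
    }
  }
  return nApplied;
}
```

**   With aDemoLog, pass 0 finds two valid commits and pass 1 applies the
**   three data records that precede the second commit. Knowing the commit
**   count in advance is what lets pass 1 apply records before it reaches
**   the commit that covers them.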
**
**   If the log file were never deleted or wrapped, it would be possible to
**   read it from start to end each time recovery is required (i.e. each
**   time the number of database clients changes from 0 to 1), effectively
**   reading the entire history of the database each time. This would quickly
**   become inefficient. Additionally, since the log file would grow without
**   bound, it would waste storage space.
**
**   Instead, part of each checkpoint written into the database file contains
**   a log offset (and other information required to read the log starting
**   at this offset) at which to begin recovery. Offset $O.
**
**   Once a checkpoint has been written and synced into the database file, it
**   is guaranteed that no recovery process will need to read any data before
**   offset $O of the log file. It is therefore safe to begin overwriting
**   any data that occurs before offset $O.
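**
**   A rough sketch of how offset $O might be applied to a set of log
**   regions. It assumes a hypothetical Region type with inclusive iStart
**   and iEnd offsets, with regions indexed in read order. It is
**   an illustration of the rule above, not the code used by this file.

```c
#include <assert.h>

// Hypothetical stand-in for a log region (offsets in bytes).
typedef struct Region { long long iStart; long long iEnd; } Region;

// Discard everything that precedes offset iOff. Regions that are read
// before the one containing iOff are emptied; the containing region is
// trimmed so that it starts at iOff. Returns 1 on success, or 0 (leaving
// aRegion untouched) if no region contains iOff.
static int reclaimBefore(Region aRegion[3], long long iOff){
  int iRegion;
  int i;
  for(iRegion=0; iRegion<3; iRegion++){
    Region *p = &aRegion[iRegion];
    if( iOff>=p->iStart && iOff<=p->iEnd ) break;
  }
  if( iRegion==3 ) return 0;
  for(i=0; i<iRegion; i++){
    aRegion[i].iStart = 0;
    aRegion[i].iEnd = 0;
  }
  aRegion[iRegion].iStart = iOff;
  return 1;
}
```

**   For example, with region 0 at [1000,2000], an empty region 1 and region
**   2 at [0,500], reclaiming before offset 300 empties region 0 and leaves
**   region 2 as [300,500].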
**
**   This implementation separates the log into three regions mapped into
**   the log file - regions 0, 1 and 2. During recovery, regions are read
**   in ascending order (i.e. 0, then 1, then 2). Each region is zero or
**   more bytes in size.
**
**     |---1---|..|--0--|.|--2--|....
**
**   New records are always appended to the end of region 2.
**
**   Initially (when it is empty), all three regions are zero bytes in size.
**   Each of them is located at the beginning of the file. As records are
**   added to the log, region 2 grows, so that the log consists of a zero
**   byte region 1, followed by a zero byte region 0, followed by an N byte
**   region 2. After one or more checkpoints have been written to disk,
**   the start point of region 2 is moved to $O. For example:
**
**     A) ||.........|--2--|....
**
**   (both regions 0 and 1 are 0 bytes in size at offset 0).
**
**   Eventually, the log wraps around to write new records into the start.
**   At this point, region 2 is renamed to region 0. Region 0 is renamed
**   to region 2. After appending a few records to the new region 2, the
**   log file looks like this:
**
**     B) ||--2--|...|--0--|....
**
**   (region 1 is still 0 bytes in size, located at offset 0).
**
**   Any checkpoints made at this point may reduce the size of region 0.
**   However, if they do not, and region 2 expands so that it is about to
**   overwrite the start of region 0, then region 2 is renamed to region 1,
**   and a new region 2 created at the end of the file following the existing
**   region 0.
**
**     C) |---1---|..|--0--|.|-2-|
**
**   In this state records are appended to region 2 until checkpoints have
**   contracted regions 0 and 1 until they are both zero bytes in size. They
**   are then shifted to the start of the log file, leaving the system in
**   the equivalent of state A above.
**
**   Alternatively, state B may transition directly to state A if the size
**   of region 0 is reduced to zero bytes before region 2 threatens to
**   overwrite it.
**
** LOG_PAD1 & LOG_PAD2 RECORDS
**
**   PAD1 and PAD2 records may appear in a log file at any point. They allow
**   a process writing the log file to align the beginning of transactions
**   with the beginning of disk sectors, which increases robustness.
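**
**   A sketch of how an arbitrary number of padding bytes might be built
**   from PAD1 and PAD2 records. The function name is hypothetical. It
**   assumes each PAD2 record carries at most 200 unused bytes, so that
**   the length always fits in a single-byte varint, and it fills the
**   unused space with zeroes (the content is never interpreted).

```c
#include <assert.h>
#include <string.h>

// Write exactly nPad bytes of padding records into aOut, which must have
// room for nPad bytes. Returns the number of bytes written.
static int logPaddingSketch(unsigned char *aOut, int nPad){
  int n = 0;
  while( nPad>0 ){
    if( nPad==1 ){
      aOut[n++] = 0x01;                       // PAD1: one byte in total
      nPad = 0;
    }else{
      int nUnused = (nPad-2 < 200) ? nPad-2 : 200;
      aOut[n++] = 0x02;                       // PAD2 type byte
      aOut[n++] = (unsigned char)nUnused;     // length as 1-byte varint
      memset(&aOut[n], 0, (size_t)nUnused);   // unused space
      n += nUnused;
      nPad -= nUnused + 2;
    }
  }
  return n;
}
```

**   Five bytes of padding become a single PAD2 record (0x02, 0x03, then
**   three unused bytes). A request for 203 bytes becomes one 202-byte PAD2
**   record followed by a PAD1 record.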
**
**   LOG_EOF:    * A single 0x00 byte.
**
**   LOG_PAD1:   * A single 0x01 byte.
**
**   LOG_PAD2:   * A single 0x02 byte, followed by
**               * The number of unused bytes (N) as a varint,
**               * An N byte block of unused space.
**
**   LOG_COMMIT: * A single 0x03 byte.
**               * An 8-byte checksum.
**
**   LOG_JUMP:   * A single 0x04 byte.
**               * Absolute file offset to jump to, encoded as a varint.
**
**   LOG_WRITE:  * A single 0x06 or 0x07 byte,
**               * The number of bytes in the key, encoded as a varint,
**               * The number of bytes in the value, encoded as a varint,
**               * If the first byte was 0x07, an 8 byte checksum.
**               * The key data,
**               * The value data.
**
**   LOG_DELETE: * A single 0x08 or 0x09 byte,
**               * The number of bytes in the key, encoded as a varint,
**               * If the first byte was 0x09, an 8 byte checksum.
**               * The key data.
**
**   Varints are as described in lsm_varint.c (SQLite 4 format).
**
**   The checksum is calculated using two 32-bit unsigned integers, s0 and
**   s1. The initial value for both is 42. The checksum is updated each time
**   a record is written into the log file by treating the encoded (binary)
**   record as an array of 32-bit little-endian integers. Then, if x[] is the
**   integer array, updating the checksum accumulators as follows:
**
**     for i from 0 to n-1 step 2:
**       s0 += x[i] + s1;
**       s1 += x[i+1] + s0;
**     endfor
**
**   If the record is not a multiple of 8 bytes in size, it is padded
**   with zeroes to make it so before the checksum is updated.
**
**   The checksum stored in a COMMIT, WRITE or DELETE is based on all bytes
**   up to the start of the 8-byte checksum itself, including the COMMIT,
**   WRITE or DELETE fields that appear before the checksum in the record.
*/
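
/*
** A self-contained sketch of the checksum update described above. The type
** and function names (CksumSketch, cksumSketchInit, cksumSketchUpdate) are
** hypothetical and exist only for this illustration; fixed-width types
** from <stdint.h> stand in for the u8/u32 typedefs used in this file.
*/

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

typedef struct CksumSketch { uint32_t s0; uint32_t s1; } CksumSketch;

// Interpret 4 bytes as an unsigned 32-bit little-endian integer.
static uint32_t sketchGetU32le(const unsigned char *a){
  return ((uint32_t)a[3] << 24) | ((uint32_t)a[2] << 16)
       | ((uint32_t)a[1] << 8)  |  (uint32_t)a[0];
}

// Both accumulators start at 42.
static void cksumSketchInit(CksumSketch *p){
  p->s0 = 42;
  p->s1 = 42;
}

// Fold n bytes into the checksum, 8 bytes at a time. A trailing partial
// block is zero-padded to 8 bytes first. Arithmetic wraps modulo 2^32.
static void cksumSketchUpdate(CksumSketch *p, const unsigned char *a, size_t n){
  size_t nFull = n - (n % 8);
  size_t i;
  for(i=0; i<nFull; i+=8){
    p->s0 += sketchGetU32le(&a[i]) + p->s1;
    p->s1 += sketchGetU32le(&a[i+4]) + p->s0;
  }
  if( nFull<n ){
    unsigned char aPad[8] = {0, 0, 0, 0, 0, 0, 0, 0};
    memcpy(aPad, &a[nFull], n - nFull);
    p->s0 += sketchGetU32le(aPad) + p->s1;
    p->s1 += sketchGetU32le(&aPad[4]) + p->s0;
  }
}
```

/*
** Worked example: for the 8 input bytes 01 00 00 00 02 00 00 00, x[0]=1 and
** x[1]=2, so s0 = 42 + 1 + 42 = 85 and s1 = 42 + 2 + 85 = 129.
*/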

/* Log record types */
#define LSM_LOG_EOF          0x00
#define LSM_LOG_PAD1         0x01
#define LSM_LOG_PAD2         0x02
#define LSM_LOG_COMMIT       0x03
#define LSM_LOG_JUMP         0x04

#define LSM_LOG_WRITE        0x06
#define LSM_LOG_WRITE_CKSUM  0x07

#define LSM_LOG_DELETE       0x08
#define LSM_LOG_DELETE_CKSUM 0x09

#define LSM_LOG_DRANGE       0x0A
#define LSM_LOG_DRANGE_CKSUM 0x0B
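
/*
** The WRITE, DELETE and DRANGE codes come in pairs that differ only in the
** low bit, which marks the variant carrying an embedded checksum. A small
** sketch of how a reader might classify a type byte follows. The function
** names are hypothetical, and the numeric values are copied from the
** definitions above so that the block stands alone.
*/

```c
#include <assert.h>

// Return 1 if a record with type byte eType contains an 8-byte checksum.
static int logTypeHasCksumSketch(int eType){
  if( eType==0x03 ) return 1;                     // COMMIT always has one
  if( eType>=0x06 && eType<=0x0B ) return eType & 0x01;
  return 0;                                       // EOF, PAD1, PAD2, JUMP
}

// Return the type byte with the checksum flag cleared, so that 0x07 maps
// to 0x06 (WRITE), 0x09 to 0x08 (DELETE) and 0x0B to 0x0A (DRANGE).
static int logTypeBaseSketch(int eType){
  if( eType>=0x06 && eType<=0x0B ) return eType & ~0x01;
  return eType;
}
```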

/* Require a checksum every 32KB. */
#define LSM_CKSUM_MAXDATA (32*1024)

/* Do not wrap a log file smaller than this in bytes. */
#define LSM_MIN_LOGWRAP (128*1024)

/*
** Commit records must be aligned to end on szSector boundaries. If
** the safety-mode is set to NORMAL or OFF, this value is 1. Otherwise,
** if the safety-mode is set to FULL, it is the size of the file-system
** sectors as reported by lsmFsSectorSize().
*/
struct LogWriter {
  u32 cksum0;                     /* Checksum 0 at offset iOff */
  u32 cksum1;                     /* Checksum 1 at offset iOff */
  int iCksumBuf;                  /* Bytes of buf that have been checksummed */
  i64 iOff;                       /* Offset at start of buffer buf */
  int szSector;                   /* Sector size for this transaction */
  LogRegion jump;                 /* Avoid writing to this region */
  i64 iRegion1End;                /* End of first region written by trans */
  i64 iRegion2Start;              /* Start of second region written by trans */
  LsmString buf;                  /* Buffer containing data not yet written */
};

/*
** Return the result of interpreting the first 4 bytes in buffer aIn as
** a 32-bit unsigned little-endian integer.
*/
static u32 getU32le(u8 *aIn){
  return ((u32)aIn[3] << 24)
       + ((u32)aIn[2] << 16)
       + ((u32)aIn[1] << 8)
       + ((u32)aIn[0]);
}

/*
** This function is the same as logCksum(), except that pointer "a" need
** not be aligned to an 8-byte boundary or padded with zero bytes. This
** version is slower, but sometimes more convenient to use.
*/
static void logCksumUnaligned(
  char *z,                        /* Input buffer */
  int n,                          /* Size of input buffer in bytes */
  u32 *pCksum0,                   /* IN/OUT: Checksum value 1 */
  u32 *pCksum1                    /* IN/OUT: Checksum value 2 */
){
  u8 *a = (u8 *)z;
  u32 cksum0 = *pCksum0;
  u32 cksum1 = *pCksum1;
  int nIn = (n/8) * 8;            /* Bytes of input in whole 8-byte blocks */
  int i;

  for(i=0; i<nIn; i+=8){
    cksum0 += getU32le(&a[i]) + cksum1;
    cksum1 += getU32le(&a[i+4]) + cksum0;
  }

  if( nIn!=n ){
    /* Zero-pad the trailing partial block to 8 bytes. */
    u8 aBuf[8] = {0, 0, 0, 0, 0, 0, 0, 0};
    assert( (n-nIn)<8 && n>nIn );
    memcpy(aBuf, &a[nIn], n-nIn);
    cksum0 += getU32le(aBuf) + cksum1;
    cksum1 += getU32le(&aBuf[4]) + cksum0;
  }

  *pCksum0 = cksum0;
  *pCksum1 = cksum1;
}

/*
** Update pLog->cksum0 and pLog->cksum1 so that the first nBuf bytes in the
** write buffer (pLog->buf) are included in the checksum.
*/
static void logUpdateCksum(LogWriter *pLog, int nBuf){
  assert( (pLog->iCksumBuf % 8)==0 );
  assert( pLog->iCksumBuf<=nBuf );
  assert( (nBuf % 8)==0 || nBuf==pLog->buf.n );
  if( nBuf>pLog->iCksumBuf ){
    logCksumUnaligned(
        &pLog->buf.z[pLog->iCksumBuf], nBuf-pLog->iCksumBuf,
        &pLog->cksum0, &pLog->cksum1
    );
  }
  pLog->iCksumBuf = nBuf;
}

static i64 firstByteOnSector(LogWriter *pLog, i64 iOff){
  return (iOff / pLog->szSector) * pLog->szSector;
}
static i64 lastByteOnSector(LogWriter *pLog, i64 iOff){
  return firstByteOnSector(pLog, iOff) + pLog->szSector - 1;
}
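
/*
** A standalone sketch of the sector arithmetic, for illustration only. The
** function names are hypothetical, the sector size is passed as a plain
** parameter instead of being read from a LogWriter, and int64_t stands in
** for i64. The third function shows how many padding bytes would be needed
** so that a 9-byte COMMIT record written at offset iOff ends exactly on a
** sector boundary.
*/

```c
#include <assert.h>
#include <stdint.h>

// Offset of the first byte of the sector containing iOff.
static int64_t sectorFirstByte(int64_t iOff, int szSector){
  return (iOff / szSector) * szSector;
}

// Offset of the last byte of the sector containing iOff.
static int64_t sectorLastByte(int64_t iOff, int szSector){
  return sectorFirstByte(iOff, szSector) + szSector - 1;
}

// Padding bytes required before a 9-byte COMMIT record at offset iOff so
// that the byte following the record is the first byte of a sector.
static int commitPadBytes(int64_t iOff, int szSector){
  int nPad = (int)((iOff + 9) % szSector);
  if( nPad ) nPad = szSector - nPad;
  return nPad;
}
```

/*
** With 512-byte sectors, offset 1000 lies in the sector spanning bytes
** 512 to 1023. A COMMIT at offset 0 needs 503 bytes of padding, since
** 0 + 503 + 9 = 512. With a sector size of 1 no padding is ever needed.
*/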

/*
** If possible, reclaim log file space. Log file space is reclaimed after
** a snapshot that points to the same data in the database file is synced
** into the db header.
*/
static int logReclaimSpace(lsm_db *pDb){
  int rc;
  int iMeta;
  int bRotrans;                   /* True if there exists some ro-trans */

  /* Test if there exists some other connection with a read-only transaction
  ** open. If there does, then log file space may not be reclaimed.  */
  rc = lsmDetectRoTrans(pDb, &bRotrans);
  if( rc!=LSM_OK || bRotrans ) return rc;

  iMeta = (int)pDb->pShmhdr->iMetaPage;
  if( iMeta==1 || iMeta==2 ){
    DbLog *pLog = &pDb->treehdr.log;
    i64 iSyncedId;

    /* Read the snapshot-id of the snapshot stored on meta-page iMeta. Note
    ** that in theory, the value read is untrustworthy (due to a race
    ** condition - see comments above lsmFsReadSyncedId()). So it is only
    ** ever used to conclude that no log space can be reclaimed. If it seems
    ** to indicate that it may be possible to reclaim log space, a
    ** second call to lsmCheckpointSynced() (which does return trustworthy
    ** values) is made below to confirm.  */
    rc = lsmFsReadSyncedId(pDb, iMeta, &iSyncedId);

    if( rc==LSM_OK && pLog->iSnapshotId!=iSyncedId ){
      i64 iSnapshotId = 0;
      i64 iOff = 0;
      rc = lsmCheckpointSynced(pDb, &iSnapshotId, &iOff, 0);
      if( rc==LSM_OK && pLog->iSnapshotId<iSnapshotId ){
        int iRegion;
        for(iRegion=0; iRegion<3; iRegion++){
          LogRegion *p = &pLog->aRegion[iRegion];
          if( iOff>=p->iStart && iOff<=p->iEnd ) break;
          p->iStart = 0;
          p->iEnd = 0;
        }
        assert( iRegion<3 );
        pLog->aRegion[iRegion].iStart = iOff;
        pLog->iSnapshotId = iSnapshotId;
      }
    }
  }
  return rc;
}

/*
** This function is called when a write-transaction is first opened. It
** is assumed that the caller is holding the client-mutex when it is
** called.
**
** Before returning, this function allocates the LogWriter object that
** will be used to write to the log file during the write transaction.
** LSM_OK is returned if no error occurs, otherwise an LSM error code.
*/
int lsmLogBegin(lsm_db *pDb){
  int rc = LSM_OK;
  LogWriter *pNew;
  LogRegion *aReg;

  if( pDb->bUseLog==0 ) return LSM_OK;

  /* If the log file has not yet been opened, open it now. Also allocate
  ** the LogWriter structure, if it has not already been allocated.  */
  rc = lsmFsOpenLog(pDb, 0);
  if( pDb->pLogWriter==0 ){
    pNew = lsmMallocZeroRc(pDb->pEnv, sizeof(LogWriter), &rc);
    if( pNew ){
      lsmStringInit(&pNew->buf, pDb->pEnv);
      rc = lsmStringExtend(&pNew->buf, 2);
    }
    pDb->pLogWriter = pNew;
  }else{
    pNew = pDb->pLogWriter;
    assert( (u8 *)(&pNew[1])==(u8 *)(&((&pNew->buf)[1])) );
    memset(pNew, 0, ((u8 *)&pNew->buf) - (u8 *)pNew);
    pNew->buf.n = 0;
  }

  if( rc==LSM_OK ){
    /* The following call detects whether or not a new snapshot has been
    ** synced into the database file. If so, it updates the contents of
    ** the pDb->treehdr.log structure to reclaim any space in the log
    ** file that is no longer required.
    **
    ** TODO: Calling this every transaction is overkill. And since the
    ** call has to read and checksum a snapshot from the database file,
    ** it is expensive. It would be better to figure out a way so that
    ** this is only called occasionally - say for every 32KB written to
    ** the log file.
    */
    rc = logReclaimSpace(pDb);
  }
  if( rc!=LSM_OK ){
    lsmLogClose(pDb);
    return rc;
  }

  /* Set the effective sector-size for this transaction. Sectors are assumed
  ** to be one byte in size if the safety-mode is OFF or NORMAL, or as
  ** reported by lsmFsSectorSize if it is FULL.  */
  if( pDb->eSafety==LSM_SAFETY_FULL ){
    pNew->szSector = lsmFsSectorSize(pDb->pFS);
    assert( pNew->szSector>0 );
  }else{
    pNew->szSector = 1;
  }

  /* There are now three scenarios:
  **
  **   1) Regions 0 and 1 are both zero bytes in size and region 2 begins
  **      at a file offset greater than LSM_MIN_LOGWRAP. In this case, wrap
  **      around to the start and write data into the start of the log file.
  **
  **   2) Region 1 is zero bytes in size and region 2 occurs earlier in the
  **      file than region 0. In this case, append data to region 2, but
  **      remember to jump over region 1 if required.
  **
  **   3) Region 2 is the last in the file. Append to it.
  */
  aReg = &pDb->treehdr.log.aRegion[0];

  assert( aReg[0].iEnd==0 || aReg[0].iEnd>aReg[0].iStart );
  assert( aReg[1].iEnd==0 || aReg[1].iEnd>aReg[1].iStart );

  pNew->cksum0 = pDb->treehdr.log.cksum0;
  pNew->cksum1 = pDb->treehdr.log.cksum1;

  if( aReg[0].iEnd==0 && aReg[1].iEnd==0 && aReg[2].iStart>=LSM_MIN_LOGWRAP ){
    /* Case 1. Wrap around to the start of the file. Write an LSM_LOG_JUMP
    ** into the log file in this case. Pad it out to 8 bytes using a PAD2
    ** record so that the checksums can be updated immediately.  */
    u8 aJump[] = {
      LSM_LOG_PAD2, 0x04, 0x00, 0x00, 0x00, 0x00, LSM_LOG_JUMP, 0x00
    };

    lsmStringBinAppend(&pNew->buf, aJump, sizeof(aJump));
    logUpdateCksum(pNew, pNew->buf.n);
    rc = lsmFsWriteLog(pDb->pFS, aReg[2].iEnd, &pNew->buf);
    pNew->iCksumBuf = pNew->buf.n = 0;

    aReg[2].iEnd += 8;
    pNew->jump = aReg[0] = aReg[2];
    aReg[2].iStart = aReg[2].iEnd = 0;
  }else if( aReg[1].iEnd==0 && aReg[2].iEnd<aReg[0].iEnd ){
    /* Case 2. */
    pNew->iOff = aReg[2].iEnd;
    pNew->jump = aReg[0];
  }else{
    /* Case 3. */
    assert( aReg[2].iStart>=aReg[0].iEnd && aReg[2].iStart>=aReg[1].iEnd );
    pNew->iOff = aReg[2].iEnd;
  }

  if( pNew->jump.iStart ){
    i64 iRound;
    assert( pNew->jump.iStart>pNew->iOff );

    iRound = firstByteOnSector(pNew, pNew->jump.iStart);
    if( iRound>pNew->iOff ) pNew->jump.iStart = iRound;
    pNew->jump.iEnd = lastByteOnSector(pNew, pNew->jump.iEnd);
  }

  assert( pDb->pLogWriter==pNew );
  return rc;
}

/*
** This function is called when a write-transaction is being closed.
** Parameter bCommit is true if the transaction is being committed,
** or false otherwise. The caller must hold the client-mutex to call
** this function.
**
** A call to this function deletes the LogWriter object allocated by
** lsmLogBegin(). If the transaction is being committed, the shared state
** in *pLog is updated before returning.
*/
void lsmLogEnd(lsm_db *pDb, int bCommit){
  DbLog *pLog;
  LogWriter *p;
  p = pDb->pLogWriter;

  if( p==0 ) return;
  pLog = &pDb->treehdr.log;

  if( bCommit ){
    pLog->aRegion[2].iEnd = p->iOff;
    pLog->cksum0 = p->cksum0;
    pLog->cksum1 = p->cksum1;
    if( p->iRegion1End ){
      /* This happens when the transaction had to jump over some other
      ** part of the log.  */
      assert( pLog->aRegion[1].iEnd==0 );
      assert( pLog->aRegion[2].iStart<p->iRegion1End );
      pLog->aRegion[1].iStart = pLog->aRegion[2].iStart;
      pLog->aRegion[1].iEnd = p->iRegion1End;
      pLog->aRegion[2].iStart = p->iRegion2Start;
    }
  }
}

static int jumpIfRequired(
  lsm_db *pDb,
  LogWriter *pLog,
  int nReq,
  int *pbJump
){
  /* Determine if it is necessary to add an LSM_LOG_JUMP to jump over the
  ** jump region before writing the LSM_LOG_WRITE or DELETE record. This
  ** is necessary if there is insufficient room between the current offset
  ** and the jump region to fit the new WRITE/DELETE record and the largest
  ** possible JUMP record with up to 7 bytes of padding (a total of 17
  ** bytes).  */
  if( (pLog->jump.iStart > (pLog->iOff + pLog->buf.n))
   && (pLog->jump.iStart < (pLog->iOff + pLog->buf.n + (nReq + 17)))
  ){
    int rc;                       /* Return code */
    i64 iJump;                    /* Offset to jump to */
    u8 aJump[10];                 /* Encoded jump record */
    int nJump;                    /* Valid bytes in aJump[] */
    int nPad;                     /* Bytes of padding required */

    /* Serialize the JUMP record */
    iJump = pLog->jump.iEnd+1;
    aJump[0] = LSM_LOG_JUMP;
    nJump = 1 + lsmVarintPut64(&aJump[1], iJump);

    /* Add padding to the contents of the buffer so that it will be a
    ** multiple of 8 bytes in size after the JUMP record is appended. This
    ** is not strictly required, it just makes keeping the running
    ** checksum up to date in this file a little simpler.  */
    nPad = (pLog->buf.n + nJump) % 8;
    if( nPad ){
      u8 aPad[7] = {0,0,0,0,0,0,0};
      nPad = 8-nPad;
      if( nPad==1 ){
        aPad[0] = LSM_LOG_PAD1;
      }else{
        aPad[0] = LSM_LOG_PAD2;
        aPad[1] = (u8)(nPad-2);
      }
      rc = lsmStringBinAppend(&pLog->buf, aPad, nPad);
      if( rc!=LSM_OK ) return rc;
    }

    /* Append the JUMP record to the buffer. Then flush the buffer to disk
    ** and update the checksums. The next write to the log file (assuming
    ** there is no transaction rollback) will be to offset iJump (just past
    ** the jump region).  */
    rc = lsmStringBinAppend(&pLog->buf, aJump, nJump);
    if( rc!=LSM_OK ) return rc;
    assert( (pLog->buf.n % 8)==0 );
    rc = lsmFsWriteLog(pDb->pFS, pLog->iOff, &pLog->buf);
    if( rc!=LSM_OK ) return rc;
    logUpdateCksum(pLog, pLog->buf.n);
    pLog->iRegion1End = (pLog->iOff + pLog->buf.n);
    pLog->iRegion2Start = iJump;
    pLog->iOff = iJump;
    pLog->iCksumBuf = pLog->buf.n = 0;
    if( pbJump ) *pbJump = 1;
  }

  return LSM_OK;
}

static int logCksumAndFlush(lsm_db *pDb){
  int rc;                         /* Return code */
  LogWriter *pLog = pDb->pLogWriter;

  /* Calculate the checksum value. Append it to the buffer. */
  logUpdateCksum(pLog, pLog->buf.n);
  lsmPutU32((u8 *)&pLog->buf.z[pLog->buf.n], pLog->cksum0);
  pLog->buf.n += 4;
  lsmPutU32((u8 *)&pLog->buf.z[pLog->buf.n], pLog->cksum1);
  pLog->buf.n += 4;

  /* Write the contents of the buffer to disk. */
  rc = lsmFsWriteLog(pDb->pFS, pLog->iOff, &pLog->buf);
  pLog->iOff += pLog->buf.n;
  pLog->iCksumBuf = pLog->buf.n = 0;

  return rc;
}

/*
** Write the contents of the log-buffer to disk. Then write either a CKSUM
** or COMMIT record, depending on the value of parameter eType.
*/
static int logFlush(lsm_db *pDb, int eType){
  int rc;
  int nReq;
  LogWriter *pLog = pDb->pLogWriter;

  assert( eType==LSM_LOG_COMMIT );
  assert( pLog );

  /* Commit record is always 9 bytes in size. */
  nReq = 9;
  if( eType==LSM_LOG_COMMIT && pLog->szSector>1 ) nReq += pLog->szSector + 17;
  rc = jumpIfRequired(pDb, pLog, nReq, 0);

  /* If this is a COMMIT, add padding to the log so that the COMMIT record
  ** is aligned against the end of a disk sector. In other words, add padding
  ** so that the first byte following the COMMIT record lies on a different
  ** sector.  */
  if( eType==LSM_LOG_COMMIT && pLog->szSector>1 ){
    int nPad;                     /* Bytes of padding to add */

    /* Determine the value of nPad. */
    nPad = ((pLog->iOff + pLog->buf.n + 9) % pLog->szSector);
    if( nPad ) nPad = pLog->szSector - nPad;
    rc = lsmStringExtend(&pLog->buf, nPad);
    if( rc!=LSM_OK ) return rc;

    while( nPad ){
      if( nPad==1 ){
        pLog->buf.z[pLog->buf.n++] = LSM_LOG_PAD1;
        nPad = 0;
      }else{
        int n = LSM_MIN(200, nPad-2);
        pLog->buf.z[pLog->buf.n++] = LSM_LOG_PAD2;
        pLog->buf.z[pLog->buf.n++] = (char)n;
        nPad -= 2;
        memset(&pLog->buf.z[pLog->buf.n], 0x2B, n);
        pLog->buf.n += n;
        nPad -= n;
      }
    }
  }

  /* Make sure there is room in the log-buffer to add the CKSUM or COMMIT
  ** record. Then add the first byte of it.  */
  rc = lsmStringExtend(&pLog->buf, 9);
  if( rc!=LSM_OK ) return rc;
  pLog->buf.z[pLog->buf.n++] = (char)eType;
  memset(&pLog->buf.z[pLog->buf.n], 0, 8);

  rc = logCksumAndFlush(pDb);

  /* If this is a commit and synchronous=full, sync the log to disk. */
  if( rc==LSM_OK && eType==LSM_LOG_COMMIT && pDb->eSafety==LSM_SAFETY_FULL ){
    rc = lsmFsSyncLog(pDb->pFS);
  }
  return rc;
}

/*
** Append an LSM_LOG_WRITE (if nVal>=0) or LSM_LOG_DELETE (if nVal<0)
** record to the database log.
*/
int lsmLogWrite(
  lsm_db *pDb,                    /* Database handle */
  int eType,                      /* Record type (LSM_WRITE, DELETE, DRANGE) */
  void *pKey, int nKey,           /* Database key to write to log */
  void *pVal, int nVal            /* Database value (or nVal<0) to write */
){
  int rc = LSM_OK;
  LogWriter *pLog;                /* Log object to write to */
  int nReq;                       /* Bytes of space required in log */
  int bCksum = 0;                 /* True to embed a checksum in this record */

  assert( eType==LSM_WRITE || eType==LSM_DELETE || eType==LSM_DRANGE );
  assert( LSM_LOG_WRITE==LSM_WRITE );
  assert( LSM_LOG_DELETE==LSM_DELETE );
  assert( LSM_LOG_DRANGE==LSM_DRANGE );
  assert( (eType==LSM_LOG_DELETE)==(nVal<0) );

  if( pDb->bUseLog==0 ) return LSM_OK;
  pLog = pDb->pLogWriter;

  /* Determine how many bytes of space are required, assuming that a checksum
  ** will be embedded in this record (even though it may not be).  */
  nReq = 1 + lsmVarintLen32(nKey) + 8 + nKey;
  if( eType!=LSM_LOG_DELETE ) nReq += lsmVarintLen32(nVal) + nVal;

  /* Jump over the jump region if required. Set bCksum to true to tell the
  ** code below to include a checksum in the record if either (a) writing
  ** this record would mean that more than LSM_CKSUM_MAXDATA bytes of data
  ** have been written to the log since the last checksum, or (b) the jump
  ** is taken.  */
  rc = jumpIfRequired(pDb, pLog, nReq, &bCksum);
  if( (pLog->buf.n+nReq) > LSM_CKSUM_MAXDATA ) bCksum = 1;

  if( rc==LSM_OK ){
    rc = lsmStringExtend(&pLog->buf, nReq);
  }
  if( rc==LSM_OK ){
    u8 *a = (u8 *)&pLog->buf.z[pLog->buf.n];

    /* Write the record header - the type byte followed by either 1 (for
    ** DELETE) or 2 (for WRITE) varints.  */
    assert( LSM_LOG_WRITE_CKSUM == (LSM_LOG_WRITE | 0x0001) );
    assert( LSM_LOG_DELETE_CKSUM == (LSM_LOG_DELETE | 0x0001) );
    assert( LSM_LOG_DRANGE_CKSUM == (LSM_LOG_DRANGE | 0x0001) );
    *(a++) = (u8)eType | (u8)bCksum;
    a += lsmVarintPut32(a, nKey);
    if( eType!=LSM_LOG_DELETE ) a += lsmVarintPut32(a, nVal);

    if( bCksum ){
      pLog->buf.n = (a - (u8 *)pLog->buf.z);
      rc = logCksumAndFlush(pDb);
      a = (u8 *)&pLog->buf.z[pLog->buf.n];
    }

    memcpy(a, pKey, nKey);
    a += nKey;
    if( eType!=LSM_LOG_DELETE ){
      memcpy(a, pVal, nVal);
      a += nVal;
    }
    pLog->buf.n = a - (u8 *)pLog->buf.z;
    assert( pLog->buf.n<=pLog->buf.nAlloc );
  }

  return rc;
}

/*
** Append an LSM_LOG_COMMIT record to the database log.
*/
int lsmLogCommit(lsm_db *pDb){
  if( pDb->bUseLog==0 ) return LSM_OK;
  return logFlush(pDb, LSM_LOG_COMMIT);
}

/*
** Store the current offset and other checksum related information in the
** structure *pMark. Later, *pMark can be passed to lsmLogSeek() to "rewind"
** the LogWriter object to the current log file offset. This is used when
** rolling back savepoint transactions.
*/
void lsmLogTell(
  lsm_db *pDb,                    /* Database handle */
  LogMark *pMark                  /* Populate this object with current offset */
){
  LogWriter *pLog;
  int nCksum;

  if( pDb->bUseLog==0 ) return;
  pLog = pDb->pLogWriter;
  nCksum = pLog->buf.n & 0xFFFFFFF8;
  logUpdateCksum(pLog, nCksum);
  assert( pLog->iCksumBuf==nCksum );
  pMark->nBuf = pLog->buf.n - nCksum;
  memcpy(pMark->aBuf, &pLog->buf.z[nCksum], pMark->nBuf);

  pMark->iOff = pLog->iOff + pLog->buf.n;
  pMark->cksum0 = pLog->cksum0;
  pMark->cksum1 = pLog->cksum1;
}

/*
** Seek (rewind) back to the log file offset stored by an earlier call to
** lsmLogTell() in *pMark.
*/
void lsmLogSeek(
  lsm_db *pDb,                    /* Database handle */
  LogMark *pMark                  /* Object containing log offset to seek to */
){
  LogWriter *pLog;

  if( pDb->bUseLog==0 ) return;
  pLog = pDb->pLogWriter;

  assert( pMark->iOff<=pLog->iOff+pLog->buf.n );
  if( (pMark->iOff & 0xFFFFFFF8)>=pLog->iOff ){
    pLog->buf.n = (int)(pMark->iOff - pLog->iOff);
    pLog->iCksumBuf = (pLog->buf.n & 0xFFFFFFF8);
  }else{
    pLog->buf.n = pMark->nBuf;
    memcpy(pLog->buf.z, pMark->aBuf, pMark->nBuf);
    pLog->iCksumBuf = 0;
    pLog->iOff = pMark->iOff - pMark->nBuf;
  }
  pLog->cksum0 = pMark->cksum0;
  pLog->cksum1 = pMark->cksum1;

  if( pMark->iOff > pLog->iRegion1End ) pLog->iRegion1End = 0;
  if( pMark->iOff > pLog->iRegion2Start ) pLog->iRegion2Start = 0;
}

/*
** This function does the work for an lsm_info(LOG_STRUCTURE) request.
*/
int lsmInfoLogStructure(lsm_db *pDb, char **pzVal){
  int rc = LSM_OK;
  char *zVal = 0;

  /* If there is no read or write transaction open, read the latest
  ** tree-header from shared-memory to report on. If necessary, update
  ** it based on the contents of the database header.
  **
  ** No locks are taken here - these are passive read operations only.
  */
  if( pDb->pCsr==0 && pDb->nTransOpen==0 ){
    rc = lsmTreeLoadHeader(pDb, 0);
    if( rc==LSM_OK ) rc = logReclaimSpace(pDb);
  }

  if( rc==LSM_OK ){
    DbLog *pLog = &pDb->treehdr.log;
    zVal = lsmMallocPrintf(pDb->pEnv,
        "%d %d %d %d %d %d",
        (int)pLog->aRegion[0].iStart, (int)pLog->aRegion[0].iEnd,
        (int)pLog->aRegion[1].iStart, (int)pLog->aRegion[1].iEnd,
        (int)pLog->aRegion[2].iStart, (int)pLog->aRegion[2].iEnd
    );
    if( !zVal ) rc = LSM_NOMEM_BKPT;
  }

  *pzVal = zVal;
  return rc;
}

/*************************************************************************
** Begin code for log recovery.
*/

typedef struct LogReader LogReader;
struct LogReader {
  FileSystem *pFS;                /* File system to read from */
  i64 iOff;                       /* File offset at end of buf content */
  int iBuf;                       /* Current read offset in buf */
  LsmString buf;                  /* Buffer containing file content */

  int iCksumBuf;                  /* Offset in buf corresponding to cksum[01] */
  u32 cksum0;                     /* Checksum 0 at offset iCksumBuf */
  u32 cksum1;                     /* Checksum 1 at offset iCksumBuf */
};

static void logReaderBlob(
  LogReader *p,                   /* Log reader object */
  LsmString *pBuf,                /* Dynamic storage, if required */
  int nBlob,                      /* Number of bytes to read */
  u8 **ppBlob,                    /* OUT: Pointer to blob read */
  int *pRc                        /* IN/OUT: Error code */
){
  static const int LOG_READ_SIZE = 512;
  int rc = *pRc;                  /* Return code */
  int nReq = nBlob;               /* Bytes required */

  while( rc==LSM_OK && nReq>0 ){
    int nAvail;                   /* Bytes of data available in p->buf */
    if( p->buf.n==p->iBuf ){
      int nCksum;                 /* Total bytes requiring checksum */
      int nCarry = 0;             /* Trailing bytes carried into next read */

      nCksum = p->iBuf - p->iCksumBuf;
      if( nCksum>0 ){
        nCarry = nCksum % 8;
        nCksum = ((nCksum / 8) * 8);
        if( nCksum>0 ){
          logCksumUnaligned(
              &p->buf.z[p->iCksumBuf], nCksum, &p->cksum0, &p->cksum1
          );
        }
      }
      if( nCarry>0 ) memcpy(p->buf.z, &p->buf.z[p->iBuf-nCarry], nCarry);
      p->buf.n = nCarry;
      p->iBuf = nCarry;

      rc = lsmFsReadLog(p->pFS, p->iOff, LOG_READ_SIZE, &p->buf);
      if( rc!=LSM_OK ) break;
      p->iCksumBuf = 0;
      p->iOff += LOG_READ_SIZE;
    }

    nAvail = p->buf.n - p->iBuf;
    if( ppBlob && nReq==nBlob && nBlob<=nAvail ){
      *ppBlob = (u8 *)&p->buf.z[p->iBuf];
      p->iBuf += nBlob;
      nReq = 0;
    }else{
      int nCopy = LSM_MIN(nAvail, nReq);
      if( nBlob==nReq ){
        pBuf->n = 0;
      }
      rc = lsmStringBinAppend(pBuf, (u8 *)&p->buf.z[p->iBuf], nCopy);
      nReq -= nCopy;
      p->iBuf += nCopy;
      if( nReq==0 && ppBlob ){
        *ppBlob = (u8 *)pBuf->z;
      }
    }
  }

  *pRc = rc;
}

static void logReaderVarint(
  LogReader *p,                   /* Log reader object */
  LsmString *pBuf,                /* Dynamic storage, if required */
  int *piVal,                     /* OUT: Value read from log */
  int *pRc                        /* IN/OUT: Error code */
){
  if( *pRc==LSM_OK ){
    u8 *aVarint;
    if( p->buf.n==p->iBuf ){
      logReaderBlob(p, 0, 10, &aVarint, pRc);
      if( LSM_OK==*pRc ) p->iBuf -= (10 - lsmVarintGet32(aVarint, piVal));
    }else{
      logReaderBlob(p, pBuf, lsmVarintSize(p->buf.z[p->iBuf]), &aVarint, pRc);
      if( LSM_OK==*pRc ) lsmVarintGet32(aVarint, piVal);
    }
  }
}

static void logReaderByte(LogReader *p, u8 *pByte, int *pRc){
  u8 *pPtr = 0;
  logReaderBlob(p, 0, 1, &pPtr, pRc);
  if( pPtr ) *pByte = *pPtr;
}

static void logReaderCksum(LogReader *p, LsmString *pBuf, int *pbEof, int *pRc){
  if( *pRc==LSM_OK ){
    u8 *pPtr = 0;
    u32 cksum0, cksum1;
    int nCksum = p->iBuf - p->iCksumBuf;

    /* Update in-memory (expected) checksums */
    assert( nCksum>=0 );
    logCksumUnaligned(&p->buf.z[p->iCksumBuf], nCksum, &p->cksum0, &p->cksum1);
    p->iCksumBuf = p->iBuf + 8;
    logReaderBlob(p, pBuf, 8, &pPtr, pRc);
    assert( pPtr || *pRc );

    /* Read the checksums from the log file. Set *pbEof if they do not match. */
    if( pPtr ){
      cksum0 = lsmGetU32(pPtr);
      cksum1 = lsmGetU32(&pPtr[4]);
      *pbEof = (cksum0!=p->cksum0 || cksum1!=p->cksum1);
      p->iCksumBuf = p->iBuf;
    }
  }
}

static void logReaderInit(
  lsm_db *pDb,                    /* Database handle */
  DbLog *pLog,                    /* Log object associated with pDb */
  int bInitBuf,                   /* True if p->buf is uninitialized */
  LogReader *p                    /* Initialize this LogReader object */
){
  p->pFS = pDb->pFS;
  p->iOff = pLog->aRegion[2].iStart;
  p->cksum0 = pLog->cksum0;
  p->cksum1 = pLog->cksum1;
  if( bInitBuf ){ lsmStringInit(&p->buf, pDb->pEnv); }
  p->buf.n = 0;
  p->iCksumBuf = 0;
  p->iBuf = 0;
}

/*
** This function is called after reading the header of a LOG_DELETE or
** LOG_WRITE record. Parameter nByte is the total size of the key and
** value that follow the header just read. Return true if the size and
** position of the record indicate that it should contain a checksum.
*/
static int logRequireCksum(LogReader *p, int nByte){
  return ((p->iBuf + nByte - p->iCksumBuf) > LSM_CKSUM_MAXDATA);
}

/*
** Recover the contents of the log file.
*/
int lsmLogRecover(lsm_db *pDb){
  LsmString buf1;                 /* Key buffer */
  LsmString buf2;                 /* Value buffer */
  LogReader reader;               /* Log reader object */
  int rc = LSM_OK;                /* Return code */
  int nCommit = 0;                /* Number of transactions to recover */
  int iPass;
  int nJump = 0;                  /* Number of LSM_LOG_JUMP records in pass 0 */
  DbLog *pLog;
  int bOpen;

  rc = lsmFsOpenLog(pDb, &bOpen);
  if( rc!=LSM_OK ) return rc;

  rc = lsmTreeInit(pDb);
  if( rc!=LSM_OK ) return rc;

  pLog = &pDb->treehdr.log;
  lsmCheckpointLogoffset(pDb->pShmhdr->aSnap2, pLog);

  logReaderInit(pDb, pLog, 1, &reader);
  lsmStringInit(&buf1, pDb->pEnv);
  lsmStringInit(&buf2, pDb->pEnv);

  /* The outer for() loop runs at most twice. The first iteration is to
  ** count the number of committed transactions in the log. The second
  ** iterates through those transactions and updates the in-memory tree
  ** structure with their contents.  */
  if( bOpen ){
    for(iPass=0; iPass<2 && rc==LSM_OK; iPass++){
      int bEof = 0;

      while( rc==LSM_OK && !bEof ){
        u8 eType = 0;
        logReaderByte(&reader, &eType, &rc);

        switch( eType ){
          case LSM_LOG_PAD1:
            break;

          case LSM_LOG_PAD2: {
            int nPad;
            logReaderVarint(&reader, &buf1, &nPad, &rc);
            logReaderBlob(&reader, &buf1, nPad, 0, &rc);
            break;
          }

          case LSM_LOG_DRANGE:
          case LSM_LOG_DRANGE_CKSUM:
          case LSM_LOG_WRITE:
          case LSM_LOG_WRITE_CKSUM: {
            int nKey;
            int nVal;
            u8 *aVal;
            logReaderVarint(&reader, &buf1, &nKey, &rc);
            logReaderVarint(&reader, &buf2, &nVal, &rc);

            if( eType==LSM_LOG_WRITE_CKSUM || eType==LSM_LOG_DRANGE_CKSUM ){
              logReaderCksum(&reader, &buf1, &bEof, &rc);
            }else{
              bEof = logRequireCksum(&reader, nKey+nVal);
            }
            if( bEof ) break;

            logReaderBlob(&reader, &buf1, nKey, 0, &rc);
            logReaderBlob(&reader, &buf2, nVal, &aVal, &rc);
            if( iPass==1 && rc==LSM_OK ){
              if( eType==LSM_LOG_WRITE || eType==LSM_LOG_WRITE_CKSUM ){
                rc = lsmTreeInsert(pDb, (u8 *)buf1.z, nKey, aVal, nVal);
              }else{
                rc = lsmTreeDelete(pDb, (u8 *)buf1.z, nKey, aVal, nVal);
              }
            }
            break;
          }

          case LSM_LOG_DELETE:
          case LSM_LOG_DELETE_CKSUM: {
            int nKey;
            u8 *aKey;
            logReaderVarint(&reader, &buf1, &nKey, &rc);

            if( eType==LSM_LOG_DELETE_CKSUM ){
              logReaderCksum(&reader, &buf1, &bEof, &rc);
            }else{
              bEof = logRequireCksum(&reader, nKey);
            }
            if( bEof ) break;

            logReaderBlob(&reader, &buf1, nKey, &aKey, &rc);
            if( iPass==1 && rc==LSM_OK ){
              rc = lsmTreeInsert(pDb, aKey, nKey, NULL, -1);
            }
            break;
          }

          case LSM_LOG_COMMIT:
            logReaderCksum(&reader, &buf1, &bEof, &rc);
            if( bEof==0 ){
              nCommit++;
              assert( nCommit>0 || iPass==1 );
              if( nCommit==0 ) bEof = 1;
            }
            break;

          case LSM_LOG_JUMP: {
            int iOff = 0;
            logReaderVarint(&reader, &buf1, &iOff, &rc);
            if( rc==LSM_OK ){
              if( iPass==1 ){
                if( pLog->aRegion[2].iStart==0 ){
                  assert( pLog->aRegion[1].iStart==0 );
                  pLog->aRegion[1].iEnd = reader.iOff;
                }else{
                  assert( pLog->aRegion[0].iStart==0 );
                  pLog->aRegion[0].iStart = pLog->aRegion[2].iStart;
                  pLog->aRegion[0].iEnd = reader.iOff-reader.buf.n+reader.iBuf;
                }
                pLog->aRegion[2].iStart = iOff;
              }else{
                if( (nJump++)==2 ){
                  bEof = 1;
                }
              }

              reader.iOff = iOff;
              reader.buf.n = reader.iBuf;
            }
            break;
          }

          default:
            /* Including LSM_LOG_EOF */
            bEof = 1;
            break;
        }
      }

      if( rc==LSM_OK && iPass==0 ){
        if( nCommit==0 ){
          if( pLog->aRegion[2].iStart==0 ){
            iPass = 1;
          }else{
            pLog->aRegion[2].iStart = 0;
            iPass = -1;
            lsmCheckpointZeroLogoffset(pDb);
          }
        }
        logReaderInit(pDb, pLog, 0, &reader);
        nCommit = nCommit * -1;
      }
    }
  }

  /* Initialize DbLog object */
  if( rc==LSM_OK ){
    pLog->aRegion[2].iEnd = reader.iOff - reader.buf.n + reader.iBuf;
    pLog->cksum0 = reader.cksum0;
    pLog->cksum1 = reader.cksum1;
  }

  if( rc==LSM_OK ){
    rc = lsmFinishRecovery(pDb);
  }else{
    lsmFinishRecovery(pDb);
  }

  if( pDb->bRoTrans ){
    lsmFsCloseLog(pDb);
  }

  lsmStringClear(&buf1);
  lsmStringClear(&buf2);
  lsmStringClear(&reader.buf);
  return rc;
}

void lsmLogClose(lsm_db *db){
  if( db->pLogWriter ){
    lsmFree(db->pEnv, db->pLogWriter->buf.z);
    lsmFree(db->pEnv, db->pLogWriter);
    db->pLogWriter = 0;
  }
}