4 ** The author disclaims copyright to this source code. In place of
5 ** a legal notice, here is a blessing:
7 ** May you do good and not evil.
8 ** May you find forgiveness for yourself and forgive others.
9 ** May you share freely, never taking more than you give.
11 *************************************************************************
13 ** This file contains code to read and write checkpoints.
15 ** A checkpoint represents the database layout at a single point in time.
16 ** It includes a log offset. When an existing database is opened, the
17 ** current state is determined by reading the newest checkpoint and updating
18 ** it with all committed transactions from the log that follow the specified
24 ** CHECKPOINT BLOB FORMAT:
26 ** A checkpoint blob is a series of unsigned 32-bit integers stored in
27 ** big-endian byte order. As follows:
29 ** Checkpoint header (see the CKPT_HDR_XXX #defines):
31 ** 1. The checkpoint id MSW.
32 ** 2. The checkpoint id LSW.
33 ** 3. The number of integer values in the entire checkpoint, including
34 ** the two checksum values.
35 ** 4. The compression scheme id.
36 ** 5. The total number of blocks in the database.
38 ** 7. The number of levels.
39 ** 8. The nominal database page size.
40 ** 9. The number of pages (in total) written to the database file.
44 ** 1. The log offset MSW.
45 ** 2. The log offset LSW.
49 ** Note that the "log offset" is not the literal byte offset. Instead,
50 ** it is the byte offset multiplied by 2, with least significant bit
51 ** toggled each time the log pointer value is changed. This is to make
52 ** sure that this field changes each time the log pointer is updated,
53 ** even if the log file itself is disabled. See lsmTreeMakeOld().
55 ** See ckptExportLog() and ckptImportLog().
59 ** 8 integers (4 * 64-bit page numbers). See ckptExportAppendlist().
61 ** For each level in the database, a level record. Formatted as follows:
63 ** 0. Age of the level (least significant 16-bits). And flags mask (most
64 ** significant 16-bits).
65 ** 1. The number of right-hand segments (nRight, possibly 0),
66 ** 2. Segment record for left-hand segment (8 integers defined below),
67 ** 3. Segment record for each right-hand segment (8 integers defined below),
68 ** 4. If nRight>0, The number of segments involved in the merge
69 ** 5. if nRight>0, Current nSkip value (see Merge structure defn.),
70 ** 6. For each segment in the merge:
**         6a. Page number of next cell to read during merge (this field
**             is 64-bits - 2 integers)
**         6b. Cell number of next cell to read during merge
74 ** 7. Page containing current split-key (64-bits - 2 integers).
75 ** 8. Cell within page containing current split-key.
76 ** 9. Current pointer value (64-bits - 2 integers).
78 ** The block redirect array:
80 ** 1. Number of redirections (maximum LSM_MAX_BLOCK_REDIRECTS).
81 ** 2. For each redirection:
82 ** a. "from" block number
83 ** b. "to" block number
85 ** The in-memory freelist entries. Each entry is either an insert or a
86 ** delete. The in-memory freelist is to the free-block-list as the
87 ** in-memory tree is to the users database content.
89 ** 1. Number of free-list entries stored in checkpoint header.
90 ** 2. Number of free blocks (in total).
91 ** 3. Total number of blocks freed during database lifetime.
93 ** 2a. Block number of free block.
94 ** 2b. A 64-bit integer (MSW followed by LSW). -1 for a delete entry,
95 ** or the associated checkpoint id for an insert.
99 ** 1. Checksum value 1.
100 ** 2. Checksum value 2.
102 ** In the above, a segment record consists of the following four 64-bit
103 ** fields (converted to 2 * u32 by storing the MSW followed by LSW):
105 ** 1. First page of array,
106 ** 2. Last page of array,
107 ** 3. Root page of array (or 0),
108 ** 4. Size of array in pages.
112 ** LARGE NUMBERS OF LEVEL RECORDS:
114 ** A limit on the number of rhs segments that may be present in the database
115 ** file. Defining this limit ensures that all level records fit within
116 ** the 4096 byte limit for checkpoint blobs.
118 ** The number of right-hand-side segments in a database is counted as
121 ** * For each level in the database not undergoing a merge, add 1.
123 ** * For each level in the database that is undergoing a merge, add
124 ** the number of segments on the rhs of the level.
126 ** A level record not undergoing a merge is 10 integers. A level record
127 ** with nRhs rhs segments and (nRhs+1) input segments (i.e. including the
128 ** separators from the next level) is (11*nRhs+20) integers. The maximum
** per right-hand-side level is therefore 21 integers. So the maximum
** size of all level records in a checkpoint is 21*40=840 integers.
132 ** TODO: Before pointer values were changed from 32 to 64 bits, the above
133 ** used to come to 420 bytes - leaving significant space for a free-list
134 ** prefix. No more. To fix this, reduce the size of the level records in
135 ** a db snapshot, and improve management of the free-list tail in
138 #define LSM_MAX_RHS_SEGMENTS 40
141 ** LARGE NUMBERS OF FREELIST ENTRIES:
143 ** There is also a limit (LSM_MAX_FREELIST_ENTRIES - defined in lsmInt.h)
144 ** on the number of free-list entries stored in a checkpoint. Since each
145 ** free-list entry consists of 3 integers, the maximum free-list size is
146 ** 3*100=300 integers. Combined with the limit on rhs segments defined
147 ** above, this ensures that a checkpoint always fits within a 4096 byte
150 ** If the database contains more than 100 free blocks, the "overflow" flag
151 ** in the checkpoint header is set and the remainder are stored in the
152 ** system FREELIST entry in the LSM (along with user data). The value
153 ** accompanying the FREELIST key in the LSM is, like a checkpoint, an array
154 ** of 32-bit big-endian integers. As follows:
157 ** a. Block number of free block.
158 ** b. MSW of associated checkpoint id.
159 ** c. LSW of associated checkpoint id.
161 ** The number of entries is not required - it is implied by the size of the
162 ** value blob containing the integer array.
164 ** Note that the limit defined by LSM_MAX_FREELIST_ENTRIES is a hard limit.
165 ** The actual value used may be configured using LSM_CONFIG_MAX_FREELIST.
169 ** The argument to this macro must be of type u32. On a little-endian
170 ** architecture, it returns the u32 value that results from interpreting
171 ** the 4 bytes as a big-endian value. On a big-endian architecture, it
172 ** returns the value that would be produced by intepreting the 4 bytes
173 ** of the input value as a little-endian integer.
175 #define BYTESWAP32(x) ( \
176 (((x)&0x000000FF)<<24) + (((x)&0x0000FF00)<<8) \
177 + (((x)&0x00FF0000)>>8) + (((x)&0xFF000000)>>24) \
180 static const int one
= 1;
181 #define LSM_LITTLE_ENDIAN (*(u8 *)(&one))
183 /* Sizes, in integers, of various parts of the checkpoint. */
184 #define CKPT_HDR_SIZE 9
185 #define CKPT_LOGPTR_SIZE 4
186 #define CKPT_APPENDLIST_SIZE (LSM_APPLIST_SZ * 2)
188 /* A #define to describe each integer in the checkpoint header. */
189 #define CKPT_HDR_ID_MSW 0
190 #define CKPT_HDR_ID_LSW 1
191 #define CKPT_HDR_NCKPT 2
192 #define CKPT_HDR_CMPID 3
193 #define CKPT_HDR_NBLOCK 4
194 #define CKPT_HDR_BLKSZ 5
195 #define CKPT_HDR_NLEVEL 6
196 #define CKPT_HDR_PGSZ 7
197 #define CKPT_HDR_NWRITE 8
199 #define CKPT_HDR_LO_MSW 9
200 #define CKPT_HDR_LO_LSW 10
201 #define CKPT_HDR_LO_CKSUM1 11
202 #define CKPT_HDR_LO_CKSUM2 12
204 typedef struct CkptBuffer CkptBuffer
;
207 ** Dynamic buffer used to accumulate data for a checkpoint.
216 ** Calculate the checksum of the checkpoint specified by arguments aCkpt and
217 ** nCkpt. Store the checksum in *piCksum1 and *piCksum2 before returning.
219 ** The value of the nCkpt parameter includes the two checksum values at
220 ** the end of the checkpoint. They are not used as inputs to the checksum
221 ** calculation. The checksum is based on the array of (nCkpt-2) integers
224 static void ckptChecksum(u32
*aCkpt
, u32 nCkpt
, u32
*piCksum1
, u32
*piCksum2
){
230 cksum1
+= aCkpt
[nCkpt
-3] & 0x0000FFFF;
231 cksum2
+= aCkpt
[nCkpt
-3] & 0xFFFF0000;
234 for(i
=0; (i
+3)<nCkpt
; i
+=2){
235 cksum1
+= cksum2
+ aCkpt
[i
];
236 cksum2
+= cksum1
+ aCkpt
[i
+1];
244 ** Set integer iIdx of the checkpoint accumulating in buffer *p to iVal.
246 static void ckptSetValue(CkptBuffer
*p
, int iIdx
, u32 iVal
, int *pRc
){
248 if( iIdx
>=p
->nAlloc
){
249 int nNew
= LSM_MAX(8, iIdx
*2);
250 p
->aCkpt
= (u32
*)lsmReallocOrFree(p
->pEnv
, p
->aCkpt
, nNew
*sizeof(u32
));
252 *pRc
= LSM_NOMEM_BKPT
;
257 p
->aCkpt
[iIdx
] = iVal
;
261 ** Argument aInt points to an array nInt elements in size. Switch the
262 ** endian-ness of each element of the array.
264 static void ckptChangeEndianness(u32
*aInt
, int nInt
){
265 if( LSM_LITTLE_ENDIAN
){
267 for(i
=0; i
<nInt
; i
++) aInt
[i
] = BYTESWAP32(aInt
[i
]);
272 ** Object *p contains a checkpoint in native byte-order. The checkpoint is
273 ** nCkpt integers in size, not including any checksum. This function sets
274 ** the two checksum elements of the checkpoint accordingly.
276 static void ckptAddChecksum(CkptBuffer
*p
, int nCkpt
, int *pRc
){
278 u32 aCksum
[2] = {0, 0};
279 ckptChecksum(p
->aCkpt
, nCkpt
+2, &aCksum
[0], &aCksum
[1]);
280 ckptSetValue(p
, nCkpt
, aCksum
[0], pRc
);
281 ckptSetValue(p
, nCkpt
+1, aCksum
[1], pRc
);
285 static void ckptAppend64(CkptBuffer
*p
, int *piOut
, i64 iVal
, int *pRc
){
287 ckptSetValue(p
, iOut
++, (iVal
>> 32) & 0xFFFFFFFF, pRc
);
288 ckptSetValue(p
, iOut
++, (iVal
& 0xFFFFFFFF), pRc
);
292 static i64
ckptRead64(u32
*a
){
293 return (((i64
)a
[0]) << 32) + (i64
)a
[1];
296 static i64
ckptGobble64(u32
*a
, int *piIn
){
299 return ckptRead64(&a
[iIn
]);
304 ** Append a 6-value segment record corresponding to pSeg to the checkpoint
305 ** buffer passed as the third argument.
307 static void ckptExportSegment(
313 ckptAppend64(p
, piOut
, pSeg
->iFirst
, pRc
);
314 ckptAppend64(p
, piOut
, pSeg
->iLastPg
, pRc
);
315 ckptAppend64(p
, piOut
, pSeg
->iRoot
, pRc
);
316 ckptAppend64(p
, piOut
, pSeg
->nSize
, pRc
);
319 static void ckptExportLevel(
320 Level
*pLevel
, /* Level object to serialize */
321 CkptBuffer
*p
, /* Append new level record to this ckpt */
322 int *piOut
, /* IN/OUT: Size of checkpoint so far */
323 int *pRc
/* IN/OUT: Error code */
328 pMerge
= pLevel
->pMerge
;
329 ckptSetValue(p
, iOut
++, (u32
)pLevel
->iAge
+ (u32
)(pLevel
->flags
<<16), pRc
);
330 ckptSetValue(p
, iOut
++, pLevel
->nRight
, pRc
);
331 ckptExportSegment(&pLevel
->lhs
, p
, &iOut
, pRc
);
333 assert( (pLevel
->nRight
>0)==(pMerge
!=0) );
336 for(i
=0; i
<pLevel
->nRight
; i
++){
337 ckptExportSegment(&pLevel
->aRhs
[i
], p
, &iOut
, pRc
);
339 assert( pMerge
->nInput
==pLevel
->nRight
340 || pMerge
->nInput
==pLevel
->nRight
+1
342 ckptSetValue(p
, iOut
++, pMerge
->nInput
, pRc
);
343 ckptSetValue(p
, iOut
++, pMerge
->nSkip
, pRc
);
344 for(i
=0; i
<pMerge
->nInput
; i
++){
345 ckptAppend64(p
, &iOut
, pMerge
->aInput
[i
].iPg
, pRc
);
346 ckptSetValue(p
, iOut
++, pMerge
->aInput
[i
].iCell
, pRc
);
348 ckptAppend64(p
, &iOut
, pMerge
->splitkey
.iPg
, pRc
);
349 ckptSetValue(p
, iOut
++, pMerge
->splitkey
.iCell
, pRc
);
350 ckptAppend64(p
, &iOut
, pMerge
->iCurrentPtr
, pRc
);
357 ** Populate the log offset fields of the checkpoint buffer. 4 values.
359 static void ckptExportLog(
368 assert( iOut
==CKPT_HDR_LO_MSW
);
371 i64 iOff
= pDb
->treehdr
.iOldLog
;
372 ckptAppend64(p
, &iOut
, iOff
, pRc
);
373 ckptSetValue(p
, iOut
++, pDb
->treehdr
.oldcksum0
, pRc
);
374 ckptSetValue(p
, iOut
++, pDb
->treehdr
.oldcksum1
, pRc
);
376 for(; iOut
<=CKPT_HDR_LO_CKSUM2
; iOut
++){
377 ckptSetValue(p
, iOut
, pDb
->pShmhdr
->aSnap2
[iOut
], pRc
);
381 assert( *pRc
|| iOut
==CKPT_HDR_LO_CKSUM2
+1 );
385 static void ckptExportAppendlist(
386 lsm_db
*db
, /* Database connection */
387 CkptBuffer
*p
, /* Checkpoint buffer to write to */
388 int *piOut
, /* IN/OUT: Offset within checkpoint buffer */
389 int *pRc
/* IN/OUT: Error code */
392 LsmPgno
*aiAppend
= db
->pWorker
->aiAppend
;
394 for(i
=0; i
<LSM_APPLIST_SZ
; i
++){
395 ckptAppend64(p
, piOut
, aiAppend
[i
], pRc
);
399 static int ckptExportSnapshot(
400 lsm_db
*pDb
, /* Connection handle */
401 int bLog
, /* True to update log-offset fields */
402 i64 iId
, /* Checkpoint id */
403 int bCksum
, /* If true, include checksums */
404 void **ppCkpt
, /* OUT: Buffer containing checkpoint */
405 int *pnCkpt
/* OUT: Size of checkpoint in bytes */
407 int rc
= LSM_OK
; /* Return Code */
408 FileSystem
*pFS
= pDb
->pFS
; /* File system object */
409 Snapshot
*pSnap
= pDb
->pWorker
; /* Worker snapshot */
410 int nLevel
= 0; /* Number of levels in checkpoint */
411 int iLevel
; /* Used to count out nLevel levels */
412 int iOut
= 0; /* Current offset in aCkpt[] */
413 Level
*pLevel
; /* Level iterator */
414 int i
; /* Iterator used while serializing freelist */
417 /* Initialize the output buffer */
418 memset(&ckpt
, 0, sizeof(CkptBuffer
));
419 ckpt
.pEnv
= pDb
->pEnv
;
420 iOut
= CKPT_HDR_SIZE
;
422 /* Write the log offset into the checkpoint. */
423 ckptExportLog(pDb
, bLog
, &ckpt
, &iOut
, &rc
);
425 /* Write the append-point list */
426 ckptExportAppendlist(pDb
, &ckpt
, &iOut
, &rc
);
428 /* Figure out how many levels will be written to the checkpoint. */
429 for(pLevel
=lsmDbSnapshotLevel(pSnap
); pLevel
; pLevel
=pLevel
->pNext
) nLevel
++;
431 /* Serialize nLevel levels. */
433 for(pLevel
=lsmDbSnapshotLevel(pSnap
); iLevel
<nLevel
; pLevel
=pLevel
->pNext
){
434 ckptExportLevel(pLevel
, &ckpt
, &iOut
, &rc
);
438 /* Write the block-redirect list */
439 ckptSetValue(&ckpt
, iOut
++, pSnap
->redirect
.n
, &rc
);
440 for(i
=0; i
<pSnap
->redirect
.n
; i
++){
441 ckptSetValue(&ckpt
, iOut
++, pSnap
->redirect
.a
[i
].iFrom
, &rc
);
442 ckptSetValue(&ckpt
, iOut
++, pSnap
->redirect
.a
[i
].iTo
, &rc
);
445 /* Write the freelist */
446 assert( pSnap
->freelist
.nEntry
<=pDb
->nMaxFreelist
);
448 int nFree
= pSnap
->freelist
.nEntry
;
449 ckptSetValue(&ckpt
, iOut
++, nFree
, &rc
);
450 for(i
=0; i
<nFree
; i
++){
451 FreelistEntry
*p
= &pSnap
->freelist
.aEntry
[i
];
452 ckptSetValue(&ckpt
, iOut
++, p
->iBlk
, &rc
);
453 ckptSetValue(&ckpt
, iOut
++, (p
->iId
>> 32) & 0xFFFFFFFF, &rc
);
454 ckptSetValue(&ckpt
, iOut
++, p
->iId
& 0xFFFFFFFF, &rc
);
458 /* Write the checkpoint header */
460 assert( pSnap
->iCmpId
==pDb
->compress
.iId
461 || pSnap
->iCmpId
==LSM_COMPRESSION_EMPTY
463 ckptSetValue(&ckpt
, CKPT_HDR_ID_MSW
, (u32
)(iId
>>32), &rc
);
464 ckptSetValue(&ckpt
, CKPT_HDR_ID_LSW
, (u32
)(iId
&0xFFFFFFFF), &rc
);
465 ckptSetValue(&ckpt
, CKPT_HDR_NCKPT
, iOut
+2, &rc
);
466 ckptSetValue(&ckpt
, CKPT_HDR_CMPID
, pDb
->compress
.iId
, &rc
);
467 ckptSetValue(&ckpt
, CKPT_HDR_NBLOCK
, pSnap
->nBlock
, &rc
);
468 ckptSetValue(&ckpt
, CKPT_HDR_BLKSZ
, lsmFsBlockSize(pFS
), &rc
);
469 ckptSetValue(&ckpt
, CKPT_HDR_NLEVEL
, nLevel
, &rc
);
470 ckptSetValue(&ckpt
, CKPT_HDR_PGSZ
, lsmFsPageSize(pFS
), &rc
);
471 ckptSetValue(&ckpt
, CKPT_HDR_NWRITE
, pSnap
->nWrite
, &rc
);
474 ckptAddChecksum(&ckpt
, iOut
, &rc
);
476 ckptSetValue(&ckpt
, iOut
, 0, &rc
);
477 ckptSetValue(&ckpt
, iOut
+1, 0, &rc
);
480 assert( iOut
<=1024 );
482 #ifdef LSM_LOG_FREELIST
483 lsmLogMessage(pDb
, rc
,
484 "ckptExportSnapshot(): id=%lld freelist: %d", iId
, pSnap
->freelist
.nEntry
486 for(i
=0; i
<pSnap
->freelist
.nEntry
; i
++){
487 lsmLogMessage(pDb
, rc
,
488 "ckptExportSnapshot(): iBlk=%d id=%lld",
489 pSnap
->freelist
.aEntry
[i
].iBlk
,
490 pSnap
->freelist
.aEntry
[i
].iId
495 *ppCkpt
= (void *)ckpt
.aCkpt
;
496 if( pnCkpt
) *pnCkpt
= sizeof(u32
)*iOut
;
502 ** Helper function for ckptImport().
504 static void ckptNewSegment(
507 Segment
*pSegment
/* Populate this structure */
509 assert( pSegment
->iFirst
==0 && pSegment
->iLastPg
==0 );
510 assert( pSegment
->nSize
==0 && pSegment
->iRoot
==0 );
511 pSegment
->iFirst
= ckptGobble64(aIn
, piIn
);
512 pSegment
->iLastPg
= ckptGobble64(aIn
, piIn
);
513 pSegment
->iRoot
= ckptGobble64(aIn
, piIn
);
514 pSegment
->nSize
= ckptGobble64(aIn
, piIn
);
515 assert( pSegment
->iFirst
);
518 static int ckptSetupMerge(lsm_db
*pDb
, u32
*aInt
, int *piIn
, Level
*pLevel
){
519 Merge
*pMerge
; /* Allocated Merge object */
520 int nInput
; /* Number of input segments in merge */
521 int iIn
= *piIn
; /* Next value to read from aInt[] */
522 int i
; /* Iterator variable */
523 int nByte
; /* Number of bytes to allocate */
525 /* Allocate the Merge object. If malloc() fails, return LSM_NOMEM. */
526 nInput
= (int)aInt
[iIn
++];
527 nByte
= sizeof(Merge
) + sizeof(MergeInput
) * nInput
;
528 pMerge
= (Merge
*)lsmMallocZero(pDb
->pEnv
, nByte
);
529 if( !pMerge
) return LSM_NOMEM_BKPT
;
530 pLevel
->pMerge
= pMerge
;
532 /* Populate the Merge object. */
533 pMerge
->aInput
= (MergeInput
*)&pMerge
[1];
534 pMerge
->nInput
= nInput
;
535 pMerge
->iOutputOff
= -1;
536 pMerge
->nSkip
= (int)aInt
[iIn
++];
537 for(i
=0; i
<nInput
; i
++){
538 pMerge
->aInput
[i
].iPg
= ckptGobble64(aInt
, &iIn
);
539 pMerge
->aInput
[i
].iCell
= (int)aInt
[iIn
++];
541 pMerge
->splitkey
.iPg
= ckptGobble64(aInt
, &iIn
);
542 pMerge
->splitkey
.iCell
= (int)aInt
[iIn
++];
543 pMerge
->iCurrentPtr
= ckptGobble64(aInt
, &iIn
);
545 /* Set *piIn and return LSM_OK. */
551 static int ckptLoadLevels(
565 for(i
=0; rc
==LSM_OK
&& i
<nLevel
; i
++){
569 /* Allocate space for the Level structure and Level.apRight[] array */
570 pLevel
= (Level
*)lsmMallocZeroRc(pDb
->pEnv
, sizeof(Level
), &rc
);
572 pLevel
->iAge
= (u16
)(aIn
[iIn
] & 0x0000FFFF);
573 pLevel
->flags
= (u16
)((aIn
[iIn
]>>16) & 0x0000FFFF);
575 pLevel
->nRight
= aIn
[iIn
++];
576 if( pLevel
->nRight
){
577 int nByte
= sizeof(Segment
) * pLevel
->nRight
;
578 pLevel
->aRhs
= (Segment
*)lsmMallocZeroRc(pDb
->pEnv
, nByte
, &rc
);
582 ppNext
= &pLevel
->pNext
;
584 /* Allocate the main segment */
585 ckptNewSegment(aIn
, &iIn
, &pLevel
->lhs
);
587 /* Allocate each of the right-hand segments, if any */
588 for(iRight
=0; iRight
<pLevel
->nRight
; iRight
++){
589 ckptNewSegment(aIn
, &iIn
, &pLevel
->aRhs
[iRight
]);
592 /* Set up the Merge object, if required */
593 if( pLevel
->nRight
>0 ){
594 rc
= ckptSetupMerge(pDb
, aIn
, &iIn
, pLevel
);
601 /* An OOM must have occurred. Free any level structures allocated and
602 ** return the error to the caller. */
603 lsmSortedFreeLevel(pDb
->pEnv
, pRet
);
613 int lsmCheckpointLoadLevels(lsm_db
*pDb
, void *pVal
, int nVal
){
618 aIn
= lsmMallocRc(pDb
->pEnv
, nVal
, &rc
);
626 memcpy(aIn
, pVal
, nVal
);
627 nIn
= nVal
/ sizeof(u32
);
629 ckptChangeEndianness(aIn
, nIn
);
631 rc
= ckptLoadLevels(pDb
, aIn
, &iIn
, nLevel
, &pLevel
);
632 lsmFree(pDb
->pEnv
, aIn
);
633 assert( rc
==LSM_OK
|| pLevel
==0 );
635 pParent
= lsmDbSnapshotLevel(pDb
->pWorker
);
637 while( pParent
->pNext
) pParent
= pParent
->pNext
;
638 pParent
->pNext
= pLevel
;
647 ** Return the data for the LEVELS record.
649 ** The size of the checkpoint that can be stored in the database header
650 ** must not exceed 1024 32-bit integers. Normally, it does not. However,
651 ** if it does, part of the checkpoint must be stored in the LSM. This
652 ** routine returns that part.
654 int lsmCheckpointLevels(
655 lsm_db
*pDb
, /* Database handle */
656 int nLevel
, /* Number of levels to write to blob */
657 void **paVal
, /* OUT: Pointer to LEVELS blob */
658 int *pnVal
/* OUT: Size of LEVELS blob in bytes */
660 Level
*p
; /* Used to iterate through levels */
668 for(p
=lsmDbSnapshotLevel(pDb
->pWorker
); p
; p
=p
->pNext
) nAll
++;
670 assert( nAll
>nLevel
);
672 for(p
=lsmDbSnapshotLevel(pDb
->pWorker
); p
&& nAll
>0; p
=p
->pNext
) nAll
--;
674 memset(&ckpt
, 0, sizeof(CkptBuffer
));
675 ckpt
.pEnv
= pDb
->pEnv
;
677 ckptSetValue(&ckpt
, 0, nLevel
, &rc
);
679 for(i
=0; rc
==LSM_OK
&& i
<nLevel
; i
++){
680 ckptExportLevel(p
, &ckpt
, &iOut
, &rc
);
683 assert( rc
!=LSM_OK
|| p
==0 );
686 ckptChangeEndianness(ckpt
.aCkpt
, iOut
);
687 *paVal
= (void *)ckpt
.aCkpt
;
688 *pnVal
= iOut
* sizeof(u32
);
698 ** Read the checkpoint id from meta-page pPg.
700 static i64
ckptLoadId(MetaPage
*pPg
){
704 u8
*aData
= lsmFsMetaPageData(pPg
, &nData
);
705 ret
= (((i64
)lsmGetU32(&aData
[CKPT_HDR_ID_MSW
*4])) << 32) +
706 ((i64
)lsmGetU32(&aData
[CKPT_HDR_ID_LSW
*4]));
712 ** Return true if the buffer passed as an argument contains a valid
715 static int ckptChecksumOk(u32
*aCkpt
){
716 u32 nCkpt
= aCkpt
[CKPT_HDR_NCKPT
];
720 if( nCkpt
<CKPT_HDR_NCKPT
|| nCkpt
>(LSM_META_RW_PAGE_SIZE
)/sizeof(u32
) ){
723 ckptChecksum(aCkpt
, nCkpt
, &cksum1
, &cksum2
);
724 return (cksum1
==aCkpt
[nCkpt
-2] && cksum2
==aCkpt
[nCkpt
-1]);
728 ** Attempt to load a checkpoint from meta page iMeta.
730 ** This function is a no-op if *pRc is set to any value other than LSM_OK
731 ** when it is called. If an error occurs, *pRc is set to an LSM error code
734 ** If no error occurs and the checkpoint is successfully loaded, copy it to
735 ** ShmHeader.aSnap1[] and ShmHeader.aSnap2[], and set ShmHeader.iMetaPage
736 ** to indicate its origin. In this case return 1. Or, if the checkpoint
737 ** cannot be loaded (because the checksum does not compute), return 0.
739 static int ckptTryLoad(lsm_db
*pDb
, MetaPage
*pPg
, u32 iMeta
, int *pRc
){
740 int bLoaded
= 0; /* Return value */
742 int rc
= LSM_OK
; /* Error code */
743 u32
*aCkpt
= 0; /* Pointer to buffer containing checkpoint */
744 u32 nCkpt
; /* Number of elements in aCkpt[] */
745 int nData
; /* Bytes of data in aData[] */
746 u8
*aData
; /* Meta page data */
748 aData
= lsmFsMetaPageData(pPg
, &nData
);
749 nCkpt
= (u32
)lsmGetU32(&aData
[CKPT_HDR_NCKPT
*sizeof(u32
)]);
750 if( nCkpt
<=nData
/sizeof(u32
) && nCkpt
>CKPT_HDR_NCKPT
){
751 aCkpt
= (u32
*)lsmMallocRc(pDb
->pEnv
, nCkpt
*sizeof(u32
), &rc
);
754 memcpy(aCkpt
, aData
, nCkpt
*sizeof(u32
));
755 ckptChangeEndianness(aCkpt
, nCkpt
);
756 if( ckptChecksumOk(aCkpt
) ){
757 ShmHeader
*pShm
= pDb
->pShmhdr
;
758 memcpy(pShm
->aSnap1
, aCkpt
, nCkpt
*sizeof(u32
));
759 memcpy(pShm
->aSnap2
, aCkpt
, nCkpt
*sizeof(u32
));
760 memcpy(pDb
->aSnapshot
, aCkpt
, nCkpt
*sizeof(u32
));
761 pShm
->iMetaPage
= iMeta
;
766 lsmFree(pDb
->pEnv
, aCkpt
);
773 ** Initialize the shared-memory header with an empty snapshot. This function
774 ** is called when no valid snapshot can be found in the database header.
776 static void ckptLoadEmpty(lsm_db
*pDb
){
778 0, /* CKPT_HDR_ID_MSW */
779 10, /* CKPT_HDR_ID_LSW */
780 0, /* CKPT_HDR_NCKPT */
781 LSM_COMPRESSION_EMPTY
, /* CKPT_HDR_CMPID */
782 0, /* CKPT_HDR_NBLOCK */
783 0, /* CKPT_HDR_BLKSZ */
784 0, /* CKPT_HDR_NLEVEL */
785 0, /* CKPT_HDR_PGSZ */
786 0, /* CKPT_HDR_NWRITE */
787 0, 0, 1234, 5678, /* The log pointer and initial checksum */
788 0,0,0,0, 0,0,0,0, /* The append list */
789 0, /* The redirected block list */
790 0, /* The free block list */
791 0, 0 /* Space for checksum values */
793 u32 nCkpt
= array_size(aCkpt
);
794 ShmHeader
*pShm
= pDb
->pShmhdr
;
796 aCkpt
[CKPT_HDR_NCKPT
] = nCkpt
;
797 aCkpt
[CKPT_HDR_BLKSZ
] = pDb
->nDfltBlksz
;
798 aCkpt
[CKPT_HDR_PGSZ
] = pDb
->nDfltPgsz
;
799 ckptChecksum(aCkpt
, array_size(aCkpt
), &aCkpt
[nCkpt
-2], &aCkpt
[nCkpt
-1]);
801 memcpy(pShm
->aSnap1
, aCkpt
, nCkpt
*sizeof(u32
));
802 memcpy(pShm
->aSnap2
, aCkpt
, nCkpt
*sizeof(u32
));
803 memcpy(pDb
->aSnapshot
, aCkpt
, nCkpt
*sizeof(u32
));
807 ** This function is called as part of database recovery to initialize the
808 ** ShmHeader.aSnap1[] and ShmHeader.aSnap2[] snapshots.
810 int lsmCheckpointRecover(lsm_db
*pDb
){
811 int rc
= LSM_OK
; /* Return Code */
812 i64 iId1
; /* Id of checkpoint on meta-page 1 */
813 i64 iId2
; /* Id of checkpoint on meta-page 2 */
814 int bLoaded
= 0; /* True once checkpoint has been loaded */
815 int cmp
; /* True if (iId2>iId1) */
816 MetaPage
*apPg
[2] = {0, 0}; /* Meta-pages 1 and 2 */
818 rc
= lsmFsMetaPageGet(pDb
->pFS
, 0, 1, &apPg
[0]);
819 if( rc
==LSM_OK
) rc
= lsmFsMetaPageGet(pDb
->pFS
, 0, 2, &apPg
[1]);
821 iId1
= ckptLoadId(apPg
[0]);
822 iId2
= ckptLoadId(apPg
[1]);
824 bLoaded
= ckptTryLoad(pDb
, apPg
[cmp
?1:0], (cmp
?2:1), &rc
);
826 bLoaded
= ckptTryLoad(pDb
, apPg
[cmp
?0:1], (cmp
?1:2), &rc
);
829 /* The database does not contain a valid checkpoint. Initialize the shared
830 ** memory header with an empty checkpoint. */
835 lsmFsMetaPageRelease(apPg
[0]);
836 lsmFsMetaPageRelease(apPg
[1]);
842 ** Store the snapshot in pDb->aSnapshot[] in meta-page iMeta.
844 int lsmCheckpointStore(lsm_db
*pDb
, int iMeta
){
848 assert( iMeta
==1 || iMeta
==2 );
849 rc
= lsmFsMetaPageGet(pDb
->pFS
, 1, iMeta
, &pPg
);
855 nCkpt
= (int)pDb
->aSnapshot
[CKPT_HDR_NCKPT
];
856 aData
= lsmFsMetaPageData(pPg
, &nData
);
857 memcpy(aData
, pDb
->aSnapshot
, nCkpt
*sizeof(u32
));
858 ckptChangeEndianness((u32
*)aData
, nCkpt
);
859 rc
= lsmFsMetaPageRelease(pPg
);
866 ** Copy the current client snapshot from shared-memory to pDb->aSnapshot[].
868 int lsmCheckpointLoad(lsm_db
*pDb
, int *piRead
){
869 int nRem
= LSM_ATTEMPTS_BEFORE_PROTOCOL
;
870 ShmHeader
*pShm
= pDb
->pShmhdr
;
874 nInt
= pShm
->aSnap1
[CKPT_HDR_NCKPT
];
875 if( nInt
<=(LSM_META_RW_PAGE_SIZE
/ sizeof(u32
)) ){
876 memcpy(pDb
->aSnapshot
, pShm
->aSnap1
, nInt
*sizeof(u32
));
877 if( ckptChecksumOk(pDb
->aSnapshot
) ){
878 if( piRead
) *piRead
= 1;
883 nInt
= pShm
->aSnap2
[CKPT_HDR_NCKPT
];
884 if( nInt
<=(LSM_META_RW_PAGE_SIZE
/ sizeof(u32
)) ){
885 memcpy(pDb
->aSnapshot
, pShm
->aSnap2
, nInt
*sizeof(u32
));
886 if( ckptChecksumOk(pDb
->aSnapshot
) ){
887 if( piRead
) *piRead
= 2;
894 return LSM_PROTOCOL_BKPT
;
897 int lsmInfoCompressionId(lsm_db
*db
, u32
*piCmpId
){
900 assert( db
->pClient
==0 && db
->pWorker
==0 );
901 rc
= lsmCheckpointLoad(db
, 0);
903 *piCmpId
= db
->aSnapshot
[CKPT_HDR_CMPID
];
909 int lsmCheckpointLoadOk(lsm_db
*pDb
, int iSnap
){
911 assert( iSnap
==1 || iSnap
==2 );
912 aShm
= (iSnap
==1) ? pDb
->pShmhdr
->aSnap1
: pDb
->pShmhdr
->aSnap2
;
913 return (lsmCheckpointId(pDb
->aSnapshot
, 0)==lsmCheckpointId(aShm
, 0) );
916 int lsmCheckpointClientCacheOk(lsm_db
*pDb
){
917 return ( pDb
->pClient
918 && pDb
->pClient
->iId
==lsmCheckpointId(pDb
->aSnapshot
, 0)
919 && pDb
->pClient
->iId
==lsmCheckpointId(pDb
->pShmhdr
->aSnap1
, 0)
920 && pDb
->pClient
->iId
==lsmCheckpointId(pDb
->pShmhdr
->aSnap2
, 0)
924 int lsmCheckpointLoadWorker(lsm_db
*pDb
){
926 ShmHeader
*pShm
= pDb
->pShmhdr
;
930 /* Must be holding the WORKER lock to do this. Or DMS2. */
932 lsmShmAssertLock(pDb
, LSM_LOCK_WORKER
, LSM_LOCK_EXCL
)
933 || lsmShmAssertLock(pDb
, LSM_LOCK_DMS1
, LSM_LOCK_EXCL
)
936 /* Check that the two snapshots match. If not, repair them. */
937 nInt1
= pShm
->aSnap1
[CKPT_HDR_NCKPT
];
938 nInt2
= pShm
->aSnap2
[CKPT_HDR_NCKPT
];
939 if( nInt1
!=nInt2
|| memcmp(pShm
->aSnap1
, pShm
->aSnap2
, nInt2
*sizeof(u32
)) ){
940 if( ckptChecksumOk(pShm
->aSnap1
) ){
941 memcpy(pShm
->aSnap2
, pShm
->aSnap1
, sizeof(u32
)*nInt1
);
942 }else if( ckptChecksumOk(pShm
->aSnap2
) ){
943 memcpy(pShm
->aSnap1
, pShm
->aSnap2
, sizeof(u32
)*nInt2
);
945 return LSM_PROTOCOL_BKPT
;
949 rc
= lsmCheckpointDeserialize(pDb
, 1, pShm
->aSnap1
, &pDb
->pWorker
);
950 if( pDb
->pWorker
) pDb
->pWorker
->pDatabase
= pDb
->pDatabase
;
953 rc
= lsmCheckCompressionId(pDb
, pDb
->pWorker
->iCmpId
);
957 assert( rc
!=LSM_OK
|| lsmFsIntegrityCheck(pDb
) );
962 int lsmCheckpointDeserialize(
964 int bInclFreelist
, /* If true, deserialize free-list */
971 pNew
= (Snapshot
*)lsmMallocZeroRc(pDb
->pEnv
, sizeof(Snapshot
), &rc
);
976 int nLevel
= (int)aCkpt
[CKPT_HDR_NLEVEL
];
977 int iIn
= CKPT_HDR_SIZE
+ CKPT_APPENDLIST_SIZE
+ CKPT_LOGPTR_SIZE
;
979 pNew
->iId
= lsmCheckpointId(aCkpt
, 0);
980 pNew
->nBlock
= aCkpt
[CKPT_HDR_NBLOCK
];
981 pNew
->nWrite
= aCkpt
[CKPT_HDR_NWRITE
];
982 rc
= ckptLoadLevels(pDb
, aCkpt
, &iIn
, nLevel
, &pNew
->pLevel
);
983 pNew
->iLogOff
= lsmCheckpointLogOffset(aCkpt
);
984 pNew
->iCmpId
= aCkpt
[CKPT_HDR_CMPID
];
986 /* Make a copy of the append-list */
987 for(i
=0; i
<LSM_APPLIST_SZ
; i
++){
988 u32
*a
= &aCkpt
[CKPT_HDR_SIZE
+ CKPT_LOGPTR_SIZE
+ i
*2];
989 pNew
->aiAppend
[i
] = ckptRead64(a
);
992 /* Read the block-redirect list */
993 pNew
->redirect
.n
= aCkpt
[iIn
++];
994 if( pNew
->redirect
.n
){
995 pNew
->redirect
.a
= lsmMallocZeroRc(pDb
->pEnv
,
996 (sizeof(struct RedirectEntry
) * LSM_MAX_BLOCK_REDIRECTS
), &rc
999 for(i
=0; i
<pNew
->redirect
.n
; i
++){
1000 pNew
->redirect
.a
[i
].iFrom
= aCkpt
[iIn
++];
1001 pNew
->redirect
.a
[i
].iTo
= aCkpt
[iIn
++];
1004 for(pLvl
=pNew
->pLevel
; pLvl
->pNext
; pLvl
=pLvl
->pNext
);
1006 pLvl
->aRhs
[pLvl
->nRight
-1].pRedirect
= &pNew
->redirect
;
1008 pLvl
->lhs
.pRedirect
= &pNew
->redirect
;
1012 /* Copy the free-list */
1013 if( rc
==LSM_OK
&& bInclFreelist
){
1014 nFree
= aCkpt
[iIn
++];
1016 pNew
->freelist
.aEntry
= (FreelistEntry
*)lsmMallocZeroRc(
1017 pDb
->pEnv
, sizeof(FreelistEntry
)*nFree
, &rc
1021 for(j
=0; j
<nFree
; j
++){
1022 FreelistEntry
*p
= &pNew
->freelist
.aEntry
[j
];
1023 p
->iBlk
= aCkpt
[iIn
++];
1024 p
->iId
= ((i64
)(aCkpt
[iIn
])<<32) + aCkpt
[iIn
+1];
1027 pNew
->freelist
.nEntry
= pNew
->freelist
.nAlloc
= nFree
;
1034 lsmFreeSnapshot(pDb
->pEnv
, pNew
);
1043 ** Connection pDb must be the worker connection in order to call this
1044 ** function. It returns true if the database already contains the maximum
1045 ** number of levels or false otherwise.
1047 ** This is used when flushing the in-memory tree to disk. If the database
1048 ** is already full, then the caller should invoke lsm_work() or similar
1049 ** until it is not full before creating a new level by flushing the in-memory
1050 ** tree to disk. Limiting the number of levels in the database ensures that
1051 ** the records describing them always fit within the checkpoint blob.
1053 int lsmDatabaseFull(lsm_db
*pDb
){
1057 assert( lsmShmAssertLock(pDb
, LSM_LOCK_WORKER
, LSM_LOCK_EXCL
) );
1058 assert( pDb
->pWorker
);
1060 for(p
=pDb
->pWorker
->pLevel
; p
; p
=p
->pNext
){
1061 nRhs
+= (p
->nRight
? p
->nRight
: 1);
1064 return (nRhs
>= LSM_MAX_RHS_SEGMENTS
);
1068 ** The connection passed as the only argument is currently the worker
1069 ** connection. Some work has been performed on the database by the connection,
1070 ** but no new snapshot has been written into shared memory.
1072 ** This function updates the shared-memory worker and client snapshots with
1073 ** the new snapshot produced by the work performed by pDb.
1075 ** If successful, LSM_OK is returned. Otherwise, if an error occurs, an LSM
1076 ** error code is returned.
1078 int lsmCheckpointSaveWorker(lsm_db
*pDb
, int bFlush
){
1079 Snapshot
*pSnap
= pDb
->pWorker
;
1080 ShmHeader
*pShm
= pDb
->pShmhdr
;
1086 rc
= ckptExportSnapshot(pDb
, bFlush
, pSnap
->iId
, 1, &p
, &n
);
1087 if( rc
!=LSM_OK
) return rc
;
1088 assert( ckptChecksumOk((u32
*)p
) );
1090 assert( n
<=LSM_META_RW_PAGE_SIZE
);
1091 memcpy(pShm
->aSnap2
, p
, n
);
1093 memcpy(pShm
->aSnap1
, p
, n
);
1094 lsmFree(pDb
->pEnv
, p
);
1096 /* assert( lsmFsIntegrityCheck(pDb) ); */
1101 ** This function is used to determine the snapshot-id of the most recently
1102 ** checkpointed snapshot. Variable ShmHeader.iMetaPage indicates which of
1103 ** the two meta-pages said snapshot resides on (if any).
1105 ** If successful, this function loads the snapshot from the meta-page,
1106 ** verifies its checksum and sets *piId to the snapshot-id before returning
1107 ** LSM_OK. Or, if the checksum attempt fails, *piId is set to zero and
1108 ** LSM_OK returned. If an error occurs, an LSM error code is returned and
1109 ** the final value of *piId is undefined.
1111 int lsmCheckpointSynced(lsm_db
*pDb
, i64
*piId
, i64
*piLog
, u32
*pnWrite
){
1116 iMeta
= pDb
->pShmhdr
->iMetaPage
;
1117 if( iMeta
==1 || iMeta
==2 ){
1118 rc
= lsmFsMetaPageGet(pDb
->pFS
, 0, iMeta
, &pPg
);
1124 aData
= lsmFsMetaPageData(pPg
, &nData
);
1125 assert( nData
==LSM_META_RW_PAGE_SIZE
);
1126 nCkpt
= lsmGetU32(&aData
[CKPT_HDR_NCKPT
*sizeof(u32
)]);
1127 if( nCkpt
<(LSM_META_RW_PAGE_SIZE
/sizeof(u32
)) ){
1128 u32
*aCopy
= lsmMallocRc(pDb
->pEnv
, sizeof(u32
) * nCkpt
, &rc
);
1130 memcpy(aCopy
, aData
, nCkpt
*sizeof(u32
));
1131 ckptChangeEndianness(aCopy
, nCkpt
);
1132 if( ckptChecksumOk(aCopy
) ){
1133 if( piId
) *piId
= lsmCheckpointId(aCopy
, 0);
1134 if( piLog
) *piLog
= (lsmCheckpointLogOffset(aCopy
) >> 1);
1135 if( pnWrite
) *pnWrite
= aCopy
[CKPT_HDR_NWRITE
];
1137 lsmFree(pDb
->pEnv
, aCopy
);
1140 lsmFsMetaPageRelease(pPg
);
1144 if( (iMeta
!=1 && iMeta
!=2) || rc
!=LSM_OK
|| pDb
->pShmhdr
->iMetaPage
!=iMeta
){
1145 if( piId
) *piId
= 0;
1146 if( piLog
) *piLog
= 0;
1147 if( pnWrite
) *pnWrite
= 0;
1153 ** Return the checkpoint-id of the checkpoint array passed as the first
1154 ** argument to this function. If the second argument is true, then assume
1155 ** that the checkpoint is made up of 32-bit big-endian integers. If it
1156 ** is false, assume that the integers are in machine byte order.
1158 i64
lsmCheckpointId(u32
*aCkpt
, int bDisk
){
1161 u8
*aData
= (u8
*)aCkpt
;
1162 iId
= (((i64
)lsmGetU32(&aData
[CKPT_HDR_ID_MSW
*4])) << 32);
1163 iId
+= ((i64
)lsmGetU32(&aData
[CKPT_HDR_ID_LSW
*4]));
1165 iId
= ((i64
)aCkpt
[CKPT_HDR_ID_MSW
] << 32) + (i64
)aCkpt
[CKPT_HDR_ID_LSW
];
1170 u32
lsmCheckpointNBlock(u32
*aCkpt
){
1171 return aCkpt
[CKPT_HDR_NBLOCK
];
1174 u32
lsmCheckpointNWrite(u32
*aCkpt
, int bDisk
){
1176 return lsmGetU32((u8
*)&aCkpt
[CKPT_HDR_NWRITE
]);
1178 return aCkpt
[CKPT_HDR_NWRITE
];
1182 i64
lsmCheckpointLogOffset(u32
*aCkpt
){
1183 return ((i64
)aCkpt
[CKPT_HDR_LO_MSW
] << 32) + (i64
)aCkpt
[CKPT_HDR_LO_LSW
];
1186 int lsmCheckpointPgsz(u32
*aCkpt
){ return (int)aCkpt
[CKPT_HDR_PGSZ
]; }
1188 int lsmCheckpointBlksz(u32
*aCkpt
){ return (int)aCkpt
[CKPT_HDR_BLKSZ
]; }
1190 void lsmCheckpointLogoffset(
1194 pLog
->aRegion
[2].iStart
= (lsmCheckpointLogOffset(aCkpt
) >> 1);
1196 pLog
->cksum0
= aCkpt
[CKPT_HDR_LO_CKSUM1
];
1197 pLog
->cksum1
= aCkpt
[CKPT_HDR_LO_CKSUM2
];
1198 pLog
->iSnapshotId
= lsmCheckpointId(aCkpt
, 0);
1201 void lsmCheckpointZeroLogoffset(lsm_db
*pDb
){
1204 nCkpt
= pDb
->aSnapshot
[CKPT_HDR_NCKPT
];
1205 assert( nCkpt
>CKPT_HDR_NCKPT
);
1206 assert( nCkpt
==pDb
->pShmhdr
->aSnap1
[CKPT_HDR_NCKPT
] );
1207 assert( 0==memcmp(pDb
->aSnapshot
, pDb
->pShmhdr
->aSnap1
, nCkpt
*sizeof(u32
)) );
1208 assert( 0==memcmp(pDb
->aSnapshot
, pDb
->pShmhdr
->aSnap2
, nCkpt
*sizeof(u32
)) );
1210 pDb
->aSnapshot
[CKPT_HDR_LO_MSW
] = 0;
1211 pDb
->aSnapshot
[CKPT_HDR_LO_LSW
] = 0;
1212 ckptChecksum(pDb
->aSnapshot
, nCkpt
,
1213 &pDb
->aSnapshot
[nCkpt
-2], &pDb
->aSnapshot
[nCkpt
-1]
1216 memcpy(pDb
->pShmhdr
->aSnap1
, pDb
->aSnapshot
, nCkpt
*sizeof(u32
));
1217 memcpy(pDb
->pShmhdr
->aSnap2
, pDb
->aSnapshot
, nCkpt
*sizeof(u32
));
1221 ** Set the output variable to the number of KB of data written into the
1222 ** database file since the most recent checkpoint.
1224 int lsmCheckpointSize(lsm_db
*db
, int *pnKB
){
1228 /* Set nSynced to the number of pages that had been written when the
1229 ** database was last checkpointed. */
1230 rc
= lsmCheckpointSynced(db
, 0, 0, &nSynced
);
1233 u32 nPgsz
= db
->pShmhdr
->aSnap1
[CKPT_HDR_PGSZ
];
1234 u32 nWrite
= db
->pShmhdr
->aSnap1
[CKPT_HDR_NWRITE
];
1235 *pnKB
= (int)(( ((i64
)(nWrite
- nSynced
) * nPgsz
) + 1023) / 1024);