No-op GlassTable::readahead_key for overlong keys
[xapian.git] / xapian-core / backends / glass / glass_table.cc
blob7387fd05196d7ed16ae4e60419f714f76103f409
1 /** @file
2 * @brief Btree implementation
3 */
4 /* Copyright 1999,2000,2001 BrightStation PLC
5 * Copyright 2002 Ananova Ltd
6 * Copyright 2002,2003,2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2016 Olly Betts
7 * Copyright 2008 Lemur Consulting Ltd
9 * This program is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU General Public License as
11 * published by the Free Software Foundation; either version 2 of the
12 * License, or (at your option) any later version.
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with this program; if not, write to the Free Software
21 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
22 * USA
25 #include <config.h>
27 #include "glass_table.h"
29 #include <xapian/error.h>
31 #include "omassert.h"
32 #include "posixy_wrapper.h"
33 #include "str.h"
34 #include "stringutils.h" // For STRINGIZE().
36 #include <sys/types.h>
38 #include <cerrno>
39 #include <cstring> /* for memmove */
40 #include <climits> /* for CHAR_BIT */
42 #include "glass_freelist.h"
43 #include "glass_changes.h"
44 #include "glass_cursor.h"
45 #include "glass_defs.h"
46 #include "glass_version.h"
48 #include "debuglog.h"
49 #include "filetests.h"
50 #include "io_utils.h"
51 #include "pack.h"
52 #include "wordaccess.h"
54 #include <algorithm> // for std::min()
55 #include <string>
57 #include "xapian/constants.h"
59 using namespace Glass;
60 using namespace std;
62 //#define BTREE_DEBUG_FULL 1
63 #undef BTREE_DEBUG_FULL
65 #ifdef BTREE_DEBUG_FULL
66 /*------debugging aids from here--------*/
68 static void print_key(const uint8_t * p, int c, int j);
69 static void print_tag(const uint8_t * p, int c, int j);
72 static void report_cursor(int N, Btree * B, Glass::Cursor * C)
74 int i;
75 printf("%d)\n", N);
76 for (i = 0; i <= B->level; i++)
77 printf("p=%d, c=%d, n=[%d], rewrite=%d\n",
78 C[i].p, C[i].c, C[i].n, C[i].rewrite);
82 /*------to here--------*/
83 #endif /* BTREE_DEBUG_FULL */
/** Allocate an array of @a size bytes with every byte set to zero.
 *
 *  The array is allocated with new[], so the caller must release it with
 *  delete[].
 */
static inline uint8_t *zeroed_new(size_t size)
{
    // The trailing "()" value-initialises the array, i.e. zero-fills it.
    return new uint8_t[size]();
}
92 /* A B-tree consists of a file with extension GLASS_TABLE_EXTENSION divided
93 into a sequence of equal sized blocks, numbered 0, 1, 2 ... some of which
94 are free, some in use. Those in use are arranged in a tree. Lists of
95 currently unused blocks are stored in a freelist which is itself stored in
96 unused blocks.
98 Some "root info" is needed to locate a particular revision of the B-tree
99 - this is stored in the "version file" (we store this data for all tables
100 at a particular revision together).
102 Each block, b, has a structure like this:
104 R L M T D o1 o2 o3 ... oN <gap> [item] .. [item] .. [item] ...
105 <---------- D ----------> <-M->
107 And then,
109 R = REVISION(b) is the revision number the B-tree had when the block was
110 written into the DB file.
111 L = GET_LEVEL(b) is the level of the block, which is the number of levels
112 towards the root of the B-tree structure. So leaf blocks
113 have level 0 and the one root block has the highest level
114 equal to the number of levels in the B-tree. For blocks
115 storing the freelist, the level is set to 254.
116 M = MAX_FREE(b) is the size of the gap between the end of the directory and
117 the first item of data. (It is not necessarily the maximum
118 size among the bits of space that are free, but I can't
119 think of a better name.)
120 T = TOTAL_FREE(b) is the total amount of free space left in b.
121 D = DIR_END(b) gives the offset to the end of the directory.
123 o1, o2 ... oN are a directory of offsets to the N items held in the block.
124 The items are key-tag pairs, and as they occur in the directory are ordered
125 by the keys.
127 An item has this form:
129 I K key X tag
130 ←K→
131 <------I---->
133 A long tag presented through the API is split up into C pieces small enough
134 to be accommodated in the blocks of the B-tree. The key is extended to
135 include a counter, x, which runs from 1 to C. The key is preceded by a
136 length, K, and the whole item with a length, I, as depicted above. The
137 upper bits of I encode a flag indicating if this item is compressed, and a
138 flag saying if this is the last piece of a tag (i.e. if x == C).
140 Here are the corresponding definitions:
144 /** Flip to sequential addition block-splitting after this number of observed
145 * sequential additions (in negated form). */
146 #define SEQ_START_POINT (-10)
148 /* Note use of the limits.h values:
149 UCHAR_MAX = 255, an unsigned with all bits set, and
150 CHAR_BIT = 8, the number of bits per byte
152 BYTE_PAIR_RANGE below is the smallest +ve number that can't be held in two
153 bytes -- 64K effectively.
156 #define BYTE_PAIR_RANGE (1 << 2 * CHAR_BIT)
158 /// read_block(n, p) reads block n of the DB file to address p.
159 void
160 GlassTable::read_block(uint4 n, uint8_t * p) const
162 // Log the value of p, not the contents of the block it points to...
163 LOGCALL_VOID(DB, "GlassTable::read_block", n | (void*)p);
164 if (rare(handle == -2))
165 GlassTable::throw_database_closed();
166 AssertRel(n,<,free_list.get_first_unused_block());
168 io_read_block(handle, reinterpret_cast<char *>(p), block_size, n, offset);
170 if (GET_LEVEL(p) != LEVEL_FREELIST) {
171 int dir_end = DIR_END(p);
172 if (rare(dir_end < DIR_START || unsigned(dir_end) > block_size)) {
173 string msg("dir_end invalid in block ");
174 msg += str(n);
175 throw Xapian::DatabaseCorruptError(msg);
180 /** write_block(n, p, appending) writes block n in the DB file from address p.
182 * If appending is true (not specified it defaults to false), then this
183 * indicates that we've added data to a block in space which was previously
184 * unused, and are writing the block back in place - we use this to add
185 * free list entries (the information about where the freelist data for a
186 * revision begins and ends is stored in the "iamglass" file). We don't
187 * currently use this flag for much, but it signifies that we don't risk
188 * invalidating any existing revisions, which may be useful information.
190 void
191 GlassTable::write_block(uint4 n, const uint8_t * p, bool appending) const
193 LOGCALL_VOID(DB, "GlassTable::write_block", n | p | appending);
194 Assert(writable);
195 /* Check that n is in range. */
196 AssertRel(n,<,free_list.get_first_unused_block());
198 /* don't write to non-free */
199 // FIXME: We can no longer check this easily.
200 // AssertParanoid(free_list.block_free_at_start(block_size, n));
202 /* write revision is okay */
203 AssertEqParanoid(REVISION(p), revision_number + 1);
205 if (appending) {
206 // In the case of the freelist, new entries can safely be written into
207 // the space at the end of the latest freelist block, as that's unused
208 // by previous revisions. It is also safe to write into a block which
209 // wasn't used in the previous revision.
211 // It's only when we start a new freelist block that we need to worry
212 // about invalidating old revisions.
213 } else if (flags & Xapian::DB_DANGEROUS) {
214 // FIXME: We should somehow flag this to prevent readers opening the
215 // database. Removing "iamglass" or setting a flag in it doesn't deal
216 // with any existing readers though. Perhaps we want to have readers
217 // read lock and try to take an exclusive lock here?
220 const char * p_char = reinterpret_cast<const char *>(p);
221 io_write_block(handle, p_char, block_size, n, offset);
223 if (!changes_obj) return;
225 unsigned char v;
226 // FIXME: track table_type in this class?
227 if (strcmp(tablename, "position") == 0) {
228 v = int(Glass::POSITION);
229 } else if (strcmp(tablename, "postlist") == 0) {
230 v = int(Glass::POSTLIST);
231 } else if (strcmp(tablename, "docdata") == 0) {
232 v = int(Glass::DOCDATA);
233 } else if (strcmp(tablename, "spelling") == 0) {
234 v = int(Glass::SPELLING);
235 } else if (strcmp(tablename, "synonym") == 0) {
236 v = int(Glass::SYNONYM);
237 } else if (strcmp(tablename, "termlist") == 0) {
238 v = int(Glass::TERMLIST);
239 } else {
240 return; // FIXME
243 if (block_size == 2048) {
244 v |= 0 << 3;
245 } else if (block_size == 4096) {
246 v |= 1 << 3;
247 } else if (block_size == 8192) {
248 v |= 2 << 3;
249 } else if (block_size == 16384) {
250 v |= 3 << 3;
251 } else if (block_size == 32768) {
252 v |= 4 << 3;
253 } else if (block_size == 65536) {
254 v |= 5 << 3;
255 } else {
256 return; // FIXME
259 string buf;
260 buf += char(v);
261 // Write the block number to the file
262 pack_uint(buf, n);
264 changes_obj->write_block(buf);
265 changes_obj->write_block(reinterpret_cast<const char *>(p), block_size);
268 /* A note on cursors:
270 Each B-tree level has a corresponding array element C[j] in a
271 cursor, C. C[0] is the leaf (or data) level, and C[B->level] is the
272 root block level. Within a level j,
274 C[j].p addresses the block
275 C[j].c is the offset into the directory entry in the block
276 C[j].n is the number of the block at C[j].p
278 A look up in the B-tree causes navigation of the blocks starting
279 from the root. In each block, p, we find an offset, c, to an item
280 which gives the number, n, of the block for the next level. This
281 leads to an array of values p,c,n which are held inside the cursor.
283 Any Btree object B has a built-in cursor, at B->C. But other cursors may
284 be created. If BC is a created cursor, BC->C is the cursor in the
285 sense given above, and BC->B is the handle for the B-tree again.
288 void
289 GlassTable::set_overwritten() const
291 LOGCALL_VOID(DB, "GlassTable::set_overwritten", NO_ARGS);
292 // If we're writable, there shouldn't be another writer who could cause
293 // overwritten to be flagged, so that's a DatabaseCorruptError.
294 if (writable)
295 throw Xapian::DatabaseCorruptError("Block overwritten - run xapian-check on this database");
296 throw Xapian::DatabaseModifiedError("The revision being read has been discarded - you should call Xapian::Database::reopen() and retry the operation");
299 /* block_to_cursor(C, j, n) puts block n into position C[j] of cursor
300 C, writing the block currently at C[j] back to disk if necessary.
301 Note that
303 C[j].rewrite
305 is true iff C[j].n is different from block n in file DB. If it is
306 false no rewriting is necessary.
309 void
310 GlassTable::block_to_cursor(Glass::Cursor * C_, int j, uint4 n) const
312 LOGCALL_VOID(DB, "GlassTable::block_to_cursor", (void*)C_ | j | n);
313 if (n == C_[j].get_n()) return;
315 if (writable && C_[j].rewrite) {
316 Assert(C == C_);
317 write_block(C_[j].get_n(), C_[j].get_p());
318 C_[j].rewrite = false;
321 // Check if the block is in the built-in cursor (potentially in
322 // modified form).
323 const uint8_t * p;
324 if (n == C[j].get_n()) {
325 p = C_[j].clone(C[j]);
326 } else {
327 uint8_t * q = C_[j].init(block_size);
328 read_block(n, q);
329 p = q;
330 C_[j].set_n(n);
333 if (j < level) {
334 /* unsigned comparison */
335 if (rare(REVISION(p) > REVISION(C_[j + 1].get_p()))) {
336 set_overwritten();
337 return;
341 if (rare(j != GET_LEVEL(p))) {
342 string msg = "Expected block ";
343 msg += str(n);
344 msg += " to be level ";
345 msg += str(j);
346 msg += ", not ";
347 msg += str(GET_LEVEL(p));
348 throw Xapian::DatabaseCorruptError(msg);
352 /** Btree::alter(); is called when the B-tree is to be altered.
354 It causes new blocks to be forced for the current set of blocks in
355 the cursor.
357 The point is that if a block at level 0 is to be altered it may get
358 a new number. Then the pointer to this block from level 1 will need
359 changing. So the block at level 1 needs altering and may get a new
360 block number. Then the pointer to this block from level 2 will need
361 changing ... and so on back to the root.
363 The clever bit here is spotting the cases when we can make an early
364 exit from this process. If C[j].rewrite is true, C[j+k].rewrite
365 will be true for k = 1,2 ... We have been through all this before,
366 and there is no need to do it again. If C[j].n was free at the
367 start of the transaction, we can copy it back to the same place
368 without violating the integrity of the B-tree. We don't then need a
369 new n and can return. The corresponding C[j].rewrite may be true or
370 false in that case.
373 void
374 GlassTable::alter()
376 LOGCALL_VOID(DB, "GlassTable::alter", NO_ARGS);
377 Assert(writable);
378 if (flags & Xapian::DB_DANGEROUS) {
379 C[0].rewrite = true;
380 return;
382 int j = 0;
383 while (true) {
384 if (C[j].rewrite) return; /* all new, so return */
385 C[j].rewrite = true;
387 glass_revision_number_t rev = REVISION(C[j].get_p());
388 if (rev == revision_number + 1) {
389 return;
391 Assert(rev < revision_number + 1);
392 uint4 n = C[j].get_n();
393 free_list.mark_block_unused(this, block_size, n);
394 SET_REVISION(C[j].get_modifiable_p(block_size), revision_number + 1);
395 n = free_list.get_block(this, block_size);
396 C[j].set_n(n);
398 if (j == level) return;
399 j++;
400 BItem_wr(C[j].get_modifiable_p(block_size), C[j].c).set_block_given_by(n);
404 /** find_in_leaf(p, key, c, exact) searches for the key in the leaf block at p.
406 What we get is the directory entry to the last key <= the key being searched
407 for.
409 The lookup is by binary chop, with i and j set to the left and
410 right ends of the search area. In sequential addition, c will often
411 be the answer, so we test the keys round c and move i and j towards
412 c if possible.
414 exact is set to true if the match was exact (otherwise exact is unchanged).
418 GlassTable::find_in_leaf(const uint8_t * p, LeafItem item, int c, bool& exact)
420 LOGCALL_STATIC(DB, int, "GlassTable::find_in_leaf", (const void*)p | (const void *)item.get_address() | c | Literal("bool&"));
421 // c should be odd (either -1, or an even offset from DIR_START).
422 Assert((c & 1) == 1);
423 int i = DIR_START;
424 i -= D2;
425 if (c != -1) {
426 AssertRel(i,<=,c);
428 int j = DIR_END(p);
430 if (c != -1) {
431 if (c < j && i < c) {
432 int r = compare(LeafItem(p, c), item);
433 if (r == 0) {
434 exact = true;
435 return c;
437 if (r < 0) i = c;
439 c += D2;
440 if (c < j && i < c) {
441 int r = compare(item, LeafItem(p, c));
442 if (r == 0) {
443 exact = true;
444 return c;
446 if (r < 0) j = c;
450 while (j - i > D2) {
451 int k = i + ((j - i) / (D2 * 2)) * D2; /* mid way */
452 int r = compare(item, LeafItem(p, k));
453 if (r < 0) {
454 j = k;
455 } else {
456 i = k;
457 if (r == 0) {
458 exact = true;
459 break;
463 AssertRel(DIR_START - D2,<=,i);
464 AssertRel(i,<,DIR_END(p));
465 RETURN(i);
468 template<typename ITEM> int
469 find_in_branch_(const uint8_t * p, ITEM item, int c)
471 // c should be odd (either -1, or an even offset from DIR_START).
472 Assert((c & 1) == 1);
473 int i = DIR_START;
474 if (c != -1) {
475 AssertRel(i,<=,c);
477 int j = DIR_END(p);
479 if (c != -1) {
480 if (c < j && i < c) {
481 int r = compare(BItem(p, c), item);
482 if (r == 0) return c;
483 if (r < 0) i = c;
485 c += D2;
486 if (c < j && i < c) {
487 int r = compare(item, BItem(p, c));
488 if (r == 0) return c;
489 if (r < 0) j = c;
493 while (j - i > D2) {
494 int k = i + ((j - i) / (D2 * 2)) * D2; /* mid way */
495 int r = compare(item, BItem(p, k));
496 if (r < 0) {
497 j = k;
498 } else {
499 i = k;
500 if (r == 0) break;
503 AssertRel(DIR_START,<=,i);
504 AssertRel(i,<,DIR_END(p));
505 return i;
509 GlassTable::find_in_branch(const uint8_t * p, LeafItem item, int c)
511 LOGCALL_STATIC(DB, int, "GlassTable::find_in_branch", (const void*)p | (const void *)item.get_address() | c);
512 RETURN(find_in_branch_(p, item, c));
516 GlassTable::find_in_branch(const uint8_t * p, BItem item, int c)
518 LOGCALL_STATIC(DB, int, "GlassTable::find_in_branch", (const void*)p | (const void *)item.get_address() | c);
519 RETURN(find_in_branch_(p, item, c));
522 /** find(C_) searches for the key of B->kt in the B-tree.
524 Result is true if found, false otherwise. When false, the B_tree
525 cursor is positioned at the last key in the B-tree <= the search
526 key. Goes to first (null) item in B-tree when key length == 0.
529 bool
530 GlassTable::find(Glass::Cursor * C_) const
532 LOGCALL(DB, bool, "GlassTable::find", (void*)C_);
533 // Note: the parameter is needed when we're called by GlassCursor
534 const uint8_t * p;
535 int c;
536 for (int j = level; j > 0; --j) {
537 p = C_[j].get_p();
538 c = find_in_branch(p, kt, C_[j].c);
539 #ifdef BTREE_DEBUG_FULL
540 printf("Block in GlassTable:find - code position 1");
541 report_block_full(j, C_[j].get_n(), p);
542 #endif /* BTREE_DEBUG_FULL */
543 C_[j].c = c;
544 block_to_cursor(C_, j - 1, BItem(p, c).block_given_by());
546 p = C_[0].get_p();
547 bool exact = false;
548 c = find_in_leaf(p, kt, C_[0].c, exact);
549 #ifdef BTREE_DEBUG_FULL
550 printf("Block in GlassTable:find - code position 2");
551 report_block_full(0, C_[0].get_n(), p);
552 #endif /* BTREE_DEBUG_FULL */
553 C_[0].c = c;
554 RETURN(exact);
557 /** compact(p) compact the block at p by shuffling all the items up to the end.
559 MAX_FREE(p) is then maximized, and is equal to TOTAL_FREE(p).
562 void
563 GlassTable::compact(uint8_t * p)
565 LOGCALL_VOID(DB, "GlassTable::compact", (void*)p);
566 Assert(p != buffer);
567 Assert(writable);
568 int e = block_size;
569 uint8_t * b = buffer;
570 int dir_end = DIR_END(p);
571 if (GET_LEVEL(p) == 0) {
572 // Leaf.
573 for (int c = DIR_START; c < dir_end; c += D2) {
574 LeafItem item(p, c);
575 int l = item.size();
576 e -= l;
577 memcpy(b + e, item.get_address(), l);
578 LeafItem_wr::setD(p, c, e); /* reform in b */
580 } else {
581 // Branch.
582 for (int c = DIR_START; c < dir_end; c += D2) {
583 BItem item(p, c);
584 int l = item.size();
585 e -= l;
586 memcpy(b + e, item.get_address(), l);
587 BItem_wr::setD(p, c, e); /* reform in b */
590 memcpy(p + e, b + e, block_size - e); /* copy back */
591 e -= dir_end;
592 SET_TOTAL_FREE(p, e);
593 SET_MAX_FREE(p, e);
596 /** Btree needs to gain a new level to insert more items: so split root block
597 * and construct a new one.
599 void
600 GlassTable::split_root(uint4 split_n)
602 LOGCALL_VOID(DB, "GlassTable::split_root", split_n);
603 /* gain a level */
604 ++level;
606 /* check level overflow - this isn't something that should ever happen
607 * but deserves more than an Assert()... */
608 if (level == BTREE_CURSOR_LEVELS) {
609 throw Xapian::DatabaseCorruptError("Btree has grown impossibly large (" STRINGIZE(BTREE_CURSOR_LEVELS) " levels)");
612 uint8_t * q = C[level].init(block_size);
613 memset(q, 0, block_size);
614 C[level].c = DIR_START;
615 C[level].set_n(free_list.get_block(this, block_size));
616 C[level].rewrite = true;
617 SET_REVISION(q, revision_number + 1);
618 SET_LEVEL(q, level);
619 SET_DIR_END(q, DIR_START);
620 compact(q); /* to reset TOTAL_FREE, MAX_FREE */
622 /* form a null key in b with a pointer to the old root */
623 uint8_t b[10]; /* 7 is exact */
624 BItem_wr item(b);
625 item.form_null_key(split_n);
626 add_branch_item(item, level);
629 /** enter_key_above_leaf(previtem, newitem) is called after a leaf block split.
631 It enters in the block at level C[1] a separating key for the block
632 at level C[0]. The key itself is newitem.key(). previtem is the
633 preceding item, and at level 1 newitem.key() can be trimmed down to the
634 first point of difference to previtem.key() for entry in C[j].
636 This code looks longer than it really is. If j exceeds the number
637 of B-tree levels the root block has split and we have to construct
638 a new one, but this is a rare event.
640 The key is constructed in b, with block number C[0].n as tag,
641 and this is added in with add_item. add_item may itself cause a
642 block split, with a further call to enter_key. Hence the recursion.
644 void
645 GlassTable::enter_key_above_leaf(LeafItem previtem, LeafItem newitem)
647 LOGCALL_VOID(DB, "GlassTable::enter_key_above_leaf", Literal("previtem") | Literal("newitem"));
648 Assert(writable);
649 Assert(compare(previtem, newitem) < 0);
651 Key prevkey = previtem.key();
652 Key newkey = newitem.key();
653 int new_comp = newitem.component_of();
655 uint4 blocknumber = C[0].get_n();
657 // FIXME update to use Key
658 // Keys are truncated here: but don't truncate the count at the end away.
659 const int newkey_len = newkey.length();
660 AssertRel(newkey_len,>,0);
662 // Truncate the key to the minimal key which differs from prevkey,
663 // the preceding key in the block.
664 int i = 0;
665 const int min_len = min(newkey_len, prevkey.length());
666 while (i < min_len && prevkey[i] == newkey[i]) {
667 i++;
670 // Want one byte of difference.
671 if (i < newkey_len) i++;
673 // Enough space for a branch item with maximum length key.
674 uint8_t b[BYTES_PER_BLOCK_NUMBER + K1 + 255 + X2];
675 BItem_wr item(b);
676 AssertRel(i, <=, 255);
677 item.set_truncated_key_and_block(newkey, new_comp, i, blocknumber);
679 // The split block gets inserted into the parent after the pointer to the
680 // current child.
681 AssertEq(C[1].c, find_in_branch(C[1].get_p(), item, C[1].c));
682 C[1].c += D2;
683 C[1].rewrite = true; /* a subtle point: this *is* required. */
684 add_branch_item(item, 1);
687 /** enter_key_above_branch(j, newkey) is called after a branch block split.
689 It enters in the block at level C[j] a separating key for the block
690 at level C[j - 1]. The key itself is newkey.
692 This code looks longer than it really is. If j exceeds the number
693 of B-tree levels the root block has split and we have to construct
694 a new one, but this is a rare event.
696 The key is constructed in b, with block number C[j - 1].n as tag,
697 and this is added in with add_item. add_item may itself cause a
698 block split, with a further call to enter_key. Hence the recursion.
700 void
701 GlassTable::enter_key_above_branch(int j, BItem newitem)
703 LOGCALL_VOID(DB, "GlassTable::enter_key_above_branch", j | Literal("newitem"));
704 Assert(writable);
705 AssertRel(j,>,1);
707 /* Can't truncate between branch levels, since the separated keys
708 * are in at the leaf level, and truncating again will change the
709 * branch point.
712 uint4 blocknumber = C[j - 1].get_n();
714 // Enough space for a branch item with maximum length key.
715 uint8_t b[BYTES_PER_BLOCK_NUMBER + K1 + 255 + X2];
716 BItem_wr item(b);
717 item.set_key_and_block(newitem.key(), blocknumber);
719 // The split block gets inserted into the parent after the pointer to the
720 // current child.
721 AssertEq(C[j].c, find_in_branch(C[j].get_p(), item, C[j].c));
722 C[j].c += D2;
723 C[j].rewrite = true; /* a subtle point: this *is* required. */
724 add_branch_item(item, j);
727 /** mid_point(p) finds the directory entry in c that determines the
728 approximate mid point of the data in the block at p.
732 GlassTable::mid_point(uint8_t * p) const
734 LOGCALL(DB, int, "GlassTable::mid_point", (void*)p);
735 int n = 0;
736 int dir_end = DIR_END(p);
737 int size = block_size - TOTAL_FREE(p) - dir_end;
738 for (int c = DIR_START; c < dir_end; c += D2) {
739 int l;
740 if (GET_LEVEL(p) == 0) {
741 l = LeafItem(p, c).size();
742 } else {
743 l = BItem(p, c).size();
745 n += 2 * l;
746 if (n >= size) {
747 if (l < n - size) RETURN(c);
748 RETURN(c + D2);
752 /* This shouldn't happen, as the sum of the item sizes should be the same
753 * as the value calculated in size, so assert but return a sane value just
754 * in case. */
755 Assert(false);
756 RETURN(dir_end);
759 /** add_item_to_leaf(p, kt_, c) adds item kt_ to the leaf block at p.
761 c is the offset in the directory that needs to be expanded to accommodate
762 the new entry for the item. We know before this is called that there is
763 enough contiguous room for the item in the block, so it's just a matter of
764 shuffling up any directory entries after where we're inserting and copying
765 in the item.
768 void
769 GlassTable::add_item_to_leaf(uint8_t * p, LeafItem kt_, int c)
771 LOGCALL_VOID(DB, "GlassTable::add_item_to_leaf", (void*)p | Literal("kt_") | c);
772 Assert(writable);
773 int dir_end = DIR_END(p);
774 int kt_len = kt_.size();
775 int needed = kt_len + D2;
776 int new_total = TOTAL_FREE(p) - needed;
777 int new_max = MAX_FREE(p) - needed;
779 Assert(new_total >= 0);
781 AssertRel(MAX_FREE(p),>=,needed);
783 AssertRel(DIR_START,<=,c);
784 AssertRel(c,<=,dir_end);
786 memmove(p + c + D2, p + c, dir_end - c);
787 dir_end += D2;
788 SET_DIR_END(p, dir_end);
790 int o = dir_end + new_max;
791 LeafItem_wr::setD(p, c, o);
792 memmove(p + o, kt_.get_address(), kt_len);
794 SET_MAX_FREE(p, new_max);
796 SET_TOTAL_FREE(p, new_total);
799 /** add_item_to_branch(p, kt_, c) adds item kt_ to the branch block at p.
801 c is the offset in the directory that needs to be expanded to accommodate
802 the new entry for the item. We know before this is called that there is
803 enough contiguous room for the item in the block, so it's just a matter of
804 shuffling up any directory entries after where we're inserting and copying
805 in the item.
808 void
809 GlassTable::add_item_to_branch(uint8_t * p, BItem kt_, int c)
811 LOGCALL_VOID(DB, "GlassTable::add_item_to_branch", (void*)p | Literal("kt_") | c);
812 Assert(writable);
813 int dir_end = DIR_END(p);
814 int kt_len = kt_.size();
815 int needed = kt_len + D2;
816 int new_total = TOTAL_FREE(p) - needed;
817 int new_max = MAX_FREE(p) - needed;
819 Assert(new_total >= 0);
821 AssertRel(MAX_FREE(p),>=,needed);
823 AssertRel(DIR_START,<=,c);
824 AssertRel(c,<=,dir_end);
826 memmove(p + c + D2, p + c, dir_end - c);
827 dir_end += D2;
828 SET_DIR_END(p, dir_end);
830 int o = dir_end + new_max;
831 BItem_wr::setD(p, c, o);
832 memmove(p + o, kt_.get_address(), kt_len);
834 SET_MAX_FREE(p, new_max);
836 SET_TOTAL_FREE(p, new_total);
839 /** GlassTable::add_leaf_item(kt_) adds item kt_ to the leaf block.
841 * If there is not enough room the block splits and the item is then
842 * added to the appropriate half.
844 void
845 GlassTable::add_leaf_item(LeafItem kt_)
847 LOGCALL_VOID(DB, "GlassTable::add_leaf_item", Literal("kt_"));
848 Assert(writable);
849 uint8_t * p = C[0].get_modifiable_p(block_size);
850 int c = C[0].c;
851 uint4 n;
853 int needed = kt_.size() + D2;
854 if (TOTAL_FREE(p) < needed) {
855 int m;
856 // Prepare to split p. After splitting, the block is in two halves, the
857 // lower half is split_p, the upper half p again. add_to_upper_half
858 // becomes true when the item gets added to p, false when it gets added
859 // to split_p.
861 if (seq_count < 0) {
862 // If we're not in sequential mode, we split at the mid point
863 // of the node.
864 m = mid_point(p);
865 } else {
866 // During sequential addition, split at the insert point
867 AssertRel(c,>=,DIR_START);
868 m = c;
871 uint4 split_n = C[0].get_n();
872 C[0].set_n(free_list.get_block(this, block_size));
874 memcpy(split_p, p, block_size); // replicate the whole block in split_p
875 SET_DIR_END(split_p, m);
876 compact(split_p); /* to reset TOTAL_FREE, MAX_FREE */
879 int residue = DIR_END(p) - m;
880 int new_dir_end = DIR_START + residue;
881 memmove(p + DIR_START, p + m, residue);
882 SET_DIR_END(p, new_dir_end);
885 compact(p); /* to reset TOTAL_FREE, MAX_FREE */
887 bool add_to_upper_half;
888 if (seq_count < 0) {
889 add_to_upper_half = (c >= m);
890 } else {
891 // And add item to lower half if split_p has room, otherwise upper
892 // half
893 add_to_upper_half = (TOTAL_FREE(split_p) < needed);
896 if (add_to_upper_half) {
897 c -= (m - DIR_START);
898 Assert(seq_count < 0 || c <= DIR_START + D2);
899 Assert(c >= DIR_START);
900 Assert(c <= DIR_END(p));
901 add_item_to_leaf(p, kt_, c);
902 n = C[0].get_n();
903 } else {
904 Assert(c >= DIR_START);
905 Assert(c <= DIR_END(split_p));
906 add_item_to_leaf(split_p, kt_, c);
907 n = split_n;
909 write_block(split_n, split_p);
911 // Check if we're splitting the root block.
912 if (0 == level) split_root(split_n);
914 /* Enter a separating key at level 1 between */
915 /* the last key of block split_p, and the first key of block p */
916 enter_key_above_leaf(LeafItem(split_p, DIR_END(split_p) - D2),
917 LeafItem(p, DIR_START));
918 } else {
919 AssertRel(TOTAL_FREE(p),>=,needed);
921 if (MAX_FREE(p) < needed) {
922 compact(p);
923 AssertRel(MAX_FREE(p),>=,needed);
926 add_item_to_leaf(p, kt_, c);
927 n = C[0].get_n();
930 changed_n = n;
931 changed_c = c;
934 /** GlassTable::add_item(kt_, j) adds item kt_ to the block at cursor level C[j].
936 * If there is not enough room the block splits and the item is then
937 * added to the appropriate half.
939 void
940 GlassTable::add_branch_item(BItem kt_, int j)
942 LOGCALL_VOID(DB, "GlassTable::add_branch_item", Literal("kt_") | j);
943 Assert(writable);
944 uint8_t * p = C[j].get_modifiable_p(block_size);
945 int c = C[j].c;
947 int needed = kt_.size() + D2;
948 if (TOTAL_FREE(p) < needed) {
949 int m;
950 // Prepare to split p. After splitting, the block is in two halves, the
951 // lower half is split_p, the upper half p again. add_to_upper_half
952 // becomes true when the item gets added to p, false when it gets added
953 // to split_p.
955 if (seq_count < 0) {
956 // If we're not in sequential mode, we split at the mid point
957 // of the node.
958 m = mid_point(p);
959 } else {
960 // During sequential addition, split at the insert point
961 AssertRel(c,>=,DIR_START);
962 m = c;
965 uint4 split_n = C[j].get_n();
966 C[j].set_n(free_list.get_block(this, block_size));
968 memcpy(split_p, p, block_size); // replicate the whole block in split_p
969 SET_DIR_END(split_p, m);
970 compact(split_p); /* to reset TOTAL_FREE, MAX_FREE */
973 int residue = DIR_END(p) - m;
974 int new_dir_end = DIR_START + residue;
975 memmove(p + DIR_START, p + m, residue);
976 SET_DIR_END(p, new_dir_end);
979 compact(p); /* to reset TOTAL_FREE, MAX_FREE */
981 bool add_to_upper_half;
982 if (seq_count < 0) {
983 add_to_upper_half = (c >= m);
984 } else {
985 // And add item to lower half if split_p has room, otherwise upper
986 // half
987 add_to_upper_half = (TOTAL_FREE(split_p) < needed);
990 if (add_to_upper_half) {
991 c -= (m - DIR_START);
992 Assert(seq_count < 0 || c <= DIR_START + D2);
993 Assert(c >= DIR_START);
994 Assert(c <= DIR_END(p));
995 add_item_to_branch(p, kt_, c);
996 } else {
997 Assert(c >= DIR_START);
998 Assert(c <= DIR_END(split_p));
999 add_item_to_branch(split_p, kt_, c);
1001 write_block(split_n, split_p);
1003 // Check if we're splitting the root block.
1004 if (j == level) split_root(split_n);
1006 /* Enter a separating key at level j + 1 between */
1007 /* the last key of block split_p, and the first key of block p */
1008 enter_key_above_branch(j + 1, BItem(p, DIR_START));
1010 // In branch levels, we can make the first key of block p null and
1011 // save a bit of disk space. Other redundant keys will still creep
1012 // in though.
1013 BItem_wr item(p, DIR_START);
1014 int new_total_free = TOTAL_FREE(p) + item.key().length();
1015 item.form_null_key(item.block_given_by());
1016 SET_TOTAL_FREE(p, new_total_free);
1017 } else {
1018 AssertRel(TOTAL_FREE(p),>=,needed);
1020 if (MAX_FREE(p) < needed) {
1021 compact(p);
1022 AssertRel(MAX_FREE(p),>=,needed);
1025 add_item_to_branch(p, kt_, c);
1029 /** GlassTable::delete_leaf_item(repeatedly) is (almost) the converse of add_leaf_item.
1031 * If repeatedly is true, the process repeats at the next level when a
1032 * block has been completely emptied, freeing the block and taking out
1033 * the pointer to it. Emptied root blocks are also removed, which
1034 * reduces the number of levels in the B-tree.
1036 void
1037 GlassTable::delete_leaf_item(bool repeatedly)
1039 LOGCALL_VOID(DB, "GlassTable::delete_leaf_item", repeatedly);
1040 Assert(writable);
1041 uint8_t * p = C[0].get_modifiable_p(block_size);
1042 int c = C[0].c;
1043 AssertRel(DIR_START,<=,c);
1044 AssertRel(c,<,DIR_END(p));
1045 int kt_len = LeafItem(p, c).size(); /* size of the item to be deleted */
1046 int dir_end = DIR_END(p) - D2; /* directory length will go down by 2 bytes */
1048 memmove(p + c, p + c + D2, dir_end - c);
1049 SET_DIR_END(p, dir_end);
1050 SET_MAX_FREE(p, MAX_FREE(p) + D2);
1051 SET_TOTAL_FREE(p, TOTAL_FREE(p) + kt_len + D2);
1053 if (!repeatedly) return;
1054 if (0 < level) {
1055 if (dir_end == DIR_START) {
1056 free_list.mark_block_unused(this, block_size, C[0].get_n());
1057 C[0].rewrite = false;
1058 C[0].set_n(BLK_UNUSED);
1059 C[1].rewrite = true; /* *is* necessary */
1060 delete_branch_item(1);
1065 /** GlassTable::delete_branch_item(j, repeatedly) is (almost) the converse of add_branch_item.
1067 * The process repeats at the next level when a block has been completely
1068 * emptied, freeing the block and taking out the pointer to it. Emptied root
1069 * blocks are also removed, which reduces the number of levels in the B-tree.
1071 void
1072 GlassTable::delete_branch_item(int j)
1074 LOGCALL_VOID(DB, "GlassTable::delete_branch_item", j);
1075 Assert(writable);
1076 uint8_t * p = C[j].get_modifiable_p(block_size);
1077 int c = C[j].c;
1078 AssertRel(DIR_START,<=,c);
1079 AssertRel(c,<,DIR_END(p));
1080 int kt_len = BItem(p, c).size(); /* size of the item to be deleted */
1081 int dir_end = DIR_END(p) - D2; /* directory length will go down by 2 bytes */
1083 memmove(p + c, p + c + D2, dir_end - c);
1084 SET_DIR_END(p, dir_end);
1085 SET_MAX_FREE(p, MAX_FREE(p) + D2);
1086 SET_TOTAL_FREE(p, TOTAL_FREE(p) + kt_len + D2);
1088 if (j < level) {
1089 if (dir_end == DIR_START) {
1090 free_list.mark_block_unused(this, block_size, C[j].get_n());
1091 C[j].rewrite = false;
1092 C[j].set_n(BLK_UNUSED);
1093 C[j + 1].rewrite = true; /* *is* necessary */
1094 delete_branch_item(j + 1);
1096 } else {
1097 Assert(j == level);
1098 while (dir_end == DIR_START + D2 && level > 0) {
1099 /* single item in the root block, so lose a level */
1100 uint4 new_root = BItem(C[level].get_p(), DIR_START).block_given_by();
1101 free_list.mark_block_unused(this, block_size, C[level].get_n());
1102 C[level].destroy();
1103 level--;
1105 block_to_cursor(C, level, new_root);
1107 dir_end = DIR_END(C[level].get_p()); /* prepare for the loop */
1112 /* debugging aid:
1113 static addcount = 0;
1116 /** add_kt(found) adds the item (key-tag pair) at B->kt into the
1117 B-tree, using cursor C.
1119 found == find() is handed over as a parameter from Btree::add.
1120 Btree::alter() prepares for the alteration to the B-tree. Then
1121 there are a number of cases to consider:
1123 If an item with the same key is in the B-tree (found is true),
1124 the new kt replaces it.
1126 If then kt is smaller, or the same size as, the item it replaces,
1127 kt is put in the same place as the item it replaces, and the
1128 TOTAL_FREE measure is reduced.
1130 If kt is larger than the item it replaces it is put in the
1131 MAX_FREE space if there is room, and the directory entry and
1132 space counts are adjusted accordingly.
1134 - But if there is not room we do it the long way: the old item is
1135 deleted with delete_leaf_item and kt is added in with add_item.
1137 If the key of kt is not in the B-tree (found is false), the new
1138 kt is added in with add_item.
1140 Returns:
1141 0 : added kt
1142 1 : replaced kt
1143 2 : replaced kt and it was the final one
1147 GlassTable::add_kt(bool found)
1149 LOGCALL(DB, int, "GlassTable::add_kt", found);
1150 Assert(writable);
1154 printf("%d) %s ", addcount++, (found ? "replacing" : "adding"));
1155 print_bytes(kt[I2], kt + I2 + K1); putchar('\n');
1158 alter();
1160 int result = 0;
1161 if (found) { /* replacement */
1162 seq_count = SEQ_START_POINT;
1163 sequential = false;
1165 uint8_t * p = C[0].get_modifiable_p(block_size);
1166 int c = C[0].c;
1167 AssertRel(DIR_START,<=,c);
1168 AssertRel(c,<,DIR_END(p));
1169 LeafItem item(p, c);
1170 int kt_size = kt.size();
1171 int needed = kt_size - item.size();
1173 result = item.last_component() ? 2 : 1;
1175 if (needed <= 0) {
1176 /* simple replacement */
1177 memmove(const_cast<uint8_t *>(item.get_address()),
1178 kt.get_address(), kt_size);
1179 SET_TOTAL_FREE(p, TOTAL_FREE(p) - needed);
1180 } else {
1181 /* new item into the block's freespace */
1182 int new_max = MAX_FREE(p) - kt_size;
1183 if (new_max >= 0) {
1184 int o = DIR_END(p) + new_max;
1185 memmove(p + o, kt.get_address(), kt_size);
1186 LeafItem_wr::setD(p, c, o);
1187 SET_MAX_FREE(p, new_max);
1188 SET_TOTAL_FREE(p, TOTAL_FREE(p) - needed);
1189 } else {
1190 /* do it the long way */
1191 delete_leaf_item(false);
1192 add_leaf_item(kt);
1195 } else {
1196 /* addition */
1197 if (changed_n == C[0].get_n() && changed_c == C[0].c) {
1198 if (seq_count < 0) seq_count++;
1199 } else {
1200 seq_count = SEQ_START_POINT;
1201 sequential = false;
1203 C[0].c += D2;
1204 add_leaf_item(kt);
1206 RETURN(result);
1209 /* delete_kt() corresponds to add_kt(found), but there are only
1210 two cases: if the key is not found nothing is done, and if it is
1211 found the corresponding item is deleted with delete_leaf_item.
1213 Returns:
1214 0 : nothing to delete
1215 1 : deleted kt
1216 2 : deleted kt and it was the final one
1220 GlassTable::delete_kt()
1222 LOGCALL(DB, int, "GlassTable::delete_kt", NO_ARGS);
1223 Assert(writable);
1225 seq_count = SEQ_START_POINT;
1226 sequential = false;
1228 if (!find(C))
1229 return 0;
1231 int result = LeafItem(C[0].get_p(), C[0].c).last_component() ? 2 : 1;
1232 alter();
1233 delete_leaf_item(true);
1235 RETURN(result);
1238 /* GlassTable::form_key(key) treats address kt as an item holder and fills in
1239 the key part:
1241 (I) K key c (C tag)
1243 The bracketed parts are left blank. The key is filled in with key_len bytes and
1244 K set accordingly. c is set to 1.
1247 void GlassTable::form_key(const string & key) const
1249 LOGCALL_VOID(DB, "GlassTable::form_key", key);
1250 kt.form_key(key);
1253 /* GlassTable::add(key, tag) adds the key/tag item to the
1254 B-tree, replacing any existing item with the same key.
1256 For a long tag, we end up having to add m components, of the form
1258 key 1 m tag1
1259 key 2 m tag2
1261 key m m tagm
1263 and tag1+tag2+...+tagm are equal to tag. These in their turn may be replacing
1264 n components of the form
1266 key 1 n TAG1
1267 key 2 n TAG2
1269 key n n TAGn
1271 and n may be greater than, equal to, or less than m. These cases are dealt
1272 with in the code below. If m < n for example, we end up with a series of
1273 deletions.
1276 void
1277 GlassTable::add(const string &key, string tag, bool already_compressed)
1279 LOGCALL_VOID(DB, "GlassTable::add", key | tag | already_compressed);
1280 Assert(writable);
1282 if (handle < 0) {
1283 if (handle == -2) {
1284 GlassTable::throw_database_closed();
1286 RootInfo root_info;
1287 root_info.init(block_size, compress_min);
1288 do_open_to_write(&root_info);
1291 form_key(key);
1293 const char* tag_data = tag.data();
1294 size_t tag_size = tag.size();
1296 bool compressed = false;
1297 if (already_compressed) {
1298 compressed = true;
1299 } else if (compress_min > 0 && tag_size > compress_min) {
1300 const char * res = comp_stream.compress(tag_data, &tag_size);
1301 if (res) {
1302 compressed = true;
1303 tag_data = res;
1307 // sort of matching kt.append_chunk(), but setting the chunk
1308 const size_t cd = kt.key().length() + K1 + I2 + X2; // offset to the tag data
1309 const size_t L = max_item_size - cd; // largest amount of tag data for any chunk
1310 size_t first_L = L + X2; // - amount for tag1 (we don't store X there)
1311 bool found = find(C);
1312 if (tag_size <= first_L) {
1313 // The whole tag clearly fits in one item, so no need to make this
1314 // complicated.
1315 first_L = tag_size;
1316 } else if (!found) {
1317 const uint8_t * p = C[0].get_p();
1318 size_t n = TOTAL_FREE(p) % (max_item_size + D2);
1319 if (n > D2 + cd) {
1320 n -= (D2 + cd);
1321 // if n >= last then fully filling this block won't produce
1322 // an extra item, so we might as well do this even if
1323 // full_compaction isn't active.
1325 // In the full_compaction case, it turns out we shouldn't always
1326 // try to fill every last byte. Doing so can actually increase the
1327 // total space required (I believe this effect is due to longer
1328 // dividing keys being required in the index blocks). Empirically,
1329 // n >= key.size() + K appears a good criterion for K ~= 34. This
1330 // seems to save about 0.2% in total database size over always
1331 // splitting the tag. It'll also give be slightly faster retrieval
1332 // as we can avoid reading an extra block occasionally.
1333 size_t last = (tag_size - X2) % L;
1334 if (n >= last || (full_compaction && n >= key.size() + 34)) {
1335 // first_L < max_item_size + D2 - D2 - cd
1336 // Total size of first item = cd + first_L < max_item_size
1337 first_L = n + X2;
1342 // There are m items to add.
1343 int m = (tag_size - first_L + L - 1) / L + 1;
1344 /* FIXME: sort out this error higher up and turn this into
1345 * an assert.
1347 if (m >= BYTE_PAIR_RANGE)
1348 throw Xapian::UnimplementedError("Can't handle insanely large tags");
1350 size_t o = 0; // Offset into the tag
1351 size_t residue = tag_size; // Bytes of the tag remaining to add in
1352 bool replacement = false; // Has there been a replacement?
1353 bool components_to_del = false; // Are there components to delete?
1354 int i;
1355 for (i = 1; i <= m; ++i) {
1356 size_t l = (i == m ? residue : (i == 1 ? first_L : L));
1357 size_t this_cd = (i == 1 ? cd - X2 : cd);
1358 Assert(this_cd + l <= block_size);
1359 Assert(o + l <= tag_size);
1360 kt.set_tag(this_cd, tag_data + o, l, compressed, i, m);
1362 o += l;
1363 residue -= l;
1365 if (i > 1) found = find(C);
1366 int result = add_kt(found);
1367 if (result) replacement = true;
1368 components_to_del = (result == 1);
1370 AssertEq(o, tag_size);
1371 if (components_to_del) {
1372 i = m;
1373 do {
1374 kt.set_component_of(++i);
1375 } while (delete_kt() == 1);
1377 if (!replacement) ++item_count;
1378 Btree_modified = true;
1379 if (cursor_created_since_last_modification) {
1380 cursor_created_since_last_modification = false;
1381 ++cursor_version;
1385 /* GlassTable::del(key) returns false if the key is not in the B-tree,
1386 otherwise deletes it and returns true.
1388 Again, this is parallel to GlassTable::add, but simpler in form.
1391 bool
1392 GlassTable::del(const string &key)
1394 LOGCALL(DB, bool, "GlassTable::del", key);
1395 Assert(writable);
1397 if (handle < 0) {
1398 if (handle == -2) {
1399 GlassTable::throw_database_closed();
1401 RETURN(false);
1404 // We can't delete a key which we is too long for us to store.
1405 if (key.size() > GLASS_BTREE_MAX_KEY_LEN) RETURN(false);
1407 if (key.empty()) RETURN(false);
1408 form_key(key);
1410 int r = delete_kt();
1411 if (r == 0) RETURN(false);
1412 int i = 1;
1413 while (r == 1) {
1414 kt.set_component_of(++i);
1415 r = delete_kt();
1418 item_count--;
1419 Btree_modified = true;
1420 if (cursor_created_since_last_modification) {
1421 cursor_created_since_last_modification = false;
1422 ++cursor_version;
1424 RETURN(true);
1427 bool
1428 GlassTable::readahead_key(const string &key) const
1430 LOGCALL(DB, bool, "GlassTable::readahead_key", key);
1431 Assert(!key.empty());
1433 // An overlong key cannot be found.
1434 if (key.size() > GLASS_BTREE_MAX_KEY_LEN)
1435 RETURN(false);
1437 // Three cases:
1439 // handle == -1: Lazy table in a multi-file database which isn't yet open.
1441 // handle == -2: Table has been closed. Since the readahead is just a
1442 // hint, we can safely ignore it for a closed table.
1444 // handle <= -3: Lazy table in a single-file database which isn't yet
1445 // open.
1446 if (handle < 0)
1447 RETURN(false);
1449 // If the table only has one level, there are no branch blocks to preread.
1450 if (level == 0)
1451 RETURN(false);
1453 form_key(key);
1455 // We'll only readahead the first level, since descending the B-tree would
1456 // require actual reads that would likely hurt performance more than help.
1457 const uint8_t * p = C[level].get_p();
1458 int c = find_in_branch(p, kt, C[level].c);
1459 uint4 n = BItem(p, c).block_given_by();
1460 // Don't preread if it's the block we last preread or already in the
1461 // cursor.
1462 if (n != last_readahead && n != C[level - 1].get_n()) {
1463 last_readahead = n;
1464 if (!io_readahead_block(handle, block_size, n, offset))
1465 RETURN(false);
1467 RETURN(true);
1470 bool
1471 GlassTable::get_exact_entry(const string &key, string & tag) const
1473 LOGCALL(DB, bool, "GlassTable::get_exact_entry", key | tag);
1474 Assert(!key.empty());
1476 if (handle < 0) {
1477 if (handle == -2) {
1478 GlassTable::throw_database_closed();
1480 RETURN(false);
1483 // An oversized key can't exist, so attempting to search for it should fail.
1484 if (key.size() > GLASS_BTREE_MAX_KEY_LEN) RETURN(false);
1486 form_key(key);
1487 if (!find(C)) RETURN(false);
1489 (void)read_tag(C, &tag, false);
1490 RETURN(true);
1493 bool
1494 GlassTable::key_exists(const string &key) const
1496 LOGCALL(DB, bool, "GlassTable::key_exists", key);
1497 Assert(!key.empty());
1499 // An oversized key can't exist, so attempting to search for it should fail.
1500 if (key.size() > GLASS_BTREE_MAX_KEY_LEN) RETURN(false);
1502 form_key(key);
1503 RETURN(find(C));
1506 bool
1507 GlassTable::read_tag(Glass::Cursor * C_, string *tag, bool keep_compressed) const
1509 LOGCALL(DB, bool, "GlassTable::read_tag", Literal("C_") | tag | keep_compressed);
1511 tag->resize(0);
1513 bool first = true;
1514 bool compressed = false;
1515 bool decompress = false;
1516 while (true) {
1517 LeafItem item(C_[0].get_p(), C_[0].c);
1518 if (first) {
1519 first = false;
1520 compressed = item.get_compressed();
1521 if (compressed && !keep_compressed) {
1522 comp_stream.decompress_start();
1523 decompress = true;
1526 bool last = item.last_component();
1527 if (decompress) {
1528 // Decompress each chunk as we read it so we don't need both the
1529 // full compressed and uncompressed tags in memory at once.
1530 bool done = item.decompress_chunk(comp_stream, *tag);
1531 if (done != last) {
1532 throw Xapian::DatabaseCorruptError(done ?
1533 "Too many chunks of compressed data" :
1534 "Too few chunks of compressed data");
1536 } else {
1537 item.append_chunk(tag);
1539 if (last) break;
1540 if (!next(C_, 0)) {
1541 throw Xapian::DatabaseCorruptError("Unexpected end of table when reading continuation of tag");
1544 // At this point the cursor is on the last item - calling next will move
1545 // it to the next key (GlassCursor::read_tag() relies on this).
1547 RETURN(compressed && keep_compressed);
1550 void
1551 GlassTable::set_full_compaction(bool parity)
1553 LOGCALL_VOID(DB, "GlassTable::set_full_compaction", parity);
1554 Assert(writable);
1556 if (parity) seq_count = 0;
1557 full_compaction = parity;
1560 GlassCursor * GlassTable::cursor_get() const {
1561 LOGCALL(DB, GlassCursor *, "GlassTable::cursor_get", NO_ARGS);
1562 if (handle < 0) {
1563 if (handle == -2) {
1564 GlassTable::throw_database_closed();
1566 RETURN(NULL);
1568 // FIXME Ick - casting away const is nasty
1569 RETURN(new GlassCursor(const_cast<GlassTable *>(this)));
1572 /************ B-tree opening and closing ************/
1574 void
1575 GlassTable::basic_open(const RootInfo * root_info, glass_revision_number_t rev)
1577 LOGCALL_VOID(DB, "GlassTable::basic_open", root_info|rev);
1578 revision_number = rev;
1579 root = root_info->get_root();
1580 level = root_info->get_level();
1581 item_count = root_info->get_num_entries();
1582 faked_root_block = root_info->get_root_is_fake();
1583 sequential = root_info->get_sequential();
1584 const string & fl_serialised = root_info->get_free_list();
1585 if (!fl_serialised.empty()) {
1586 if (!free_list.unpack(fl_serialised))
1587 throw Xapian::DatabaseCorruptError("Bad freelist metadata");
1588 } else {
1589 free_list.reset();
1592 compress_min = root_info->get_compress_min();
1594 /* kt holds constructed items as well as keys */
1595 kt = LeafItem_wr(zeroed_new(block_size));
1597 set_max_item_size(BLOCK_CAPACITY);
1599 for (int j = 0; j <= level; ++j) {
1600 C[j].init(block_size);
1603 read_root();
1605 if (cursor_created_since_last_modification) {
1606 cursor_created_since_last_modification = false;
1607 ++cursor_version;
1611 void
1612 GlassTable::read_root()
1614 LOGCALL_VOID(DB, "GlassTable::read_root", NO_ARGS);
1615 if (faked_root_block) {
1616 /* root block for an unmodified database. */
1617 uint8_t * p = C[0].init(block_size);
1618 Assert(p);
1620 /* clear block - shouldn't be necessary, but is a bit nicer,
1621 * and means that the same operations should always produce
1622 * the same database. */
1623 memset(p, 0, block_size);
1625 int o = block_size - I2 - K1;
1626 LeafItem_wr(p + o).fake_root_item();
1628 LeafItem_wr::setD(p, DIR_START, o); // its directory entry
1629 SET_DIR_END(p, DIR_START + D2);// the directory size
1631 o -= (DIR_START + D2);
1632 SET_MAX_FREE(p, o);
1633 SET_TOTAL_FREE(p, o);
1634 SET_LEVEL(p, 0);
1636 if (!writable) {
1637 /* reading - revision number doesn't matter as long as
1638 * it's not greater than the current one. */
1639 SET_REVISION(p, 0);
1640 C[0].set_n(0);
1641 } else {
1642 /* writing - */
1643 SET_REVISION(p, revision_number + 1);
1644 C[0].set_n(free_list.get_block(this, block_size));
1645 C[0].rewrite = true;
1647 } else {
1648 /* using a root block stored on disk */
1649 block_to_cursor(C, level, root);
1651 if (REVISION(C[level].get_p()) > revision_number) set_overwritten();
1652 /* although this is unlikely */
1656 void
1657 GlassTable::do_open_to_write(const RootInfo * root_info,
1658 glass_revision_number_t rev)
1660 LOGCALL_VOID(DB, "GlassTable::do_open_to_write", root_info|rev);
1661 if (handle == -2) {
1662 GlassTable::throw_database_closed();
1664 if (handle <= -2) {
1665 // Single file database.
1666 handle = -3 - handle;
1667 } else {
1668 handle = io_open_block_wr(name + GLASS_TABLE_EXTENSION, (rev == 0));
1669 if (handle < 0) {
1670 // lazy doesn't make a lot of sense when we're creating a DB (which
1671 // is the case when rev==0), but ENOENT with O_CREAT means a parent
1672 // directory doesn't exist.
1673 if (lazy && rev && errno == ENOENT) {
1674 revision_number = rev;
1675 return;
1677 string message((rev == 0) ? "Couldn't create " : "Couldn't open ");
1678 message += name;
1679 message += GLASS_TABLE_EXTENSION" read/write";
1680 throw Xapian::DatabaseOpeningError(message, errno);
1684 writable = true;
1685 basic_open(root_info, rev);
1687 split_p = new uint8_t[block_size];
1689 buffer = zeroed_new(block_size);
1691 changed_n = 0;
1692 changed_c = DIR_START;
1693 seq_count = SEQ_START_POINT;
1696 GlassTable::GlassTable(const char * tablename_, const string & path_,
1697 bool readonly_, bool lazy_)
1698 : tablename(tablename_),
1699 revision_number(0),
1700 item_count(0),
1701 block_size(0),
1702 faked_root_block(true),
1703 sequential(true),
1704 handle(-1),
1705 level(0),
1706 root(0),
1707 kt(0),
1708 buffer(0),
1709 free_list(),
1710 name(path_),
1711 seq_count(0),
1712 changed_n(0),
1713 changed_c(0),
1714 max_item_size(0),
1715 Btree_modified(false),
1716 full_compaction(false),
1717 writable(!readonly_),
1718 cursor_created_since_last_modification(false),
1719 cursor_version(0),
1720 changes_obj(NULL),
1721 split_p(0),
1722 compress_min(0),
1723 comp_stream(Z_DEFAULT_STRATEGY),
1724 lazy(lazy_),
1725 last_readahead(BLK_UNUSED),
1726 offset(0)
1728 LOGCALL_CTOR(DB, "GlassTable", tablename_ | path_ | readonly_ | lazy_);
1731 GlassTable::GlassTable(const char * tablename_, int fd, off_t offset_,
1732 bool readonly_, bool lazy_)
1733 : tablename(tablename_),
1734 revision_number(0),
1735 item_count(0),
1736 block_size(0),
1737 faked_root_block(true),
1738 sequential(true),
1739 handle(-3 - fd),
1740 level(0),
1741 root(0),
1742 kt(0),
1743 buffer(0),
1744 free_list(),
1745 name(),
1746 seq_count(0),
1747 changed_n(0),
1748 changed_c(0),
1749 max_item_size(0),
1750 Btree_modified(false),
1751 full_compaction(false),
1752 writable(!readonly_),
1753 cursor_created_since_last_modification(false),
1754 cursor_version(0),
1755 changes_obj(NULL),
1756 split_p(0),
1757 compress_min(0),
1758 comp_stream(Z_DEFAULT_STRATEGY),
1759 lazy(lazy_),
1760 last_readahead(BLK_UNUSED),
1761 offset(offset_)
1763 LOGCALL_CTOR(DB, "GlassTable", tablename_ | fd | offset_ | readonly_ | lazy_);
1766 bool
1767 GlassTable::exists() const {
1768 LOGCALL(DB, bool, "GlassTable::exists", NO_ARGS);
1769 // We know a single-file database exists, since we have an fd open on it!
1770 return single_file() || file_exists(name + GLASS_TABLE_EXTENSION);
1773 void
1774 GlassTable::create_and_open(int flags_, const RootInfo & root_info)
1776 LOGCALL_VOID(DB, "GlassTable::create_and_open", flags_|root_info);
1777 if (handle == -2) {
1778 GlassTable::throw_database_closed();
1780 Assert(writable);
1781 close();
1783 unsigned int block_size_ = root_info.get_blocksize();
1784 Assert(block_size_ >= 2048);
1785 Assert(block_size_ <= BYTE_PAIR_RANGE);
1786 // Must be a power of two.
1787 Assert((block_size_ & (block_size_ - 1)) == 0);
1789 flags = flags_;
1790 block_size = block_size_;
1792 if (lazy) {
1793 close();
1794 (void)io_unlink(name + GLASS_TABLE_EXTENSION);
1795 compress_min = root_info.get_compress_min();
1796 } else {
1797 // FIXME: it would be good to arrange that this works such that there's
1798 // always a valid table in place if you run create_and_open() on an
1799 // existing table.
1801 do_open_to_write(&root_info);
1805 GlassTable::~GlassTable() {
1806 LOGCALL_DTOR(DB, "GlassTable");
1807 GlassTable::close();
1810 void GlassTable::close(bool permanent) {
1811 LOGCALL_VOID(DB, "GlassTable::close", permanent);
1813 if (handle >= 0) {
1814 if (single_file()) {
1815 handle = -3 - handle;
1816 } else {
1817 // If an error occurs here, we just ignore it, since we're just
1818 // trying to free everything.
1819 (void)::close(handle);
1820 handle = -1;
1824 if (permanent) {
1825 handle = -2;
1826 // Don't delete the resources in the table, since they may
1827 // still be used to look up cached content.
1828 return;
1830 for (int j = level; j >= 0; --j) {
1831 C[j].destroy();
1833 delete [] split_p;
1834 split_p = 0;
1836 delete [] kt.get_address();
1837 kt = LeafItem_wr(0);
1838 delete [] buffer;
1839 buffer = 0;
1842 void
1843 GlassTable::flush_db()
1845 LOGCALL_VOID(DB, "GlassTable::flush_db", NO_ARGS);
1846 Assert(writable);
1847 if (handle < 0) {
1848 if (handle == -2) {
1849 GlassTable::throw_database_closed();
1851 return;
1854 for (int j = level; j >= 0; --j) {
1855 if (C[j].rewrite) {
1856 write_block(C[j].get_n(), C[j].get_p());
1860 faked_root_block = false;
1863 void
1864 GlassTable::commit(glass_revision_number_t revision, RootInfo * root_info)
1866 LOGCALL_VOID(DB, "GlassTable::commit", revision|root_info);
1867 Assert(writable);
1869 if (revision <= revision_number) {
1870 throw Xapian::DatabaseError("New revision too low");
1873 if (handle < 0) {
1874 if (handle == -2) {
1875 GlassTable::throw_database_closed();
1877 revision_number = revision;
1878 root_info->set_blocksize(block_size);
1879 root_info->set_level(0);
1880 root_info->set_num_entries(0);
1881 root_info->set_root_is_fake(true);
1882 root_info->set_sequential(true);
1883 root_info->set_root(0);
1884 return;
1887 try {
1888 root = C[level].get_n();
1890 root_info->set_blocksize(block_size);
1891 root_info->set_level(level);
1892 root_info->set_num_entries(item_count);
1893 root_info->set_root_is_fake(faked_root_block);
1894 root_info->set_sequential(sequential);
1895 root_info->set_root(root);
1897 Btree_modified = false;
1899 for (int i = 0; i < BTREE_CURSOR_LEVELS; ++i) {
1900 C[i].init(block_size);
1903 free_list.set_revision(revision);
1904 free_list.commit(this, block_size);
1906 // Save the freelist details into the root_info.
1907 string serialised;
1908 free_list.pack(serialised);
1909 root_info->set_free_list(serialised);
1911 revision_number = revision;
1913 read_root();
1915 changed_n = 0;
1916 changed_c = DIR_START;
1917 seq_count = SEQ_START_POINT;
1918 } catch (...) {
1919 GlassTable::close();
1920 throw;
1924 void
1925 GlassTable::cancel(const RootInfo & root_info, glass_revision_number_t rev)
1927 LOGCALL_VOID(DB, "GlassTable::cancel", root_info|rev);
1928 Assert(writable);
1930 if (handle < 0) {
1931 if (handle == -2) {
1932 GlassTable::throw_database_closed();
1934 return;
1937 // This causes problems: if (!Btree_modified) return;
1939 if (flags & Xapian::DB_DANGEROUS)
1940 throw Xapian::InvalidOperationError("cancel() not supported under Xapian::DB_DANGEROUS");
1942 revision_number = rev;
1943 block_size = root_info.get_blocksize();
1944 root = root_info.get_root();
1945 level = root_info.get_level();
1946 item_count = root_info.get_num_entries();
1947 faked_root_block = root_info.get_root_is_fake();
1948 sequential = root_info.get_sequential();
1949 const string & fl_serialised = root_info.get_free_list();
1950 if (!fl_serialised.empty()) {
1951 if (!free_list.unpack(fl_serialised))
1952 throw Xapian::DatabaseCorruptError("Bad freelist metadata");
1953 } else {
1954 free_list.reset();
1957 Btree_modified = false;
1959 for (int j = 0; j <= level; ++j) {
1960 C[j].init(block_size);
1961 C[j].rewrite = false;
1963 read_root();
1965 changed_n = 0;
1966 changed_c = DIR_START;
1967 seq_count = SEQ_START_POINT;
1969 if (cursor_created_since_last_modification) {
1970 cursor_created_since_last_modification = false;
1971 ++cursor_version;
1975 /************ B-tree reading ************/
1977 void
1978 GlassTable::do_open_to_read(const RootInfo * root_info,
1979 glass_revision_number_t rev)
1981 LOGCALL(DB, bool, "GlassTable::do_open_to_read", root_info|rev);
1982 if (handle == -2) {
1983 GlassTable::throw_database_closed();
1985 if (single_file()) {
1986 handle = -3 - handle;
1987 } else {
1988 handle = io_open_block_rd(name + GLASS_TABLE_EXTENSION);
1989 if (handle < 0) {
1990 if (lazy) {
1991 // This table is optional when reading!
1992 revision_number = rev;
1993 return;
1995 string message("Couldn't open ");
1996 message += name;
1997 message += GLASS_TABLE_EXTENSION" to read";
1998 throw Xapian::DatabaseOpeningError(message, errno);
2002 basic_open(root_info, rev);
2004 read_root();
2007 void
2008 GlassTable::open(int flags_, const RootInfo & root_info,
2009 glass_revision_number_t rev)
2011 LOGCALL_VOID(DB, "GlassTable::open", flags_|root_info|rev);
2012 close();
2014 flags = flags_;
2015 block_size = root_info.get_blocksize();
2016 root = root_info.get_root();
2018 if (!writable) {
2019 do_open_to_read(&root_info, rev);
2020 return;
2023 do_open_to_write(&root_info, rev);
2026 bool
2027 GlassTable::prev_for_sequential(Glass::Cursor * C_, int /*dummy*/) const
2029 LOGCALL(DB, bool, "GlassTable::prev_for_sequential", Literal("C_") | Literal("/*dummy*/"));
2030 int c = C_[0].c;
2031 AssertRel(DIR_START,<=,c);
2032 AssertRel(c,<,DIR_END(C_[0].get_p()));
2033 if (c == DIR_START) {
2034 uint4 n = C_[0].get_n();
2035 const uint8_t * p;
2036 while (true) {
2037 if (n == 0) RETURN(false);
2038 n--;
2039 if (n == C[0].get_n()) {
2040 // Block is a leaf block in the built-in cursor (potentially in
2041 // modified form if the table is writable).
2042 p = C_[0].clone(C[0]);
2043 } else {
2044 if (writable) {
2045 // Blocks in the built-in cursor may not have been written
2046 // to disk yet, so we have to check that the block number
2047 // isn't in the built-in cursor or we'll read an
2048 // uninitialised block (for which GET_LEVEL(p) will
2049 // probably return 0).
2050 int j;
2051 for (j = 1; j <= level; ++j) {
2052 if (n == C[j].get_n()) break;
2054 if (j <= level) continue;
2057 // Block isn't in the built-in cursor, so the form on disk
2058 // is valid, so read it to check if it's the next level 0
2059 // block.
2060 uint8_t * q = C_[0].init(block_size);
2061 read_block(n, q);
2062 p = q;
2063 C_[0].set_n(n);
2065 if (REVISION(p) > revision_number + writable) {
2066 set_overwritten();
2067 RETURN(false);
2069 if (GET_LEVEL(p) == 0) break;
2071 c = DIR_END(p);
2072 AssertRel(DIR_START,<,c);
2074 c -= D2;
2075 C_[0].c = c;
2076 RETURN(true);
2079 bool
2080 GlassTable::next_for_sequential(Glass::Cursor * C_, int /*dummy*/) const
2082 LOGCALL(DB, bool, "GlassTable::next_for_sequential", Literal("C_") | Literal("/*dummy*/"));
2083 const uint8_t * p = C_[0].get_p();
2084 Assert(p);
2085 int c = C_[0].c;
2086 AssertRel(c,<,DIR_END(p));
2087 c += D2;
2088 Assert(unsigned(c) < block_size);
2089 if (c == DIR_END(p)) {
2090 uint4 n = C_[0].get_n();
2091 while (true) {
2092 n++;
2093 if (n >= free_list.get_first_unused_block()) RETURN(false);
2094 if (writable) {
2095 if (n == C[0].get_n()) {
2096 // Block is a leaf block in the built-in cursor
2097 // (potentially in modified form).
2098 p = C_[0].clone(C[0]);
2099 } else {
2100 // Blocks in the built-in cursor may not have been written
2101 // to disk yet, so we have to check that the block number
2102 // isn't in the built-in cursor or we'll read an
2103 // uninitialised block (for which GET_LEVEL(p) will
2104 // probably return 0).
2105 int j;
2106 for (j = 1; j <= level; ++j) {
2107 if (n == C[j].get_n()) break;
2109 if (j <= level) continue;
2111 // Block isn't in the built-in cursor, so the form on disk
2112 // is valid, so read it to check if it's the next level 0
2113 // block.
2114 uint8_t * q = C_[0].init(block_size);
2115 read_block(n, q);
2116 p = q;
2118 } else {
2119 uint8_t * q = C_[0].init(block_size);
2120 read_block(n, q);
2121 p = q;
2123 if (REVISION(p) > revision_number + writable) {
2124 set_overwritten();
2125 RETURN(false);
2127 if (GET_LEVEL(p) == 0) break;
2129 c = DIR_START;
2130 C_[0].set_n(n);
2132 C_[0].c = c;
2133 RETURN(true);
2136 bool
2137 GlassTable::prev_default(Glass::Cursor * C_, int j) const
2139 LOGCALL(DB, bool, "GlassTable::prev_default", Literal("C_") | j);
2140 const uint8_t * p = C_[j].get_p();
2141 int c = C_[j].c;
2142 AssertRel(DIR_START,<=,c);
2143 AssertRel(c,<,DIR_END(p));
2144 AssertRel(unsigned(DIR_END(p)),<=,block_size);
2145 if (c == DIR_START) {
2146 if (j == level) RETURN(false);
2147 if (!prev_default(C_, j + 1)) RETURN(false);
2148 p = C_[j].get_p();
2149 c = DIR_END(p);
2150 AssertRel(DIR_START,<,c);
2152 c -= D2;
2153 C_[j].c = c;
2154 if (j > 0) {
2155 block_to_cursor(C_, j - 1, BItem(p, c).block_given_by());
2157 RETURN(true);
2160 bool
2161 GlassTable::next_default(Glass::Cursor * C_, int j) const
2163 LOGCALL(DB, bool, "GlassTable::next_default", Literal("C_") | j);
2164 const uint8_t * p = C_[j].get_p();
2165 int c = C_[j].c;
2166 AssertRel(c,<,DIR_END(p));
2167 AssertRel(unsigned(DIR_END(p)),<=,block_size);
2168 c += D2;
2169 if (j > 0) {
2170 AssertRel(DIR_START,<,c);
2171 } else {
2172 AssertRel(DIR_START,<=,c);
2174 // Sometimes c can be DIR_END(p) + 2 here it appears...
2175 if (c >= DIR_END(p)) {
2176 if (j == level) RETURN(false);
2177 if (!next_default(C_, j + 1)) RETURN(false);
2178 p = C_[j].get_p();
2179 c = DIR_START;
2181 C_[j].c = c;
2182 if (j > 0) {
2183 block_to_cursor(C_, j - 1, BItem(p, c).block_given_by());
2184 #ifdef BTREE_DEBUG_FULL
2185 printf("Block in GlassTable:next_default");
2186 report_block_full(j - 1, C_[j - 1].get_n(), C_[j - 1].get_p());
2187 #endif /* BTREE_DEBUG_FULL */
2189 RETURN(true);
2192 void
2193 GlassTable::throw_database_closed()
2195 throw Xapian::DatabaseClosedError("Database has been closed");