2 * See the file LICENSE for redistribution information.
4 * Copyright (c) 1996, 1997, 1998
5 * Sleepycat Software. All rights reserved.
8 * Copyright (c) 1990, 1993, 1994
9 * The Regents of the University of California. All rights reserved.
11 * This code is derived from software contributed to Berkeley by
14 * Redistribution and use in source and binary forms, with or without
15 * modification, are permitted provided that the following conditions
17 * 1. Redistributions of source code must retain the above copyright
18 * notice, this list of conditions and the following disclaimer.
19 * 2. Redistributions in binary form must reproduce the above copyright
20 * notice, this list of conditions and the following disclaimer in the
21 * documentation and/or other materials provided with the distribution.
22 * 3. All advertising materials mentioning features or use of this software
23 * must display the following acknowledgement:
24 * This product includes software developed by the University of
25 * California, Berkeley and its contributors.
26 * 4. Neither the name of the University nor the names of its contributors
27 * may be used to endorse or promote products derived from this software
28 * without specific prior written permission.
30 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
31 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
32 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
33 * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
34 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
35 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
36 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
37 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
38 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
39 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
45 static const char sccsid
[] = "@(#)hash_dup.c 10.27 (Sleepycat) 12/6/98";
52 * Manipulation of duplicates for the hash package.
61 #ifndef NO_SYSTEM_INCLUDES
62 #include <sys/types.h>
73 static int __ham_check_move
__P((DBC
*, int32_t));
74 static int __ham_dup_convert
__P((DBC
*));
75 static int __ham_make_dup
__P((const DBT
*, DBT
*d
, void **, u_int32_t
*));
78 * Called from hash_access to add a duplicate key. nval is the new
79 * value that we want to add. The flags correspond to the flag values
80 * to cursor_put indicating where to add the new element.
82 * Case 1: The existing duplicate set already resides on a separate page.
83 * We can use common code for this.
84 * Case 2: The element is small enough to just be added to the existing set.
85 * Case 3: The element is large enough to be a big item, so we're going to
86 * have to push the set onto a new page.
87 * Case 4: The element is large enough to push the duplicate set onto a separate page.
90 * PUBLIC: int __ham_add_dup __P((DBC *, DBT *, u_int32_t));
93 __ham_add_dup(dbc
, nval
, flags
)
100 DBT dbt
, pval
, tmp_val
;
101 u_int32_t del_len
, new_size
;
106 hcp
= (HASH_CURSOR
*)dbc
->internal
;
107 if (flags
== DB_CURRENT
&& hcp
->dpgno
== PGNO_INVALID
)
108 del_len
= hcp
->dup_len
;
112 if ((ret
= __ham_check_move(dbc
,
113 (int32_t)DUP_SIZE(nval
->size
) - (int32_t)del_len
)) != 0)
117 * Check if resulting duplicate set is going to need to go
118 * onto a separate duplicate page. If so, convert the
119 * duplicate set and add the new one. After conversion,
120 * hcp->dndx is the first free ndx or the index of the
121 * current pointer into the duplicate set.
123 hk
= H_PAIRDATA(hcp
->pagep
, hcp
->bndx
);
124 new_size
= DUP_SIZE(nval
->size
) - del_len
+ LEN_HKEYDATA(hcp
->pagep
,
125 hcp
->hdr
->pagesize
, H_DATAINDEX(hcp
->bndx
));
128 * We convert to off-page duplicates if the item is a big item,
129 * the addition of the new item will make the set large, or
130 * if there isn't enough room on this page to add the next item.
132 if (HPAGE_PTYPE(hk
) != H_OFFDUP
&&
133 (HPAGE_PTYPE(hk
) == H_OFFPAGE
|| ISBIG(hcp
, new_size
) ||
134 DUP_SIZE(nval
->size
) - del_len
> P_FREESPACE(hcp
->pagep
))) {
136 if ((ret
= __ham_dup_convert(dbc
)) != 0)
139 hk
= H_PAIRDATA(hcp
->pagep
, hcp
->bndx
);
142 /* There are two separate cases here: on page and off page. */
143 if (HPAGE_PTYPE(hk
) != H_OFFDUP
) {
144 if (HPAGE_PTYPE(hk
) != H_DUPLICATE
) {
145 HPAGE_PTYPE(hk
) = H_DUPLICATE
;
147 pval
.data
= HKEYDATA_DATA(hk
);
148 pval
.size
= LEN_HDATA(hcp
->pagep
, dbp
->pgsize
,
151 __ham_make_dup(&pval
, &tmp_val
, &dbc
->rdata
.data
,
152 &dbc
->rdata
.size
)) != 0 || (ret
=
153 __ham_replpair(dbc
, &tmp_val
, 1)) != 0)
157 /* Now make the new entry a duplicate. */
158 if ((ret
= __ham_make_dup(nval
,
159 &tmp_val
, &dbc
->rdata
.data
, &dbc
->rdata
.size
)) != 0)
163 switch (flags
) { /* On page. */
166 if (dbp
->dup_compare
!= NULL
)
167 __ham_dsearch(dbc
, nval
, &tmp_val
.doff
, &cmp
);
168 else if (flags
== DB_KEYFIRST
)
171 tmp_val
.doff
= LEN_HDATA(hcp
->pagep
,
172 hcp
->hdr
->pagesize
, hcp
->bndx
);
176 * If we have a sort function, we need to verify that
177 * the new item sorts identically to the old item.
179 if (dbp
->dup_compare
!= NULL
) {
180 dbt
.data
= HKEYDATA_DATA(H_PAIRDATA(hcp
->pagep
,
181 hcp
->bndx
)) + hcp
->dup_off
;
182 dbt
.size
= DUP_SIZE(hcp
->dup_len
);
183 if (dbp
->dup_compare(nval
, &dbt
) != 0)
186 tmp_val
.doff
= hcp
->dup_off
;
187 tmp_val
.dlen
= DUP_SIZE(hcp
->dup_len
);
190 tmp_val
.doff
= hcp
->dup_off
;
193 tmp_val
.doff
= hcp
->dup_off
+ DUP_SIZE(hcp
->dup_len
);
196 /* Add the duplicate. */
197 ret
= __ham_replpair(dbc
, &tmp_val
, 0);
199 ret
= __ham_dirty_page(dbp
, hcp
->pagep
);
200 __ham_c_update(hcp
, hcp
->pgno
, tmp_val
.size
, 1, 1);
204 /* If we get here, then we're on duplicate pages. */
205 if (hcp
->dpgno
== PGNO_INVALID
) {
206 memcpy(&hcp
->dpgno
, HOFFDUP_PGNO(hk
), sizeof(db_pgno_t
));
212 if (dbp
->dup_compare
!= NULL
)
215 * The only way that we are already on a dup page is
216 * if we just converted the on-page representation.
217 * In that case, we've only got one page of duplicates.
219 if (hcp
->dpagep
== NULL
&& (ret
=
220 __db_dend(dbc
, hcp
->dpgno
, &hcp
->dpagep
)) != 0)
225 if (dbp
->dup_compare
!= NULL
) {
226 sorted_dups
: if ((ret
= __db_dsearch(dbc
, 1, nval
,
227 hcp
->dpgno
, &hcp
->dndx
, &hcp
->dpagep
, &cmp
)) != 0)
230 hcp
->dpgno
= PGNO(hcp
->dpagep
);
232 if (hcp
->dpagep
== NULL
&& (ret
=
233 __db_dend(dbc
, hcp
->dpgno
, &hcp
->dpagep
)) != 0)
235 hcp
->dpgno
= PGNO(hcp
->dpagep
);
236 hcp
->dndx
= NUM_ENT(hcp
->dpagep
);
240 if (dbp
->dup_compare
!= NULL
&& __bam_cmp(dbp
,
241 nval
, hcp
->dpagep
, hcp
->dndx
, dbp
->dup_compare
) != 0)
243 switch (GET_BKEYDATA(hcp
->dpagep
, hcp
->dndx
)->type
) {
245 del_len
= BKEYDATA_SIZE(GET_BKEYDATA(hcp
->dpagep
,
249 del_len
= BOVERFLOW_SIZE
;
253 __db_ditem(dbc
, hcp
->dpagep
, hcp
->dndx
, del_len
)) != 0)
256 case DB_BEFORE
: /* The default behavior is correct. */
264 nval
, &hcp
->dpagep
, &hcp
->dndx
, __ham_overflow_page
);
265 hcp
->pgno
= PGNO(hcp
->pagep
);
266 __ham_c_update(hcp
, hcp
->pgno
, nval
->size
, 1, 1);
271 * Convert an on-page set of duplicates to an offpage set of duplicates.
274 __ham_dup_convert(dbc
)
282 db_indx_t dndx
, i
, len
, off
;
287 * Create a new page for the duplicates.
290 hcp
= (HASH_CURSOR
*)dbc
->internal
;
292 __ham_overflow_page(dbc
, P_DUPLICATE
, &hcp
->dpagep
)) != 0)
294 hcp
->dpagep
->type
= P_DUPLICATE
;
295 hcp
->dpgno
= PGNO(hcp
->dpagep
);
298 * Now put the duplicates onto the new page.
302 switch (HPAGE_PTYPE(H_PAIRDATA(hcp
->pagep
, hcp
->bndx
))) {
304 /* Simple case, one key on page; move it to dup page. */
306 LEN_HDATA(hcp
->pagep
, hcp
->hdr
->pagesize
, hcp
->bndx
);
307 dbt
.data
= HKEYDATA_DATA(H_PAIRDATA(hcp
->pagep
, hcp
->bndx
));
308 ret
= __db_pitem(dbc
, hcp
->dpagep
,
309 (u_int32_t
)dndx
, BKEYDATA_SIZE(dbt
.size
), NULL
, &dbt
);
311 __ham_dirty_page(dbp
, hcp
->dpagep
);
314 /* Data item is stored off-page; put a BOVERFLOW entry for it on the dup page. */
316 P_ENTRY(hcp
->pagep
, H_DATAINDEX(hcp
->bndx
)), HOFFPAGE_SIZE
);
318 B_TSET(bo
.type
, ho
.type
, 0);
322 dbt
.size
= BOVERFLOW_SIZE
;
325 ret
= __db_pitem(dbc
, hcp
->dpagep
,
326 (u_int32_t
)dndx
, dbt
.size
, &dbt
, NULL
);
328 __ham_dirty_page(dbp
, hcp
->dpagep
);
331 p
= HKEYDATA_DATA(H_PAIRDATA(hcp
->pagep
, hcp
->bndx
));
333 LEN_HDATA(hcp
->pagep
, hcp
->hdr
->pagesize
, hcp
->bndx
);
336 * We need to maintain the duplicate cursor position.
337 * Keep track of where we are in the duplicate set via
338 * the offset, and when it matches the one in the cursor,
339 * set the off-page duplicate cursor index to the current index.
342 for (off
= 0, i
= 0; p
< pend
; i
++) {
343 if (off
== hcp
->dup_off
)
345 memcpy(&len
, p
, sizeof(db_indx_t
));
347 p
+= sizeof(db_indx_t
);
349 p
+= len
+ sizeof(db_indx_t
);
350 off
+= len
+ 2 * sizeof(db_indx_t
);
351 ret
= __db_dput(dbc
, &dbt
,
352 &hcp
->dpagep
, &i
, __ham_overflow_page
);
358 ret
= __db_pgfmt(dbp
, (u_long
)hcp
->pgno
);
363 * Now attach this to the source page in place of
364 * the old duplicate item.
366 __ham_move_offpage(dbc
, hcp
->pagep
,
367 (u_int32_t
)H_DATAINDEX(hcp
->bndx
), hcp
->dpgno
);
369 /* Can probably just do a "put" here. */
370 ret
= __ham_dirty_page(dbp
, hcp
->pagep
);
373 (void)__ham_del_page(dbc
, hcp
->dpagep
);
380 __ham_make_dup(notdup
, duplicate
, bufp
, sizep
)
386 db_indx_t tsize
, item_size
;
390 item_size
= (db_indx_t
)notdup
->size
;
391 tsize
= DUP_SIZE(item_size
);
392 if ((ret
= __ham_init_dbt(duplicate
, tsize
, bufp
, sizep
)) != 0)
396 duplicate
->flags
= notdup
->flags
;
397 F_SET(duplicate
, DB_DBT_PARTIAL
);
400 memcpy(p
, &item_size
, sizeof(db_indx_t
));
401 p
+= sizeof(db_indx_t
);
402 memcpy(p
, notdup
->data
, notdup
->size
);
404 memcpy(p
, &item_size
, sizeof(db_indx_t
));
407 duplicate
->dlen
= notdup
->size
;
413 __ham_check_move(dbc
, add_len
)
423 u_int32_t new_datalen
, old_len
, rectype
;
428 hcp
= (HASH_CURSOR
*)dbc
->internal
;
430 * Check if we can do whatever we need to on this page. If not,
431 * then we'll have to move the current element to a new page.
433 hk
= H_PAIRDATA(hcp
->pagep
, hcp
->bndx
);
436 * If the item is already off page duplicates or an offpage item,
437 * then we know we can do whatever we need to do in-place
439 if (HPAGE_PTYPE(hk
) == H_OFFDUP
|| HPAGE_PTYPE(hk
) == H_OFFPAGE
)
443 LEN_HITEM(hcp
->pagep
, hcp
->hdr
->pagesize
, H_DATAINDEX(hcp
->bndx
));
444 new_datalen
= old_len
- HKEYDATA_SIZE(0) + add_len
;
447 * We need to add a new page under two conditions:
448 * 1. The addition makes the total data length cross the BIG
449 * threshold and the OFFDUP structure won't fit on this page.
450 * 2. The addition does not make the total data cross the
451 * threshold, but the new data won't fit on the page.
452 * If neither of these is true, then we can return.
454 if (ISBIG(hcp
, new_datalen
) && (old_len
> HOFFDUP_SIZE
||
455 HOFFDUP_SIZE
- old_len
<= P_FREESPACE(hcp
->pagep
)))
458 if (!ISBIG(hcp
, new_datalen
) &&
459 add_len
<= (int32_t)P_FREESPACE(hcp
->pagep
))
463 * If we get here, then we need to move the item to a new page.
464 * Check if there are more pages in the chain.
467 new_datalen
= ISBIG(hcp
, new_datalen
) ?
468 HOFFDUP_SIZE
: HKEYDATA_SIZE(new_datalen
);
471 for (next_pgno
= NEXT_PGNO(hcp
->pagep
); next_pgno
!= PGNO_INVALID
;
472 next_pgno
= NEXT_PGNO(next_pagep
)) {
473 if (next_pagep
!= NULL
&&
474 (ret
= __ham_put_page(dbp
, next_pagep
, 0)) != 0)
478 __ham_get_page(dbp
, next_pgno
, &next_pagep
)) != 0)
481 if (P_FREESPACE(next_pagep
) >= new_datalen
)
485 /* No more pages, add one. */
486 if (next_pagep
== NULL
&& (ret
= __ham_add_ovflpage(dbc
,
487 hcp
->pagep
, 0, &next_pagep
)) != 0)
490 /* Add new page at the end of the chain. */
491 if (P_FREESPACE(next_pagep
) < new_datalen
&& (ret
=
492 __ham_add_ovflpage(dbc
, next_pagep
, 1, &next_pagep
)) != 0)
495 /* Copy the item to the new page. */
496 if (DB_LOGGING(hcp
->dbc
)) {
501 H_PAIRKEY(hcp
->pagep
, hcp
->bndx
)) == H_OFFPAGE
) {
502 rectype
|= PAIR_KEYMASK
;
503 k
.data
= H_PAIRKEY(hcp
->pagep
, hcp
->bndx
);
504 k
.size
= HOFFPAGE_SIZE
;
507 HKEYDATA_DATA(H_PAIRKEY(hcp
->pagep
, hcp
->bndx
));
508 k
.size
= LEN_HKEY(hcp
->pagep
,
509 hcp
->hdr
->pagesize
, hcp
->bndx
);
512 if (HPAGE_PTYPE(hk
) == H_OFFPAGE
) {
513 rectype
|= PAIR_DATAMASK
;
514 d
.data
= H_PAIRDATA(hcp
->pagep
, hcp
->bndx
);
515 d
.size
= HOFFPAGE_SIZE
;
518 HKEYDATA_DATA(H_PAIRDATA(hcp
->pagep
, hcp
->bndx
));
519 d
.size
= LEN_HDATA(hcp
->pagep
,
520 hcp
->hdr
->pagesize
, hcp
->bndx
);
524 if ((ret
= __ham_insdel_log(dbp
->dbenv
->lg_info
,
525 dbc
->txn
, &new_lsn
, 0, rectype
,
526 dbp
->log_fileid
, PGNO(next_pagep
),
527 (u_int32_t
)H_NUMPAIRS(next_pagep
), &LSN(next_pagep
),
531 /* Move lsn onto page. */
532 LSN(next_pagep
) = new_lsn
; /* Structure assignment. */
535 __ham_copy_item(dbp
->pgsize
,
536 hcp
->pagep
, H_KEYINDEX(hcp
->bndx
), next_pagep
);
537 __ham_copy_item(dbp
->pgsize
,
538 hcp
->pagep
, H_DATAINDEX(hcp
->bndx
), next_pagep
);
540 /* Now delete the pair from the current page. */
541 ret
= __ham_del_pair(dbc
, 0);
543 (void)__ham_put_page(dbp
, hcp
->pagep
, 1);
544 hcp
->pagep
= next_pagep
;
545 hcp
->pgno
= PGNO(hcp
->pagep
);
546 hcp
->bndx
= H_NUMPAIRS(hcp
->pagep
) - 1;
547 F_SET(hcp
, H_EXPAND
);
552 * __ham_move_offpage --
553 * Replace an onpage set of duplicates with the OFFDUP structure
554 * that references the duplicate page.
557 * This is really just a special case of __onpage_replace; we should
558 * probably combine them.
560 * PUBLIC: void __ham_move_offpage __P((DBC *, PAGE *, u_int32_t, db_pgno_t));
563 __ham_move_offpage(dbc
, pagep
, ndx
, pgno
)
579 hcp
= (HASH_CURSOR
*)dbc
->internal
;
586 if (DB_LOGGING(dbc
)) {
588 new_dbt
.size
= HOFFDUP_SIZE
;
589 old_dbt
.data
= P_ENTRY(pagep
, ndx
);
590 old_dbt
.size
= LEN_HITEM(pagep
, hcp
->hdr
->pagesize
, ndx
);
591 (void)__ham_replace_log(dbp
->dbenv
->lg_info
,
592 dbc
->txn
, &LSN(pagep
), 0, dbp
->log_fileid
,
593 PGNO(pagep
), (u_int32_t
)ndx
, &LSN(pagep
), -1,
594 &old_dbt
, &new_dbt
, 0);
598 LEN_HITEM(pagep
, hcp
->hdr
->pagesize
, ndx
) - HOFFDUP_SIZE
;
602 src
= (u_int8_t
*)(pagep
) + HOFFSET(pagep
);
603 memmove(src
+ shrink
, src
, pagep
->inp
[ndx
] - HOFFSET(pagep
));
604 HOFFSET(pagep
) += shrink
;
606 /* Update index table. */
607 for (i
= ndx
; i
< NUM_ENT(pagep
); i
++)
608 pagep
->inp
[i
] += shrink
;
611 /* Now copy the offdup entry onto the page. */
612 memcpy(P_ENTRY(pagep
, ndx
), &od
, HOFFDUP_SIZE
);
617 * Locate a particular duplicate in a duplicate set.
619 * PUBLIC: void __ham_dsearch __P((DBC *, DBT *, u_int32_t *, int *));
622 __ham_dsearch(dbc
, dbt
, offp
, cmpp
)
632 int (*func
) __P((const DBT
*, const DBT
*));
636 hcp
= (HASH_CURSOR
*)dbc
->internal
;
637 if (dbp
->dup_compare
== NULL
)
640 func
= dbp
->dup_compare
;
642 i
= F_ISSET(dbc
, DBC_CONTINUE
) ? hcp
->dup_off
: 0;
643 data
= HKEYDATA_DATA(H_PAIRDATA(hcp
->pagep
, hcp
->bndx
)) + i
;
644 while (i
< LEN_HDATA(hcp
->pagep
, hcp
->hdr
->pagesize
, hcp
->bndx
)) {
645 memcpy(&len
, data
, sizeof(db_indx_t
));
646 data
+= sizeof(db_indx_t
);
648 cur
.size
= (u_int32_t
)len
;
649 *cmpp
= func(dbt
, &cur
);
650 if (*cmpp
== 0 || (*cmpp
< 0 && dbp
->dup_compare
!= NULL
))
652 i
+= len
+ 2 * sizeof(db_indx_t
);
653 data
+= len
+ sizeof(db_indx_t
);