Main Page | Class Hierarchy | Data Structures | Directories | File List | Data Fields | Related Pages

hash_dup.c

00001 /*-
00002  * See the file LICENSE for redistribution information.
00003  *
00004  * Copyright (c) 1996-2005
00005  *      Sleepycat Software.  All rights reserved.
00006  */
00007 /*
00008  * Copyright (c) 1990, 1993, 1994
00009  *      The Regents of the University of California.  All rights reserved.
00010  *
00011  * This code is derived from software contributed to Berkeley by
00012  * Margo Seltzer.
00013  *
00014  * Redistribution and use in source and binary forms, with or without
00015  * modification, are permitted provided that the following conditions
00016  * are met:
00017  * 1. Redistributions of source code must retain the above copyright
00018  *    notice, this list of conditions and the following disclaimer.
00019  * 2. Redistributions in binary form must reproduce the above copyright
00020  *    notice, this list of conditions and the following disclaimer in the
00021  *    documentation and/or other materials provided with the distribution.
00022  * 3. Neither the name of the University nor the names of its contributors
00023  *    may be used to endorse or promote products derived from this software
00024  *    without specific prior written permission.
00025  *
00026  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
00027  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00028  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00029  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
00030  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
00031  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
00032  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
00033  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00034  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
00035  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
00036  * SUCH DAMAGE.
00037  *
00038  * $Id: hash_dup.c,v 12.3 2005/07/20 16:51:41 bostic Exp $
00039  */
00040 
00041 #include "db_config.h"
00042 
00043 /*
00044  * PACKAGE:  hashing
00045  *
00046  * DESCRIPTION:
00047  *      Manipulation of duplicates for the hash package.
00048  */
00049 
00050 #ifndef NO_SYSTEM_INCLUDES
00051 #include <sys/types.h>
00052 
00053 #include <string.h>
00054 #endif
00055 
00056 #include "db_int.h"
00057 #include "dbinc/db_page.h"
00058 #include "dbinc/db_shash.h"
00059 #include "dbinc/hash.h"
00060 #include "dbinc/btree.h"
00061 #include "dbinc/mp.h"
00062 
/*
 * Prototypes for the file-local (static) helper routines.  __P is defined
 * elsewhere; presumably it lets pre-ANSI compilers drop the parameter
 * lists -- confirm against the project headers.
 */
00063 static int __ham_c_chgpg __P((DBC *,
00064     db_pgno_t, u_int32_t, db_pgno_t, u_int32_t));
00065 static int __ham_check_move __P((DBC *, u_int32_t));
00066 static int __ham_dcursor __P((DBC *, db_pgno_t, u_int32_t));
00067 static int __ham_move_offpage __P((DBC *, PAGE *, u_int32_t, db_pgno_t));
00068 
00069 /*
00070  * Called from hash_access to add a duplicate key. nval is the new
00071  * value that we want to add.  The flags correspond to the flag values
00072  * to cursor_put indicating where to add the new element.
00073  * There are 4 cases.
00074  * Case 1: The existing duplicate set already resides on a separate page.
00075  *         We return and let the common code handle this.
00076  * Case 2: The element is small enough to just be added to the existing set.
00077  * Case 3: The element is large enough to be a big item, so we're going to
00078  *         have to push the set onto a new page.
00079  * Case 4: The element is large enough to push the duplicate set onto a
00080  *         separate page.
       *
       * NOTE(review): in the code below cases 3 and 4 are a single path (the
       * __ham_dup_convert branch); the wording of the two cases should be
       * confirmed against the intended distinction.
       *
       * Parameters:
       *   dbc    cursor whose internal HASH_CURSOR identifies the key/data
       *          pair (hcp->page, hcp->indx) that receives the duplicate.
       *   nval   the new data item.  If DB_DBT_PARTIAL is set, nval->doff is
       *          counted as part of the space the item needs.
       *   flags  DB_KEYFIRST, DB_KEYLAST, DB_NODUPDATA, DB_BEFORE or DB_AFTER.
       *          DB_CURRENT is asserted against; any other value makes the
       *          on-page path return EINVAL.
       *   pgnop  written only in case 1: receives the page number stored in
       *          the pair's H_OFFDUP item.
       *
       * Returns 0 on success; otherwise the first non-zero value returned by
       * a helper, the result of __db_duperr when a sorted set already holds
       * an equal item, or EINVAL for an unexpected flag.
00081  *
00082  * PUBLIC: int __ham_add_dup __P((DBC *, DBT *, u_int32_t, db_pgno_t *));
00083  */
00084 int
00085 __ham_add_dup(dbc, nval, flags, pgnop)
00086         DBC *dbc;
00087         DBT *nval;
00088         u_int32_t flags;
00089         db_pgno_t *pgnop;
00090 {
00091         DB *dbp;
00092         DBT pval, tmp_val;
00093         DB_MPOOLFILE *mpf;
00094         HASH_CURSOR *hcp;
00095         u_int32_t add_bytes, new_size;
00096         int cmp, ret;
00097         u_int8_t *hk;
00098 
00099         dbp = dbc->dbp;
00100         mpf = dbp->mpf;
00101         hcp = (HASH_CURSOR *)dbc->internal;
00102 
00103         DB_ASSERT(flags != DB_CURRENT);
00104 
              /*
               * Space the new element needs: its data (plus the leading gap
               * of a partial put), converted by DUP_SIZE to the size of an
               * on-page duplicate element (presumably data plus the two
               * length fields that __ham_make_dup writes).
               */
00105         add_bytes = nval->size +
00106             (F_ISSET(nval, DB_DBT_PARTIAL) ? nval->doff : 0);
00107         add_bytes = DUP_SIZE(add_bytes);
00108 
              /*
               * This may move the pair to another page and repoint the
               * cursor, so the data item is only looked up afterwards.
               */
00109         if ((ret = __ham_check_move(dbc, add_bytes)) != 0)
00110                 return (ret);
00111 
00112         /*
00113          * Check if resulting duplicate set is going to need to go
00114          * onto a separate duplicate page.  If so, convert the
00115          * duplicate set and add the new one.  After conversion,
00116          * hcp->dndx is the first free ndx or the index of the
00117          * current pointer into the duplicate set.
               *
               * NOTE(review): hcp->dndx is not referenced anywhere in this
               * function; the sentence above looks stale.
00118          */
00119         hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
00120         /* Add the len bytes to the current singleton. */
00121         if (HPAGE_PTYPE(hk) != H_DUPLICATE)
00122                 add_bytes += DUP_SIZE(0);
00123         new_size =
00124             LEN_HKEYDATA(dbp, hcp->page, dbp->pgsize, H_DATAINDEX(hcp->indx)) +
00125             add_bytes;
00126 
00127         /*
00128          * We convert to off-page duplicates if the item is a big item,
00129          * the addition of the new item will make the set large, or
00130          * if there isn't enough room on this page to add the next item.
00131          */
00132         if (HPAGE_PTYPE(hk) != H_OFFDUP &&
00133             (HPAGE_PTYPE(hk) == H_OFFPAGE || ISBIG(hcp, new_size) ||
00134             add_bytes > P_FREESPACE(dbp, hcp->page))) {
00135 
                      /*
                       * Move the set off page, then let the off-page
                       * duplicate cursor (hcp->opd) perform the put.
                       */
00136                 if ((ret = __ham_dup_convert(dbc)) != 0)
00137                         return (ret);
00138                 return (hcp->opd->c_am_put(hcp->opd,
00139                     NULL, nval, flags, NULL));
00140         }
00141 
00142         /* There are two separate cases here: on page and off page. */
00143         if (HPAGE_PTYPE(hk) != H_OFFDUP) {
00144                 if (HPAGE_PTYPE(hk) != H_DUPLICATE) {
                              /*
                               * The pair holds a single plain data item:
                               * rewrite it in duplicate-element form and
                               * retag the item as H_DUPLICATE.
                               */
00145                         pval.flags = 0;
00146                         pval.data = HKEYDATA_DATA(hk);
00147                         pval.size = LEN_HDATA(dbp, hcp->page, dbp->pgsize,
00148                             hcp->indx);
00149                         if ((ret = __ham_make_dup(dbp->dbenv,
00150                             &pval, &tmp_val, &dbc->my_rdata.data,
00151                             &dbc->my_rdata.ulen)) != 0 || (ret =
00152                             __ham_replpair(dbc, &tmp_val, 1)) != 0)
00153                                 return (ret);
                              /* Re-fetch the item pointer after the replace. */
00154                         hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
00155                         HPAGE_PTYPE(hk) = H_DUPLICATE;
00156 
00157                         /*
00158                          * Update the cursor position since we now are in
00159                          * duplicates.
00160                          */
00161                         F_SET(hcp, H_ISDUP);
00162                         hcp->dup_off = 0;
00163                         hcp->dup_len = pval.size;
00164                         hcp->dup_tlen = DUP_SIZE(hcp->dup_len);
00165                 }
00166 
00167                 /* Now make the new entry a duplicate. */
00168                 if ((ret = __ham_make_dup(dbp->dbenv, nval,
00169                     &tmp_val, &dbc->my_rdata.data, &dbc->my_rdata.ulen)) != 0)
00170                         return (ret);
00171 
                      /*
                       * dlen of 0 with a chosen doff: the element is handed
                       * to __ham_replpair as a partial put that replaces no
                       * existing bytes.  The switch picks doff, the byte
                       * offset inside the set where the element goes.
                       */
00172                 tmp_val.dlen = 0;
00173                 switch (flags) {                        /* On page. */
00174                 case DB_KEYFIRST:
00175                 case DB_KEYLAST:
00176                 case DB_NODUPDATA:
00177                         if (dbp->dup_compare != NULL) {
                                      /*
                                       * Sorted set: the search supplies the
                                       * insertion offset.
                                       * NOTE(review): __ham_dsearch leaves
                                       * cmp unwritten if its loop never
                                       * runs; cmp has no initial value here.
                                       */
00178                                 __ham_dsearch(dbc,
00179                                     nval, &tmp_val.doff, &cmp, flags);
00180 
00181                                 /* dup dups are not supported w/ sorted dups */
00182                                 if (cmp == 0)
00183                                         return (__db_duperr(dbp, flags));
00184                         } else {
                                      /*
                                       * Unsorted set: DB_KEYFIRST inserts at
                                       * offset 0, the other two flags append
                                       * at the current end of the set.
                                       */
00185                                 hcp->dup_tlen = LEN_HDATA(dbp, hcp->page,
00186                                     dbp->pgsize, hcp->indx);
00187                                 hcp->dup_len = nval->size;
00188                                 F_SET(hcp, H_ISDUP);
00189                                 if (flags == DB_KEYFIRST)
00190                                         hcp->dup_off = tmp_val.doff = 0;
00191                                 else
00192                                         hcp->dup_off =
00193                                             tmp_val.doff = hcp->dup_tlen;
00194                         }
00195                         break;
00196                 case DB_BEFORE:
                              /* Insert at the cursor's current element. */
00197                         tmp_val.doff = hcp->dup_off;
00198                         break;
00199                 case DB_AFTER:
                              /* Insert just past the cursor's current element. */
00200                         tmp_val.doff = hcp->dup_off + DUP_SIZE(hcp->dup_len);
00201                         break;
00202                 default:
00203                         DB_ASSERT(0);
00204                         return (EINVAL);
00205                 }
00206 
00207                 /* Add the duplicate. */
00208                 ret = __ham_replpair(dbc, &tmp_val, 0);
00209                 if (ret == 0)
00210                         ret = __memp_fset(mpf, hcp->page, DB_MPOOL_DIRTY);
00211                 if (ret != 0)
00212                         return (ret);
00213 
00214                 /* Now, update the cursor if necessary. */
00215                 switch (flags) {
00216                 case DB_AFTER:
                              /*
                               * Step over the old current element so the
                               * cursor refers to the one just inserted.
                               */
00217                         hcp->dup_off += DUP_SIZE(hcp->dup_len);
00218                         hcp->dup_len = nval->size;
00219                         hcp->dup_tlen += (db_indx_t)DUP_SIZE(nval->size);
00220                         break;
00221                 case DB_BEFORE:
00222                 case DB_KEYFIRST:
00223                 case DB_KEYLAST:
00224                 case DB_NODUPDATA:
                              /*
                               * dup_off already names the insertion point;
                               * only the lengths change.
                               */
00225                         hcp->dup_tlen += (db_indx_t)DUP_SIZE(nval->size);
00226                         hcp->dup_len = nval->size;
00227                         break;
00228                 default:
00229                         DB_ASSERT(0);
00230                         return (EINVAL);
00231                 }
                      /*
                       * Presumably adjusts other cursors on this pair for the
                       * added bytes -- __ham_c_update is defined elsewhere.
                       */
00232                 ret = __ham_c_update(dbc, tmp_val.size, 1, 1);
00233                 return (ret);
00234         }
00235 
00236         /*
00237          * If we get here, then we're on duplicate pages; set pgnop and
00238          * return so the common code can handle it.
               *
               * ret is still 0 here: the only assignment on this path was
               * the successful __ham_check_move call above.
00239          */
00240         memcpy(pgnop, HOFFDUP_PGNO(H_PAIRDATA(dbp, hcp->page, hcp->indx)),
00241             sizeof(db_pgno_t));
00242 
00243         return (ret);
00244 }
00245 
00246 /*
00247  * Convert an on-page set of duplicates to an offpage set of duplicates.
       *
       * The data item of the cursor's current pair (a single H_KEYDATA item,
       * a single H_OFFPAGE item, or an H_DUPLICATE set) is copied onto a
       * freshly allocated page, every cursor returned by __ham_get_clist is
       * given an off-page duplicate cursor via __ham_dcursor, and the
       * original item is replaced by an H_OFFDUP reference to the new page.
       *
       * Parameters:
       *   dbc  cursor whose HASH_CURSOR (page, indx) names the pair to convert.
       *
       * Returns 0 on success, otherwise the first error from a helper.  On
       * success the cursor's dup_tlen, dup_off and dup_len are reset to 0.
00248  *
00249  * PUBLIC: int __ham_dup_convert __P((DBC *));
00250  */
00251 int
00252 __ham_dup_convert(dbc)
00253         DBC *dbc;
00254 {
00255         BOVERFLOW bo;
00256         DB *dbp;
00257         DBC **hcs;
00258         DBT dbt;
00259         DB_LSN lsn;
00260         DB_MPOOLFILE *mpf;
00261         HASH_CURSOR *hcp;
00262         HOFFPAGE ho;
00263         PAGE *dp;
00264         db_indx_t i, len, off;
00265         int c, ret, t_ret;
00266         u_int8_t *p, *pend;
00267 
00268         dbp = dbc->dbp;
00269         mpf = dbp->mpf;
00270         hcp = (HASH_CURSOR *)dbc->internal;
00271 
00272         /*
00273          * Create a new page for the duplicates.
               * Without a duplicate comparison function the page type is
               * P_LRECNO, otherwise P_LDUP.
00274          */
00275         if ((ret = __db_new(dbc,
00276             dbp->dup_compare == NULL ? P_LRECNO : P_LDUP, &dp)) != 0)
00277                 return (ret);
00278         P_INIT(dp, dbp->pgsize,
00279             dp->pgno, PGNO_INVALID, PGNO_INVALID, LEAFLEVEL, TYPE(dp));
00280 
00281         /*
00282          * Get the list of cursors that may need to be updated.
               *
               * NOTE(review): on failure we jump to err, which tests hcs;
               * this relies on __ham_get_clist storing to hcs even when it
               * fails -- not visible here, confirm.
00283          */
00284         if ((ret = __ham_get_clist(dbp,
00285             PGNO(hcp->page), (u_int32_t)hcp->indx, &hcs)) != 0)
00286                 goto err;
00287 
00288         /*
00289          * Now put the duplicates onto the new page.
00290          */
00291         dbt.flags = 0;
00292         switch (HPAGE_PTYPE(H_PAIRDATA(dbp, hcp->page, hcp->indx))) {
00293         case H_KEYDATA:
00294                 /* Simple case, one key on page; move it to dup page. */
00295                 dbt.size = LEN_HDATA(dbp, hcp->page, dbp->pgsize, hcp->indx);
00296                 dbt.data = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx));
00297                 ret = __db_pitem(dbc,
00298                     dp, 0, BKEYDATA_SIZE(dbt.size), NULL, &dbt);
                      /* Shares the cursor fix-up code in the next case. */
00299                 goto finish;
00300         case H_OFFPAGE:
00301                 /* Simple case, one key on page; move it to dup page. */
                      /*
                       * The item is a reference to an overflow chain: copy
                       * its type, pgno and tlen into a BOVERFLOW and store
                       * that as the single entry on the new page.
                       */
00302                 memcpy(&ho, P_ENTRY(dbp, hcp->page, H_DATAINDEX(hcp->indx)),
00303                     HOFFPAGE_SIZE);
00304                 UMRW_SET(bo.unused1);
00305                 B_TSET(bo.type, ho.type, 0);
00306                 UMRW_SET(bo.unused2);
00307                 bo.pgno = ho.pgno;
00308                 bo.tlen = ho.tlen;
00309                 dbt.size = BOVERFLOW_SIZE;
00310                 dbt.data = &bo;
00311 
00312                 ret = __db_pitem(dbc, dp, 0, dbt.size, &dbt, NULL);
00313 finish:         if (ret == 0) {
                              /* The breaks in this block leave the switch. */
00314                         if ((ret = __memp_fset(mpf, dp, DB_MPOOL_DIRTY)) != 0)
00315                                 break;
00316 
00317                         /* Update any other cursors. */
                              /*
                               * The change is logged only when there are
                               * cursors to move and we are logging inside a
                               * subtransaction.
                               */
00318                         if (hcs != NULL && DBC_LOGGING(dbc) &&
00319                             IS_SUBTRANSACTION(dbc->txn)) {
00320                                 if ((ret = __ham_chgpg_log(dbp, dbc->txn,
00321                                     &lsn, 0, DB_HAM_DUP, PGNO(hcp->page),
00322                                     PGNO(dp), hcp->indx, 0)) != 0)
00323                                         break;
00324                         }
                              /*
                               * Only one element exists, so every listed
                               * cursor is placed at index 0 of the new page.
                               * This break only leaves the for loop; the
                               * switch is left by the break that follows.
                               */
00325                         for (c = 0; hcs != NULL && hcs[c] != NULL; c++)
00326                                 if ((ret = __ham_dcursor(hcs[c],
00327                                     PGNO(dp), 0)) != 0)
00328                                         break;
00329                 }
00330                 break;
00331         case H_DUPLICATE:
                      /*
                       * p walks the on-page set up to pend.  Each element is
                       * read as a db_indx_t length, that many data bytes,
                       * and a trailing db_indx_t (see the pointer steps in
                       * the loop).
                       */
00332                 p = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx));
00333                 pend = p +
00334                     LEN_HDATA(dbp, hcp->page, dbp->pgsize, hcp->indx);
00335 
00336                 /*
00337                  * We need to maintain the duplicate cursor position.
00338                  * Keep track of where we are in the duplicate set via
00339                  * the offset, and when it matches the one in the cursor,
00340                  * set the off-page duplicate cursor index to the current
00341                  * index.
00342                  */
00343                 for (off = 0, i = 0; p < pend; i++) {
00344                         memcpy(&len, p, sizeof(db_indx_t));
00345                         dbt.size = len;
00346                         p += sizeof(db_indx_t);
00347                         dbt.data = p;
                              /* Skip the data and the trailing length field. */
00348                         p += len + sizeof(db_indx_t);
00349                         if ((ret = __db_pitem(dbc, dp,
00350                             i, BKEYDATA_SIZE(dbt.size), NULL, &dbt)) != 0)
00351                                 break;
00352 
00353                         /* Update any other cursors */
00354                         if (hcs != NULL && DBC_LOGGING(dbc) &&
00355                             IS_SUBTRANSACTION(dbc->txn)) {
00356                                 if ((ret = __ham_chgpg_log(dbp, dbc->txn,
00357                                     &lsn, 0, DB_HAM_DUP, PGNO(hcp->page),
00358                                     PGNO(dp), hcp->indx, i)) != 0)
00359                                         break;
00360                         }
                              /*
                               * Cursors whose dup_off equals this element's
                               * byte offset are moved to index i on the new
                               * page.
                               */
00361                         for (c = 0; hcs != NULL && hcs[c] != NULL; c++)
00362                                 if (((HASH_CURSOR *)(hcs[c]->internal))->dup_off
00363                                     == off && (ret = __ham_dcursor(hcs[c],
00364                                     PGNO(dp), i)) != 0)
00365                                         goto err;
                              /* Byte offset of the next element in the old set. */
00366                         off += len + 2 * sizeof(db_indx_t);
00367                 }
00368                 break;
00369         default:
                      /* Any other item type is reported as a page format error. */
00370                 ret = __db_pgfmt(dbp->dbenv, hcp->pgno);
00371                 break;
00372         }
00373 
00374         /*
00375          * Now attach this to the source page in place of the old duplicate
00376          * item.
00377          */
00378         if (ret == 0)
00379                 ret = __ham_move_offpage(dbc, hcp->page,
00380                     (u_int32_t)H_DATAINDEX(hcp->indx), PGNO(dp));
00381 
              /*
               * Common exit, reached on success and on failure.  The hash
               * page is marked dirty only on success; dp is always unpinned
               * and is flagged dirty only on success.
               * NOTE(review): on failure dp is unpinned but nothing in this
               * function returns the newly allocated page to a free list.
               */
00382 err:    if (ret == 0)
00383                 ret = __memp_fset(mpf, hcp->page, DB_MPOOL_DIRTY);
00384 
00385         if ((t_ret = __memp_fput(
00386             mpf, dp, ret == 0 ? DB_MPOOL_DIRTY : 0)) != 0 && ret == 0)
00387                 ret = t_ret;
00388 
              /* The on-page duplicate position no longer applies. */
00389         if (ret == 0)
00390                 hcp->dup_tlen = hcp->dup_off = hcp->dup_len = 0;
00391 
00392         if (hcs != NULL)
00393                 __os_free(dbp->dbenv, hcs);
00394 
00395         return (ret);
00396 }
00397 
00398 /*
00399  * __ham_make_dup
00400  *
00401  * Take a regular dbt and make it into a duplicate item with all the partial
00402  * information set appropriately. If the incoming dbt is a partial, assume
00403  * we are creating a new entry and make sure that we do any initial padding.
       *
       * The buffer is filled as: item_size, notdup->doff zero bytes (partial
       * input only), the notdup->size data bytes, item_size again.
       *
       * Parameters:
       *   dbenv      passed through to __ham_init_dbt.
       *   notdup     the plain item; only size, doff, flags and data are read.
       *   duplicate  output DBT describing the element that was built.
       *   bufp       passed through to __ham_init_dbt, which presumably
       *   sizep      (re)sizes the buffer and points duplicate->data at it
       *              -- defined elsewhere, confirm.
       *
       * Returns 0, or the error from __ham_init_dbt.
       *
       * NOTE(review): the size (and doff) are narrowed to db_indx_t with no
       * range check in this function.
00404  *
00405  * PUBLIC: int __ham_make_dup __P((DB_ENV *,
00406  * PUBLIC:     const DBT *, DBT *d, void **, u_int32_t *));
00407  */
00408 int
00409 __ham_make_dup(dbenv, notdup, duplicate, bufp, sizep)
00410         DB_ENV *dbenv;
00411         const DBT *notdup;
00412         DBT *duplicate;
00413         void **bufp;
00414         u_int32_t *sizep;
00415 {
00416         db_indx_t tsize, item_size;
00417         int ret;
00418         u_int8_t *p;
00419 
              /* Logical item length: the data plus any leading partial gap. */
00420         item_size = (db_indx_t)notdup->size;
00421         if (F_ISSET(notdup, DB_DBT_PARTIAL))
00422                 item_size += notdup->doff;
00423 
              /* Total element size, used to size the output buffer. */
00424         tsize = DUP_SIZE(item_size);
00425         if ((ret = __ham_init_dbt(dbenv, duplicate, tsize, bufp, sizep)) != 0)
00426                 return (ret);
00427 
              /*
               * The result carries the caller's flags and is always marked
               * partial.  This dlen store is overwritten before returning.
               */
00428         duplicate->dlen = 0;
00429         duplicate->flags = notdup->flags;
00430         F_SET(duplicate, DB_DBT_PARTIAL);
00431 
              /* Leading length field. */
00432         p = duplicate->data;
00433         memcpy(p, &item_size, sizeof(db_indx_t));
00434         p += sizeof(db_indx_t);
              /* Zero-fill the gap in front of a partial item's data. */
00435         if (F_ISSET(notdup, DB_DBT_PARTIAL)) {
00436                 memset(p, 0, notdup->doff);
00437                 p += notdup->doff;
00438         }
00439         memcpy(p, notdup->data, notdup->size);
00440         p += notdup->size;
              /* Trailing length field, same value as the leading one. */
00441         memcpy(p, &item_size, sizeof(db_indx_t));
00442 
00443         duplicate->doff = 0;
00444         duplicate->dlen = notdup->size;
00445 
00446         return (0);
00447 }
00448 
00449 /*
00450  * __ham_check_move --
00451  *
00452  * Check if we can do whatever we need to on this page.  If not,
00453  * then we'll have to move the current element to a new page.
       *
       * Parameters:
       *   dbc      cursor whose HASH_CURSOR (page, indx) names the pair.
       *   add_len  number of bytes about to be added to the pair's data item.
       *
       * Returns 0 when no move is needed or the move succeeded, otherwise
       * the error from the failing helper.
       *
       * When a move is attempted and execution reaches the "out" label, the
       * cursor's page, pgno and indx are switched to the destination page,
       * H_EXPAND is set and H_DELETED is cleared.
       * NOTE(review): "out" is also reached through the error gotos and
       * after a failed __ham_del_pair, so those cursor updates happen even
       * when a non-zero value is returned.
00454  */
00455 static int
00456 __ham_check_move(dbc, add_len)
00457         DBC *dbc;
00458         u_int32_t add_len;
00459 {
00460         DB *dbp;
00461         DBT k, d;
00462         DB_LSN new_lsn;
00463         DB_MPOOLFILE *mpf;
00464         HASH_CURSOR *hcp;
00465         PAGE *next_pagep;
00466         db_pgno_t next_pgno;
00467         u_int32_t new_datalen, old_len, rectype;
00468         u_int8_t *hk;
00469         int ret;
00470 
00471         dbp = dbc->dbp;
00472         mpf = dbp->mpf;
00473         hcp = (HASH_CURSOR *)dbc->internal;
00474 
00475         hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
00476 
00477         /*
00478          * If the item is already off page duplicates or an offpage item,
00479          * then we know we can do whatever we need to do in-place
00480          */
00481         if (HPAGE_PTYPE(hk) == H_OFFDUP || HPAGE_PTYPE(hk) == H_OFFPAGE)
00482                 return (0);
00483 
              /*
               * old_len is the current on-page length of the data item;
               * new_datalen is the data length after the addition, plus one
               * empty-element overhead if the item is not yet a duplicate
               * set.
               */
00484         old_len =
00485             LEN_HITEM(dbp, hcp->page, dbp->pgsize, H_DATAINDEX(hcp->indx));
00486         new_datalen = (old_len - HKEYDATA_SIZE(0)) + add_len;
00487         if (HPAGE_PTYPE(hk) != H_DUPLICATE)
00488                 new_datalen += DUP_SIZE(0);
00489 
00490         /*
00491          * We need to add a new page under two conditions:
00492          * 1. The addition makes the total data length cross the BIG
00493          *    threshold and the OFFDUP structure won't fit on this page.
00494          * 2. The addition does not make the total data cross the
00495          *    threshold, but the new data won't fit on the page.
00496          * If neither of these is true, then we can return.
00497          */
00498         if (ISBIG(hcp, new_datalen) && (old_len > HOFFDUP_SIZE ||
00499             HOFFDUP_SIZE - old_len <= P_FREESPACE(dbp, hcp->page)))
00500                 return (0);
00501 
00502         if (!ISBIG(hcp, new_datalen) &&
00503             (new_datalen - old_len) <= P_FREESPACE(dbp, hcp->page))
00504                 return (0);
00505 
00506         /*
00507          * If we get here, then we need to move the item to a new page.
00508          * Check if there are more pages in the chain.  We now need to
00509          * update new_datalen to include the size of both the key and
00510          * the data that we need to move.
00511          */
00512 
00513         new_datalen = ISBIG(hcp, new_datalen) ?
00514             HOFFDUP_SIZE : HKEYDATA_SIZE(new_datalen);
00515         new_datalen +=
00516             LEN_HITEM(dbp, hcp->page, dbp->pgsize, H_KEYINDEX(hcp->indx));
00517 
              /*
               * Walk the pages chained after the current one, holding at
               * most one pinned at a time, and stop at the first with enough
               * free space.  If the chain runs out, next_pagep is left on
               * the last page visited, or NULL if there were none.
               */
00518         next_pagep = NULL;
00519         for (next_pgno = NEXT_PGNO(hcp->page); next_pgno != PGNO_INVALID;
00520             next_pgno = NEXT_PGNO(next_pagep)) {
00521                 if (next_pagep != NULL &&
00522                     (ret = __memp_fput(mpf, next_pagep, 0)) != 0)
00523                         return (ret);
00524 
00525                 if ((ret = __memp_fget(mpf,
00526                     &next_pgno, DB_MPOOL_CREATE, &next_pagep)) != 0)
00527                         return (ret);
00528 
00529                 if (P_FREESPACE(dbp, next_pagep) >= new_datalen)
00530                         break;
00531         }
00532 
00533         /* No more pages, add one. */
00534         if (next_pagep == NULL && (ret = __ham_add_ovflpage(dbc,
00535             hcp->page, 0, &next_pagep)) != 0)
00536                 return (ret);
00537 
00538         /* Add new page at the end of the chain. */
00539         if (P_FREESPACE(dbp, next_pagep) < new_datalen && (ret =
00540             __ham_add_ovflpage(dbc, next_pagep, 1, &next_pagep)) != 0) {
00541                 (void)__memp_fput(mpf, next_pagep, 0);
00542                 return (ret);
00543         }
00544 
00545         /* Copy the item to the new page. */
              /*
               * When logging, describe the pair in a PUTPAIR insdel record:
               * k and d point at the key and data as they currently sit on
               * the source page, and rectype gains mask bits for off-page
               * key, off-page data, or a duplicate set.
               */
00546         if (DBC_LOGGING(dbc)) {
00547                 rectype = PUTPAIR;
00548                 k.flags = 0;
00549                 d.flags = 0;
00550                 if (HPAGE_PTYPE(
00551                     H_PAIRKEY(dbp, hcp->page, hcp->indx)) == H_OFFPAGE) {
00552                         rectype |= PAIR_KEYMASK;
00553                         k.data = H_PAIRKEY(dbp, hcp->page, hcp->indx);
00554                         k.size = HOFFPAGE_SIZE;
00555                 } else {
00556                         k.data =
00557                             HKEYDATA_DATA(H_PAIRKEY(dbp, hcp->page, hcp->indx));
00558                         k.size =
00559                             LEN_HKEY(dbp, hcp->page, dbp->pgsize, hcp->indx);
00560                 }
00561 
                      /*
                       * NOTE(review): this branch looks unreachable -- the
                       * function already returned above when the data item
                       * was H_OFFPAGE, and hk has not been reassigned.
                       */
00562                 if (HPAGE_PTYPE(hk) == H_OFFPAGE) {
00563                         rectype |= PAIR_DATAMASK;
00564                         d.data = H_PAIRDATA(dbp, hcp->page, hcp->indx);
00565                         d.size = HOFFPAGE_SIZE;
00566                 } else {
00567                         if (HPAGE_PTYPE(H_PAIRDATA(dbp,
00568                             hcp->page, hcp->indx)) == H_DUPLICATE)
00569                                 rectype |= PAIR_DUPMASK;
00570                         d.data = HKEYDATA_DATA(
00571                             H_PAIRDATA(dbp, hcp->page, hcp->indx));
00572                         d.size = LEN_HDATA(dbp, hcp->page,
00573                             dbp->pgsize, hcp->indx);
00574                 }
00575 
00576                 if ((ret = __ham_insdel_log(dbp,
00577                     dbc->txn, &new_lsn, 0, rectype, PGNO(next_pagep),
00578                     (u_int32_t)NUM_ENT(next_pagep), &LSN(next_pagep),
00579                     &k, &d)) != 0) {
00580                         (void)__memp_fput(mpf, next_pagep, 0);
00581                         return (ret);
00582                 }
00583         } else
00584                 LSN_NOT_LOGGED(new_lsn);
00585 
00586         /* Move lsn onto page. */
00587         LSN(next_pagep) = new_lsn;      /* Structure assignment. */
00588 
              /*
               * Copy the key item, then the data item, onto the destination
               * page.  The code below addresses the copied pair at
               * NUM_ENT(next_pagep) - 2.
               */
00589         __ham_copy_item(dbp, hcp->page, H_KEYINDEX(hcp->indx), next_pagep);
00590         __ham_copy_item(dbp, hcp->page, H_DATAINDEX(hcp->indx), next_pagep);
00591 
00592         /*
00593          * We've just manually inserted a key and set of data onto
00594          * next_pagep;  however, it's possible that our caller will
00595          * return without further modifying the new page, for instance
00596          * if DB_NODUPDATA is set and our new item is a duplicate duplicate.
00597          * Thus, to be on the safe side, we need to mark the page dirty
00598          * here. [#2996]
00599          *
00600          * Note that __ham_del_pair should dirty the page we're moving
00601          * the items from, so we need only dirty the new page ourselves.
00602          */
00603         if ((ret = __memp_fset(mpf, next_pagep, DB_MPOOL_DIRTY)) != 0)
00604                 goto out;
00605 
00606         /* Update all cursors that used to point to this item. */
00607         if ((ret = __ham_c_chgpg(dbc, PGNO(hcp->page), H_KEYINDEX(hcp->indx),
00608             PGNO(next_pagep), NUM_ENT(next_pagep) - 2)) != 0)
00609                 goto out;
00610 
00611         /* Now delete the pair from the current page. */
00612         ret = __ham_del_pair(dbc, 0);
00613 
00614         /*
00615          * __ham_del_pair decremented nelem.  This is incorrect;  we
00616          * manually copied the element elsewhere, so the total number
00617          * of elements hasn't changed.  Increment it again.
00618          *
00619          * !!!
00620          * Note that we still have the metadata page pinned, and
00621          * __ham_del_pair dirtied it, so we don't need to set the dirty
00622          * flag again.
               *
               * NOTE(review): the increment is not conditional on ret, so it
               * also runs when __ham_del_pair reported an error.
00623          */
00624         if (!STD_LOCKING(dbc))
00625                 hcp->hdr->nelem++;
00626 
00627 out:
              /*
               * Release the source page (its put result is ignored) and
               * repoint the cursor at the copied pair on the destination
               * page.
               */
00628         (void)__memp_fput(mpf, hcp->page, DB_MPOOL_DIRTY);
00629         hcp->page = next_pagep;
00630         hcp->pgno = PGNO(hcp->page);
00631         hcp->indx = NUM_ENT(hcp->page) - 2;
00632         F_SET(hcp, H_EXPAND);
00633         F_CLR(hcp, H_DELETED);
00634 
00635         return (ret);
00636 }
00637 
00638 /*
00639  * __ham_move_offpage --
00640  *      Replace an onpage set of duplicates with the OFFDUP structure
00641  *      that references the duplicate page.
00642  *
00643  * XXX
00644  * This is really just a special case of __onpage_replace; we should
00645  * probably combine them.
00646  *
       * Parameters:
       *   dbc    cursor; supplies the DB handle and the transaction used
       *          for logging.
       *   pagep  page holding the item to replace.
       *   ndx    index of that item in the page's index table.
       *   pgno   page number of the off-page duplicate page to reference.
       *
       * Returns 0, or the error from __ham_replace_log (in which case the
       * page has not been modified by this function).
00647  */
00648 static int
00649 __ham_move_offpage(dbc, pagep, ndx, pgno)
00650         DBC *dbc;
00651         PAGE *pagep;
00652         u_int32_t ndx;
00653         db_pgno_t pgno;
00654 {
00655         DB *dbp;
00656         DBT new_dbt;
00657         DBT old_dbt;
00658         HOFFDUP od;
00659         db_indx_t i, *inp;
00660         int32_t difflen;
00661         u_int8_t *src;
00662         int ret;
00663 
              /* Build the replacement item: an H_OFFDUP reference to pgno. */
00664         dbp = dbc->dbp;
00665         od.type = H_OFFDUP;
00666         UMRW_SET(od.unused[0]);
00667         UMRW_SET(od.unused[1]);
00668         UMRW_SET(od.unused[2]);
00669         od.pgno = pgno;
00670         ret = 0;
00671 
              /*
               * Log the old and new item images before touching the page.
               * NOTE(review): only data and size are set in the two DBTs;
               * whether __ham_replace_log reads any other DBT field is not
               * visible here.
               */
00672         if (DBC_LOGGING(dbc)) {
00673                 new_dbt.data = &od;
00674                 new_dbt.size = HOFFDUP_SIZE;
00675                 old_dbt.data = P_ENTRY(dbp, pagep, ndx);
00676                 old_dbt.size = LEN_HITEM(dbp, pagep, dbp->pgsize, ndx);
00677                 if ((ret = __ham_replace_log(dbp, dbc->txn, &LSN(pagep), 0,
00678                     PGNO(pagep), (u_int32_t)ndx, &LSN(pagep), -1,
00679                     &old_dbt, &new_dbt, 0)) != 0)
00680                         return (ret);
00681         } else
00682                 LSN_NOT_LOGGED(LSN(pagep));
00683 
00684         /*
00685          * difflen is the difference in the lengths, and so may be negative.
00686          * We know that the difference between two unsigned lengths from a
00687          * database page will fit into an int32_t.
00688          */
00689         difflen =
00690             (int32_t)LEN_HITEM(dbp, pagep, dbp->pgsize, ndx) -
00691             (int32_t)HOFFDUP_SIZE;
00692         if (difflen != 0) {
00693                 /* Copy data. */
                      /*
                       * Slide the bytes lying between HOFFSET(pagep) and the
                       * start of item ndx by difflen, so that item ndx ends
                       * up occupying exactly HOFFDUP_SIZE bytes.
                       */
00694                 inp = P_INP(dbp, pagep);
00695                 src = (u_int8_t *)(pagep) + HOFFSET(pagep);
00696                 memmove(src + difflen, src, inp[ndx] - HOFFSET(pagep));
00697                 HOFFSET(pagep) += difflen;
00698 
00699                 /* Update index table. */
                      /* Item ndx and every later entry move by difflen. */
00700                 for (i = ndx; i < NUM_ENT(pagep); i++)
00701                         inp[i] += difflen;
00702         }
00703 
00704         /* Now copy the offdup entry onto the page. */
00705         memcpy(P_ENTRY(dbp, pagep, ndx), &od, HOFFDUP_SIZE);
00706         return (ret);
00707 }
00708 
00709 /*
00710  * __ham_dsearch:
00711  *      Locate a particular duplicate in a duplicate set.  Make sure that
00712  *      we exit with the cursor set appropriately.
00713  *
       * Parameters:
       *   dbc    cursor whose HASH_CURSOR (page, indx) names the on-page
       *          duplicate set to scan.
       *   dbt    the item to look for.
       *   offp   out: byte offset within the set at which the scan stopped.
       *   cmpp   out: result of the last comparison (see below).
       *   flags  only DB_GET_BOTH_RANGE is examined.
       *
       * The scan starts at hcp->dup_off when H_CONTINUE is set on the
       * cursor, otherwise at offset 0.  On return hcp->dup_tlen, dup_off and
       * dup_len are updated and H_ISDUP is set.
       *
       * NOTE(review): if the starting offset is already at or past the end
       * of the set, the loop body never runs, *cmpp is not written and
       * dup_len keeps its previous value.
       *
00714  * PUBLIC: void __ham_dsearch
00715  * PUBLIC:     __P((DBC *, DBT *, u_int32_t *, int *, u_int32_t));
00716  */
00717 void
00718 __ham_dsearch(dbc, dbt, offp, cmpp, flags)
00719         DBC *dbc;
00720         DBT *dbt;
00721         u_int32_t *offp, flags;
00722         int *cmpp;
00723 {
00724         DB *dbp;
00725         HASH_CURSOR *hcp;
00726         DBT cur;
00727         db_indx_t i, len;
00728         int (*func) __P((DB *, const DBT *, const DBT *));
00729         u_int8_t *data;
00730 
00731         dbp = dbc->dbp;
00732         hcp = (HASH_CURSOR *)dbc->internal;
              /* Fall back to __bam_defcmp when no dup comparator is set. */
00733         func = dbp->dup_compare == NULL ? __bam_defcmp : dbp->dup_compare;
00734 
              /* i is the byte offset of the current element within the set. */
00735         i = F_ISSET(hcp, H_CONTINUE) ? hcp->dup_off: 0;
00736         data = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx)) + i;
00737         hcp->dup_tlen = LEN_HDATA(dbp, hcp->page, dbp->pgsize, hcp->indx);
00738         len = hcp->dup_len;
00739         while (i < hcp->dup_tlen) {
                      /*
                       * Read the element's leading length and describe its
                       * data in cur; only cur.data and cur.size are set.
                       */
00740                 memcpy(&len, data, sizeof(db_indx_t));
00741                 data += sizeof(db_indx_t);
00742                 cur.data = data;
00743                 cur.size = (u_int32_t)len;
00744 
00745                 /*
00746                  * If we find an exact match, we're done.  If in a sorted
00747                  * duplicate set and the item is larger than our test item,
00748                  * we're done.  In the latter case, if permitting partial
00749                  * matches, it's not a failure.
00750                  */
00751                 *cmpp = func(dbp, dbt, &cur);
00752                 if (*cmpp == 0)
00753                         break;
00754                 if (*cmpp < 0 && dbp->dup_compare != NULL) {
                              /* A range lookup reports this stop as a match. */
00755                         if (flags == DB_GET_BOTH_RANGE)
00756                                 *cmpp = 0;
00757                         break;
00758                 }
00759 
                      /*
                       * Advance to the next element: i steps over both
                       * length fields and the data, while data (already
                       * past the leading length) steps over the data and
                       * the trailing length.
                       */
00760                 i += len + 2 * sizeof(db_indx_t);
00761                 data += len + sizeof(db_indx_t);
00762         }
00763 
              /* Report where the scan stopped and record it in the cursor. */
00764         *offp = i;
00765         hcp->dup_off = i;
00766         hcp->dup_len = len;
00767         F_SET(hcp, H_ISDUP);
00768 }
00769 
00770 /*
00771  * __ham_dcursor --
00772  *
00773  *      Create an off page duplicate cursor for this cursor.
00774  */
00775 static int
00776 __ham_dcursor(dbc, pgno, indx)
00777         DBC *dbc;
00778         db_pgno_t pgno;
00779         u_int32_t indx;
00780 {
00781         DB *dbp;
00782         HASH_CURSOR *hcp;
00783         BTREE_CURSOR *dcp;
00784         int ret;
00785 
00786         dbp = dbc->dbp;
00787         hcp = (HASH_CURSOR *)dbc->internal;
00788 
00789         if ((ret = __db_c_newopd(dbc, pgno, hcp->opd, &hcp->opd)) != 0)
00790                 return (ret);
00791 
00792         dcp = (BTREE_CURSOR *)hcp->opd->internal;
00793         dcp->pgno = pgno;
00794         dcp->indx = indx;
00795 
00796         if (dbp->dup_compare == NULL) {
00797                 /*
00798                  * Converting to off-page Recno trees is tricky.  The
00799                  * record number for the cursor is the index + 1 (to
00800                  * convert to 1-based record numbers).
00801                  */
00802                 dcp->recno = indx + 1;
00803         }
00804 
00805         /*
00806          * Transfer the deleted flag from the top-level cursor to the
00807          * created one.
00808          */
00809         if (F_ISSET(hcp, H_DELETED)) {
00810                 F_SET(dcp, C_DELETED);
00811                 F_CLR(hcp, H_DELETED);
00812         }
00813 
00814         return (0);
00815 }
00816 
00817 /*
00818  * __ham_c_chgpg --
00819  *      Adjust the cursors after moving an item to a new page.  We only
00820  *      move cursors that are pointing at this one item and are not
00821  *      deleted;  since we only touch non-deleted cursors, and since
00822  *      (by definition) no item existed at the pgno/indx we're moving the
00823  *      item to, we're guaranteed that all the cursors we affect here or
00824  *      on abort really do refer to this one item.
00825  */
00826 static int
00827 __ham_c_chgpg(dbc, old_pgno, old_index, new_pgno, new_index)
00828         DBC *dbc;
00829         db_pgno_t old_pgno, new_pgno;
00830         u_int32_t old_index, new_index;
00831 {
00832         DB *dbp, *ldbp;
00833         DB_ENV *dbenv;
00834         DB_LSN lsn;
00835         DB_TXN *my_txn;
00836         DBC *cp;
00837         HASH_CURSOR *hcp;
00838         int found, ret;
00839 
00840         dbp = dbc->dbp;
00841         dbenv = dbp->dbenv;
00842 
00843         my_txn = IS_SUBTRANSACTION(dbc->txn) ? dbc->txn : NULL;
00844         found = 0;
00845 
00846         MUTEX_LOCK(dbenv, dbenv->mtx_dblist);
00847         for (ldbp = __dblist_get(dbenv, dbp->adj_fileid);
00848             ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid;
00849             ldbp = LIST_NEXT(ldbp, dblistlinks)) {
00850                 MUTEX_LOCK(dbenv, dbp->mutex);
00851                 for (cp = TAILQ_FIRST(&ldbp->active_queue); cp != NULL;
00852                     cp = TAILQ_NEXT(cp, links)) {
00853                         if (cp == dbc || cp->dbtype != DB_HASH)
00854                                 continue;
00855 
00856                         hcp = (HASH_CURSOR *)cp->internal;
00857 
00858                         /*
00859                          * If a cursor is deleted, it doesn't refer to this
00860                          * item--it just happens to have the same indx, but
00861                          * it points to a former neighbor.  Don't move it.
00862                          */
00863                         if (F_ISSET(hcp, H_DELETED))
00864                                 continue;
00865 
00866                         if (hcp->pgno == old_pgno) {
00867                                 if (hcp->indx == old_index) {
00868                                         hcp->pgno = new_pgno;
00869                                         hcp->indx = new_index;
00870                                 } else
00871                                         continue;
00872                                 if (my_txn != NULL && cp->txn != my_txn)
00873                                         found = 1;
00874                         }
00875                 }
00876                 MUTEX_UNLOCK(dbenv, dbp->mutex);
00877         }
00878         MUTEX_UNLOCK(dbenv, dbenv->mtx_dblist);
00879 
00880         if (found != 0 && DBC_LOGGING(dbc)) {
00881                 if ((ret = __ham_chgpg_log(dbp, my_txn, &lsn, 0, DB_HAM_CHGPG,
00882                     old_pgno, new_pgno, old_index, new_index)) != 0)
00883                         return (ret);
00884         }
00885         return (0);
00886 }

Generated on Sun Dec 25 12:14:29 2005 for Berkeley DB 4.4.16 by  doxygen 1.4.2