Main Page | Class Hierarchy | Data Structures | Directories | File List | Data Fields | Related Pages

bt_split.c

00001 /*-
00002  * See the file LICENSE for redistribution information.
00003  *
00004  * Copyright (c) 1996-2005
00005  *      Sleepycat Software.  All rights reserved.
00006  */
00007 /*
00008  * Copyright (c) 1990, 1993, 1994, 1995, 1996
00009  *      Keith Bostic.  All rights reserved.
00010  */
00011 /*
00012  * Copyright (c) 1990, 1993, 1994, 1995
00013  *      The Regents of the University of California.  All rights reserved.
00014  *
00015  * Redistribution and use in source and binary forms, with or without
00016  * modification, are permitted provided that the following conditions
00017  * are met:
00018  * 1. Redistributions of source code must retain the above copyright
00019  *    notice, this list of conditions and the following disclaimer.
00020  * 2. Redistributions in binary form must reproduce the above copyright
00021  *    notice, this list of conditions and the following disclaimer in the
00022  *    documentation and/or other materials provided with the distribution.
00023  * 3. Neither the name of the University nor the names of its contributors
00024  *    may be used to endorse or promote products derived from this software
00025  *    without specific prior written permission.
00026  *
00027  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
00028  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00029  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00030  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
00031  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
00032  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
00033  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
00034  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00035  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
00036  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
00037  * SUCH DAMAGE.
00038  *
00039  * $Id: bt_split.c,v 12.4 2005/06/16 20:20:22 bostic Exp $
00040  */
00041 
00042 #include "db_config.h"
00043 
00044 #ifndef NO_SYSTEM_INCLUDES
00045 #include <sys/types.h>
00046 
00047 #include <string.h>
00048 #endif
00049 
00050 #include "db_int.h"
00051 #include "dbinc/db_page.h"
00052 #include "dbinc/db_shash.h"
00053 #include "dbinc/lock.h"
00054 #include "dbinc/mp.h"
00055 #include "dbinc/btree.h"
00056 
00057 static int __bam_broot __P((DBC *, PAGE *, PAGE *, PAGE *));
00058 static int __bam_page __P((DBC *, EPG *, EPG *));
00059 static int __bam_psplit __P((DBC *, EPG *, PAGE *, PAGE *, db_indx_t *));
00060 static int __bam_root __P((DBC *, EPG *));
00061 static int __ram_root __P((DBC *, PAGE *, PAGE *, PAGE *));
00062 
00063 /*
00064  * __bam_split --
00065  *      Split a page.
 *
 *      arg is the search key that triggered the split: a DBT for a
 *      btree database or a db_recno_t for a recno database (see the
 *      __bam_search/__bam_rsearch calls below).  If root_pgnop is
 *      non-NULL it is filled in with the page number of the root of
 *      the subtree in which the split took place.
00066  *
00067  * PUBLIC: int __bam_split __P((DBC *, void *, db_pgno_t *));
00068  */
00069 int
00070 __bam_split(dbc, arg, root_pgnop)
00071         DBC *dbc;
00072         void *arg;
00073         db_pgno_t *root_pgnop;
00074 {
00075         BTREE_CURSOR *cp;
00076         enum { UP, DOWN } dir;
00077         db_pgno_t root_pgno;
00078         int exact, level, ret;
00079 
00080         cp = (BTREE_CURSOR *)dbc->internal;
00081         root_pgno = cp->root;
00082 
00083         /*
00084          * The locking protocol we use to avoid deadlock is to acquire locks
00085          * by walking down the tree, but we do it as lazily as possible,
00086          * locking the root only as a last resort.  We expect all stack pages
00087          * to have been discarded before we're called; we discard all
00088          * short-term locks.
00089          *
00090          * When __bam_split is first called, we know that a leaf page was too
00091          * full for an insert.  We don't know what leaf page it was, but we
00092          * have the key/recno that caused the problem.  We call XX_search to
00093          * reacquire the leaf page, but this time get both the leaf page and
00094          * its parent, locked.  We then split the leaf page and see if the new
00095          * internal key will fit into the parent page.  If it will, we're done.
00096          *
00097          * If it won't, we discard our current locks and repeat the process,
00098          * only this time acquiring the parent page and its parent, locked.
00099          * This process repeats until we succeed in the split, splitting the
00100          * root page as the final resort.  The entire process then repeats,
00101          * as necessary, until we split a leaf page.
00102          *
00103          * XXX
00104          * A traditional method of speeding this up is to maintain a stack of
00105          * the pages traversed in the original search.  You can detect if the
00106          * stack is correct by storing the page's LSN when it was searched and
00107          * comparing that LSN with the current one when it's locked during the
00108          * split.  This would be an easy change for this code, but I have no
00109          * numbers that indicate it's worthwhile.
00110          */
        /*
         * dir tracks which way we're moving through the tree: UP while
         * splits fail for lack of parent space (so we next try one level
         * higher), DOWN once a split succeeds (so we work back toward the
         * leaf).  The for-loop's third clause adjusts level accordingly on
         * every iteration.
         */
00110         for (dir = UP, level = LEAFLEVEL;; dir == UP ? ++level : --level) {
00111                 /*
00112                  * Acquire a page and its parent, locked.
00113                  */
00114                 if ((ret = (dbc->dbtype == DB_BTREE ?
00115                     __bam_search(dbc, PGNO_INVALID,
00116                         arg, S_WRPAIR, level, NULL, &exact) :
00117                     __bam_rsearch(dbc,
00118                         (db_recno_t *)arg, S_WRPAIR, level, &exact))) != 0)
00119                         break;
00120 
00121                 if (root_pgnop != NULL)
00122                         *root_pgnop = cp->csp[0].page->pgno == root_pgno ?
00123                             root_pgno : cp->csp[-1].page->pgno;
00124                 /*
00125                  * Split the page if it still needs it (it's possible another
00126                  * thread of control has already split the page).  If we are
00127                  * guaranteed that two items will fit on the page, the split
00128                  * is no longer necessary.
00129                  */
00130                 if (2 * B_MAXSIZEONPAGE(cp->ovflsize)
00131                     <= (db_indx_t)P_FREESPACE(dbc->dbp, cp->csp[0].page)) {
00132                         __bam_stkrel(dbc, STK_NOLOCK);
00133                         break;
00134                 }
                /* The root page gets special handling; see __bam_root. */
00135                 ret = cp->csp[0].page->pgno == root_pgno ?
00136                     __bam_root(dbc, &cp->csp[0]) :
00137                     __bam_page(dbc, &cp->csp[-1], &cp->csp[0]);
00138                 BT_STK_CLR(cp);
00139 
00140                 switch (ret) {
00141                 case 0:
00142                         /* Once we've split the leaf page, we're done. */
00143                         if (level == LEAFLEVEL)
00144                                 return (0);
00145 
00146                         /* Switch directions. */
00147                         if (dir == UP)
00148                                 dir = DOWN;
00149                         break;
00150                 case DB_NEEDSPLIT:
00151                         /*
00152                          * It's possible to fail to split repeatedly, as other
00153                          * threads may be modifying the tree, or the page usage
00154                          * is sufficiently bad that we don't get enough space
00155                          * the first time.
00156                          */
00157                         if (dir == DOWN)
00158                                 dir = UP;
00159                         break;
00160                 default:
00161                         goto err;
00162                 }
00163         }
00164 
        /* A break out of the loop falls through to here with ret set. */
00165 err:    if (root_pgnop != NULL)
00166                 *root_pgnop = cp->root;
00167         return (ret);
00168 }
00169 
00170 /*
00171  * __bam_root --
00172  *      Split the root page of a btree.
 *
 *      cp is the stack entry for the root page.  Two new child pages are
 *      allocated, the root's items are distributed across them by
 *      __bam_psplit, and the root itself is rewritten as an internal
 *      page referencing the two children (the root's page number never
 *      changes).
00173  */
00174 static int
00175 __bam_root(dbc, cp)
00176         DBC *dbc;
00177         EPG *cp;
00178 {
00179         DB *dbp;
00180         DBT log_dbt;
00181         DB_LSN log_lsn;
00182         DB_MPOOLFILE *mpf;
00183         PAGE *lp, *rp;
00184         db_indx_t split;
00185         u_int32_t opflags;
00186         int ret, t_ret;
00187 
00188         dbp = dbc->dbp;
00189         mpf = dbp->mpf;
00190         lp = rp = NULL;
00191 
00192         /* Yeah, right: guard against running out of btree levels. */
00193         if (cp->page->level >= MAXBTREELEVEL) {
00194                 __db_err(dbp->dbenv,
00195                     "Too many btree levels: %d", cp->page->level);
00196                 ret = ENOSPC;
00197                 goto err;
00198         }
00199 
00200         /* Create new left and right pages for the split. */
00201         if ((ret = __db_new(dbc, TYPE(cp->page), &lp)) != 0 ||
00202             (ret = __db_new(dbc, TYPE(cp->page), &rp)) != 0)
00203                 goto err;
        /*
         * Only leaf pages carry prev/next sibling links; internal pages
         * are never chained, so their link fields stay PGNO_INVALID.
         */
00204         P_INIT(lp, dbp->pgsize, lp->pgno,
00205             PGNO_INVALID, ISINTERNAL(cp->page) ? PGNO_INVALID : rp->pgno,
00206             cp->page->level, TYPE(cp->page));
00207         P_INIT(rp, dbp->pgsize, rp->pgno,
00208             ISINTERNAL(cp->page) ?  PGNO_INVALID : lp->pgno, PGNO_INVALID,
00209             cp->page->level, TYPE(cp->page));
00210 
00211         /* Split the page. */
00212         if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
00213                 goto err;
00214 
00215         /* Log the change. */
00216         if (DBC_LOGGING(dbc)) {
                /* The entire pre-split root image goes into the log record. */
00217                 memset(&log_dbt, 0, sizeof(log_dbt));
00218                 log_dbt.data = cp->page;
00219                 log_dbt.size = dbp->pgsize;
00220                 ZERO_LSN(log_lsn);
00221                 opflags = F_ISSET(
00222                     (BTREE_CURSOR *)dbc->internal, C_RECNUM) ? SPL_NRECS : 0;
00223                 if ((ret = __bam_split_log(dbp,
00224                     dbc->txn, &LSN(cp->page), 0, PGNO(lp), &LSN(lp), PGNO(rp),
00225                     &LSN(rp), (u_int32_t)NUM_ENT(lp), 0, &log_lsn,
00226                     dbc->internal->root, &log_dbt, opflags)) != 0)
00227                         goto err;
00228         } else
00229                 LSN_NOT_LOGGED(LSN(cp->page));
        /* All three pages end up with the same (new) LSN. */
00230         LSN(lp) = LSN(cp->page);
00231         LSN(rp) = LSN(cp->page);
00232 
00233         /* Clean up the new root page. */
00234         if ((ret = (dbc->dbtype == DB_RECNO ?
00235             __ram_root(dbc, cp->page, lp, rp) :
00236             __bam_broot(dbc, cp->page, lp, rp))) != 0)
00237                 goto err;
00238 
00239         /* Adjust any cursors. */
00240         ret = __bam_ca_split(dbc, cp->page->pgno, lp->pgno, rp->pgno, split, 1);
00241 
00242         /* Success or error: release pages and locks. */
00243 err:    if ((t_ret =
00244             __memp_fput(mpf, cp->page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
00245                 ret = t_ret;
00246         if ((t_ret = __TLPUT(dbc, cp->lock)) != 0 && ret == 0)
00247                 ret = t_ret;
        /* lp/rp are NULL if __db_new failed; only put back what we got. */
00248         if (lp != NULL &&
00249             (t_ret = __memp_fput(mpf, lp, DB_MPOOL_DIRTY)) != 0 && ret == 0)
00250                 ret = t_ret;
00251         if (rp != NULL &&
00252             (t_ret = __memp_fput(mpf, rp, DB_MPOOL_DIRTY)) != 0 && ret == 0)
00253                 ret = t_ret;
00254 
00255         return (ret);
00256 }
00257 
00258 /*
00259  * __bam_page --
00260  *      Split the non-root page of a btree.
 *
 *      pp is the stack entry for the parent page, cp the entry for the
 *      page being split.  The split is staged in two malloc'd shadow
 *      pages (lp/rp) and only copied onto real database pages once it
 *      is known to succeed; see the long comment below for why.
00261  */
00262 static int
00263 __bam_page(dbc, pp, cp)
00264         DBC *dbc;
00265         EPG *pp, *cp;
00266 {
00267         BTREE_CURSOR *bc;
00268         DBT log_dbt;
00269         DB_LSN log_lsn;
00270         DB *dbp;
00271         DB_LOCK rplock, tplock;
00272         DB_MPOOLFILE *mpf;
00273         DB_LSN save_lsn;
00274         PAGE *lp, *rp, *alloc_rp, *tp;
00275         db_indx_t split;
00276         u_int32_t opflags;
00277         int ret, t_ret;
00278 
00279         dbp = dbc->dbp;
00280         mpf = dbp->mpf;
00281         alloc_rp = lp = rp = tp = NULL;
00282         LOCK_INIT(rplock);
00283         LOCK_INIT(tplock);
        /* Defensive initialization; every goto err below first sets ret. */
00284         ret = -1;
00285 
00286         /*
00287          * Create a new right page for the split, and fill in everything
00288          * except its LSN and page number.
00289          *
00290          * We malloc space for both the left and right pages, so we don't get
00291          * a new page from the underlying buffer pool until we know the split
00292          * is going to succeed.  The reason is that we can't release locks
00293          * acquired during the get-a-new-page process because metadata page
00294          * locks can't be discarded on failure since we may have modified the
00295          * free list.  So, if you assume that we're holding a write lock on the
00296          * leaf page which ran out of space and started this split (e.g., we
00297          * have already written records to the page, or we retrieved a record
00298          * from it with the DB_RMW flag set), failing in a split with both a
00299          * leaf page locked and the metadata page locked can potentially lock
00300          * up the tree badly, because we've violated the rule of always locking
00301          * down the tree, and never up.
00302          */
00303         if ((ret = __os_malloc(dbp->dbenv, dbp->pgsize, &rp)) != 0)
00304                 goto err;
00305         P_INIT(rp, dbp->pgsize, 0,
00306             ISINTERNAL(cp->page) ? PGNO_INVALID : PGNO(cp->page),
00307             ISINTERNAL(cp->page) ? PGNO_INVALID : NEXT_PGNO(cp->page),
00308             cp->page->level, TYPE(cp->page));
00309 
00310         /*
00311          * Create new left page for the split, and fill in everything
00312          * except its LSN and next-page page number.
00313          */
00314         if ((ret = __os_malloc(dbp->dbenv, dbp->pgsize, &lp)) != 0)
00315                 goto err;
00316         P_INIT(lp, dbp->pgsize, PGNO(cp->page),
00317             ISINTERNAL(cp->page) ?  PGNO_INVALID : PREV_PGNO(cp->page),
00318             ISINTERNAL(cp->page) ?  PGNO_INVALID : 0,
00319             cp->page->level, TYPE(cp->page));
00320 
00321         /*
00322          * Split right.
00323          *
00324          * Only the indices are sorted on the page, i.e., the key/data pairs
00325          * aren't, so it's simpler to copy the data from the split page onto
00326          * two new pages instead of copying half the data to a new right page
00327          * and compacting the left page in place.  Since the left page can't
00328          * change, we swap the original and the allocated left page after the
00329          * split.
00330          */
00331         if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
00332                 goto err;
00333 
00334         /*
00335          * Test to see if we are going to be able to insert the new pages into
00336          * the parent page.  The interesting failure here is that the parent
00337          * page can't hold the new keys, and has to be split in turn, in which
00338          * case we want to release all the locks we can.
00339          */
00340         if ((ret = __bam_pinsert(dbc, pp, lp, rp, BPI_SPACEONLY)) != 0)
00341                 goto err;
00342 
00343         /*
00344          * Fix up the previous pointer of any leaf page following the split
00345          * page.
00346          *
00347          * There's interesting deadlock situations here as we try to write-lock
00348          * a page that's not in our direct ancestry.  Consider a cursor walking
00349          * backward through the leaf pages, that has our following page locked,
00350          * and is waiting on a lock for the page we're splitting.  In that case
00351          * we're going to deadlock here.  It's probably OK, stepping backward
00352          * through the tree isn't a common operation.
00353          */
00354         if (ISLEAF(cp->page) && NEXT_PGNO(cp->page) != PGNO_INVALID) {
00355                 if ((ret = __db_lget(dbc,
00356                     0, NEXT_PGNO(cp->page), DB_LOCK_WRITE, 0, &tplock)) != 0)
00357                         goto err;
00358                 if ((ret = __memp_fget(mpf, &NEXT_PGNO(cp->page), 0, &tp)) != 0)
00359                         goto err;
00360         }
00361 
00362         /*
00363          * We've got everything locked down we need, and we know the split
00364          * is going to succeed.  Go and get the additional page we'll need.
00365          */
00366         if ((ret = __db_new(dbc, TYPE(cp->page), &alloc_rp)) != 0)
00367                 goto err;
00368 
00369         /*
00370          * Lock the new page.  We need to do this for two reasons: first, the
00371          * fast-lookup code might have a reference to this page in bt_lpgno if
00372          * the page was recently deleted from the tree, and that code doesn't
00373          * walk the tree and so won't encounter the parent's page lock.
00374          * Second, a dirty reader could get to this page via the parent or old
00375          * page after the split is done but before the transaction is committed
00376          * or aborted.
00377          */
00378         if ((ret = __db_lget(dbc,
00379             0, PGNO(alloc_rp), DB_LOCK_WRITE, 0, &rplock)) != 0)
00380                 goto err;
00381 
00382         /*
00383          * Fix up the page numbers we didn't have before.  We have to do this
00384          * before calling __bam_pinsert because it may copy a page number onto
00385          * the parent page and it takes the page number from its page argument.
00386          */
00387         PGNO(rp) = NEXT_PGNO(lp) = PGNO(alloc_rp);
00388 
00389         /* Actually update the parent page. */
00390         if ((ret = __bam_pinsert(dbc, pp, lp, rp, 0)) != 0)
00391                 goto err;
00392 
00393         bc = (BTREE_CURSOR *)dbc->internal;
00394         /* Log the change. */
00395         if (DBC_LOGGING(dbc)) {
00396                 memset(&log_dbt, 0, sizeof(log_dbt));
00397                 log_dbt.data = cp->page;
00398                 log_dbt.size = dbp->pgsize;
                /* No following page: log a zeroed LSN as a placeholder. */
00399                 if (tp == NULL)
00400                         ZERO_LSN(log_lsn);
00401                 opflags = F_ISSET(bc, C_RECNUM) ? SPL_NRECS : 0;
00402                 if ((ret = __bam_split_log(dbp, dbc->txn, &LSN(cp->page), 0,
00403                     PGNO(cp->page), &LSN(cp->page), PGNO(alloc_rp),
00404                     &LSN(alloc_rp), (u_int32_t)NUM_ENT(lp),
00405                     tp == NULL ? 0 : PGNO(tp),
00406                     tp == NULL ? &log_lsn : &LSN(tp),
00407                     PGNO_INVALID, &log_dbt, opflags)) != 0)
00408                         goto err;
00409 
00410         } else
00411                 LSN_NOT_LOGGED(LSN(cp->page));
00412 
00413         /* Update the LSNs for all involved pages. */
00414         LSN(alloc_rp) = LSN(cp->page);
00415         LSN(lp) = LSN(cp->page);
00416         LSN(rp) = LSN(cp->page);
00417         if (tp != NULL)
00418                 LSN(tp) = LSN(cp->page);
00419 
00420         /*
00421          * Copy the left and right pages into place.  There are two paths
00422          * through here.  Either we are logging and we set the LSNs in the
00423          * logging path.  However, if we are not logging, then we do not
00424          * have valid LSNs on lp or rp.  The correct LSNs to use are the
00425          * ones on the page we got from __db_new or the one that was
00426          * originally on cp->page.  In both cases, we save the LSN from the
00427          * real database page (not a malloc'd one) and reapply it after we
00428          * do the copy.
00429          */
        /* Copy header and item region separately, skipping the free gap. */
00430         save_lsn = alloc_rp->lsn;
00431         memcpy(alloc_rp, rp, LOFFSET(dbp, rp));
00432         memcpy((u_int8_t *)alloc_rp + HOFFSET(rp),
00433             (u_int8_t *)rp + HOFFSET(rp), dbp->pgsize - HOFFSET(rp));
00434         alloc_rp->lsn = save_lsn;
00435 
00436         save_lsn = cp->page->lsn;
00437         memcpy(cp->page, lp, LOFFSET(dbp, lp));
00438         memcpy((u_int8_t *)cp->page + HOFFSET(lp),
00439             (u_int8_t *)lp + HOFFSET(lp), dbp->pgsize - HOFFSET(lp));
00440         cp->page->lsn = save_lsn;
00441 
00442         /* Fix up the next-page link. */
00443         if (tp != NULL)
00444                 PREV_PGNO(tp) = PGNO(rp);
00445 
00446         /* Adjust any cursors. */
00447         if ((ret = __bam_ca_split(dbc,
00448             PGNO(cp->page), PGNO(cp->page), PGNO(rp), split, 0)) != 0)
00449                 goto err;
00450 
        /* The malloc'd shadow pages are no longer needed. */
00451         __os_free(dbp->dbenv, lp);
00452         __os_free(dbp->dbenv, rp);
00453 
00454         /*
00455          * Success -- write the real pages back to the store.  As we never
00456          * acquired any sort of lock on the new page, we release it before
00457          * releasing locks on the pages that reference it.  We're finished
00458          * modifying the page so it's not really necessary, but it's neater.
00459          */
00460         if ((t_ret =
00461             __memp_fput(mpf, alloc_rp, DB_MPOOL_DIRTY)) != 0 && ret == 0)
00462                 ret = t_ret;
00463         if ((t_ret = __TLPUT(dbc, rplock)) != 0 && ret == 0)
00464                 ret = t_ret;
00465         if ((t_ret =
00466             __memp_fput(mpf, pp->page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
00467                 ret = t_ret;
00468         if ((t_ret = __TLPUT(dbc, pp->lock)) != 0 && ret == 0)
00469                 ret = t_ret;
00470         if ((t_ret =
00471             __memp_fput(mpf, cp->page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
00472                 ret = t_ret;
00473         if ((t_ret = __TLPUT(dbc, cp->lock)) != 0 && ret == 0)
00474                 ret = t_ret;
00475         if (tp != NULL) {
00476                 if ((t_ret =
00477                     __memp_fput(mpf, tp, DB_MPOOL_DIRTY)) != 0 && ret == 0)
00478                         ret = t_ret;
00479                 if ((t_ret = __TLPUT(dbc, tplock)) != 0 && ret == 0)
00480                         ret = t_ret;
00481         }
00482         return (ret);
00483 
        /* Error path: nothing was copied onto real pages, put them back clean. */
00484 err:    if (lp != NULL)
00485                 __os_free(dbp->dbenv, lp);
00486         if (rp != NULL)
00487                 __os_free(dbp->dbenv, rp);
00488         if (alloc_rp != NULL)
00489                 (void)__memp_fput(mpf, alloc_rp, 0);
00490         if (tp != NULL)
00491                 (void)__memp_fput(mpf, tp, 0);
00492 
00493         /* We never updated the new or next pages, we can release them. */
00494         (void)__LPUT(dbc, rplock);
00495         (void)__LPUT(dbc, tplock);
00496 
        /*
         * NOTE(review): on DB_NEEDSPLIT the parent/child locks are dropped
         * outright (__LPUT) because the caller re-searches and re-locks;
         * otherwise __TLPUT is used, presumably deferring release to the
         * transactional locker -- confirm against dbinc/lock.h.
         */
00497         (void)__memp_fput(mpf, pp->page, 0);
00498         if (ret == DB_NEEDSPLIT)
00499                 (void)__LPUT(dbc, pp->lock);
00500         else
00501                 (void)__TLPUT(dbc, pp->lock);
00502 
00503         (void)__memp_fput(mpf, cp->page, 0);
00504         if (ret == DB_NEEDSPLIT)
00505                 (void)__LPUT(dbc, cp->lock);
00506         else
00507                 (void)__TLPUT(dbc, cp->lock);
00508 
00509         return (ret);
00510 }
00511 
00512 /*
00513  * __bam_broot --
00514  *      Fix up the btree root page after it has been split.
 *
 *      rootp is the old (already split) root page, lp and rp the new
 *      left and right children.  The root is rewritten as a P_IBTREE
 *      page with two BINTERNAL entries: a dummy leftmost key pointing
 *      at lp, and a copy of rp's first key pointing at rp.
00515  */
00516 static int
00517 __bam_broot(dbc, rootp, lp, rp)
00518         DBC *dbc;
00519         PAGE *rootp, *lp, *rp;
00520 {
00521         BINTERNAL bi, *child_bi;
00522         BKEYDATA *child_bk;
00523         BTREE_CURSOR *cp;
00524         DB *dbp;
00525         DBT hdr, data;
00526         db_pgno_t root_pgno;
00527         int ret;
00528 
00529         dbp = dbc->dbp;
00530         cp = (BTREE_CURSOR *)dbc->internal;
00531 
00532         /*
00533          * If the root page was a leaf page, change it into an internal page.
00534          * We copy the key we split on (but not the key's data, in the case of
00535          * a leaf page) to the new root page.
00536          */
00537         root_pgno = cp->root;
00538         P_INIT(rootp, dbp->pgsize,
00539             root_pgno, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IBTREE);
00540 
00541         memset(&data, 0, sizeof(data));
00542         memset(&hdr, 0, sizeof(hdr));
00543 
00544         /*
00545          * The btree comparison code guarantees that the left-most key on any
00546          * internal btree page is never used, so it doesn't need to be filled
00547          * in.  Set the record count if necessary.
00548          */
00549         memset(&bi, 0, sizeof(bi));
00550         bi.len = 0;
00551         B_TSET(bi.type, B_KEYDATA, 0);
00552         bi.pgno = lp->pgno;
        /* Record counts are only maintained for C_RECNUM trees. */
00553         if (F_ISSET(cp, C_RECNUM)) {
00554                 bi.nrecs = __bam_total(dbp, lp);
00555                 RE_NREC_SET(rootp, bi.nrecs);
00556         }
00557         hdr.data = &bi;
00558         hdr.size = SSZA(BINTERNAL, data);
00559         if ((ret =
00560             __db_pitem(dbc, rootp, 0, BINTERNAL_SIZE(0), &hdr, NULL)) != 0)
00561                 return (ret);
00562 
        /* The second entry's format depends on the child page type. */
00563         switch (TYPE(rp)) {
00564         case P_IBTREE:
00565                 /* Copy the first key of the child page onto the root page. */
00566                 child_bi = GET_BINTERNAL(dbp, rp, 0);
00567 
00568                 bi.len = child_bi->len;
00569                 B_TSET(bi.type, child_bi->type, 0);
00570                 bi.pgno = rp->pgno;
00571                 if (F_ISSET(cp, C_RECNUM)) {
00572                         bi.nrecs = __bam_total(dbp, rp);
00573                         RE_NREC_ADJ(rootp, bi.nrecs);
00574                 }
00575                 hdr.data = &bi;
00576                 hdr.size = SSZA(BINTERNAL, data);
00577                 data.data = child_bi->data;
00578                 data.size = child_bi->len;
00579                 if ((ret = __db_pitem(dbc, rootp, 1,
00580                     BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
00581                         return (ret);
00582 
                /*
                 * Increment the overflow ref count: the root now holds a
                 * second reference to the overflow key's page chain.
                 */
00584                 if (B_TYPE(child_bi->type) == B_OVERFLOW)
00585                         if ((ret = __db_ovref(dbc,
00586                             ((BOVERFLOW *)(child_bi->data))->pgno, 1)) != 0)
00587                                 return (ret);
00588                 break;
00589         case P_LDUP:
00590         case P_LBTREE:
00591                 /* Copy the first key of the child page onto the root page. */
00592                 child_bk = GET_BKEYDATA(dbp, rp, 0);
00593                 switch (B_TYPE(child_bk->type)) {
00594                 case B_KEYDATA:
00595                         bi.len = child_bk->len;
00596                         B_TSET(bi.type, child_bk->type, 0);
00597                         bi.pgno = rp->pgno;
00598                         if (F_ISSET(cp, C_RECNUM)) {
00599                                 bi.nrecs = __bam_total(dbp, rp);
00600                                 RE_NREC_ADJ(rootp, bi.nrecs);
00601                         }
00602                         hdr.data = &bi;
00603                         hdr.size = SSZA(BINTERNAL, data);
00604                         data.data = child_bk->data;
00605                         data.size = child_bk->len;
00606                         if ((ret = __db_pitem(dbc, rootp, 1,
00607                             BINTERNAL_SIZE(child_bk->len), &hdr, &data)) != 0)
00608                                 return (ret);
00609                         break;
00610                 case B_DUPLICATE:
00611                 case B_OVERFLOW:
                        /* The key is a BOVERFLOW; copy the reference itself. */
00612                         bi.len = BOVERFLOW_SIZE;
00613                         B_TSET(bi.type, child_bk->type, 0);
00614                         bi.pgno = rp->pgno;
00615                         if (F_ISSET(cp, C_RECNUM)) {
00616                                 bi.nrecs = __bam_total(dbp, rp);
00617                                 RE_NREC_ADJ(rootp, bi.nrecs);
00618                         }
00619                         hdr.data = &bi;
00620                         hdr.size = SSZA(BINTERNAL, data);
00621                         data.data = child_bk;
00622                         data.size = BOVERFLOW_SIZE;
00623                         if ((ret = __db_pitem(dbc, rootp, 1,
00624                             BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
00625                                 return (ret);
00626 
00627                         /* Increment the overflow ref count. */
00628                         if (B_TYPE(child_bk->type) == B_OVERFLOW)
00629                                 if ((ret = __db_ovref(dbc,
00630                                     ((BOVERFLOW *)child_bk)->pgno, 1)) != 0)
00631                                         return (ret);
00632                         break;
00633                 default:
00634                         return (__db_pgfmt(dbp->dbenv, rp->pgno));
00635                 }
00636                 break;
00637         default:
00638                 return (__db_pgfmt(dbp->dbenv, rp->pgno));
00639         }
00640         return (0);
00641 }
00642 
00643 /*
00644  * __ram_root --
00645  *      Fix up the recno root page after it has been split.
00646  */
00647 static int
00648 __ram_root(dbc, rootp, lp, rp)
00649         DBC *dbc;
00650         PAGE *rootp, *lp, *rp;
00651 {
00652         DB *dbp;
00653         DBT hdr;
00654         RINTERNAL ri;
00655         db_pgno_t root_pgno;
00656         int ret;
00657 
00658         dbp = dbc->dbp;
00659         root_pgno = dbc->internal->root;
00660 
00661         /* Initialize the page. */
00662         P_INIT(rootp, dbp->pgsize,
00663             root_pgno, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IRECNO);
00664 
00665         /* Initialize the header. */
00666         memset(&hdr, 0, sizeof(hdr));
00667         hdr.data = &ri;
00668         hdr.size = RINTERNAL_SIZE;
00669 
00670         /* Insert the left and right keys, set the header information. */
00671         ri.pgno = lp->pgno;
00672         ri.nrecs = __bam_total(dbp, lp);
00673         if ((ret = __db_pitem(dbc, rootp, 0, RINTERNAL_SIZE, &hdr, NULL)) != 0)
00674                 return (ret);
00675         RE_NREC_SET(rootp, ri.nrecs);
00676         ri.pgno = rp->pgno;
00677         ri.nrecs = __bam_total(dbp, rp);
00678         if ((ret = __db_pitem(dbc, rootp, 1, RINTERNAL_SIZE, &hdr, NULL)) != 0)
00679                 return (ret);
00680         RE_NREC_ADJ(rootp, ri.nrecs);
00681         return (0);
00682 }
00683 
00684 /*
00685  * __bam_pinsert --
00686  *      Insert a new key into a parent page, completing the split.
00687  *
00688  * PUBLIC: int __bam_pinsert __P((DBC *, EPG *, PAGE *, PAGE *, int));
00689  */
00690 int
00691 __bam_pinsert(dbc, parent, lchild, rchild, flags)
00692         DBC *dbc;
00693         EPG *parent;
00694         PAGE *lchild, *rchild;
00695         int flags;
00696 {
00697         BINTERNAL bi, *child_bi;
00698         BKEYDATA *child_bk, *tmp_bk;
00699         BTREE *t;
00700         BTREE_CURSOR *cp;
00701         DB *dbp;
00702         DBT a, b, hdr, data;
00703         PAGE *ppage;
00704         RINTERNAL ri;
00705         db_indx_t off;
00706         db_recno_t nrecs;
00707         size_t (*func) __P((DB *, const DBT *, const DBT *));
00708         u_int32_t n, nbytes, nksize;
00709         int ret;
00710 
              /*
               * Flags (as used below): BPI_SPACEONLY -- only verify that the
               * parent page has room for the new item and return without
               * inserting anything; BPI_NORECNUM -- skip the adjustment of
               * the parent's left-child record count at the end.
               */
00711         dbp = dbc->dbp;
00712         cp = (BTREE_CURSOR *)dbc->internal;
00713         t = dbp->bt_internal;
00714         ppage = parent->page;
00715 
00716         /* If handling record numbers, count records split to the right page. */
00717         nrecs = F_ISSET(cp, C_RECNUM) &&
00718             !LF_ISSET(BPI_SPACEONLY) ? __bam_total(dbp, rchild) : 0;
00719 
00720         /*
00721          * Now we insert the new page's first key into the parent page, which
00722          * completes the split.  The parent points to a PAGE and a page index
00723          * offset, where the new key goes ONE AFTER the index, because we split
00724          * to the right.
00725          *
00726          * XXX
00727          * Some btree algorithms replace the key for the old page as well as
00728          * the new page.  We don't, as there's no reason to believe that the
00729          * first key on the old page is any better than the key we have, and,
00730          * in the case of a key being placed at index 0 causing the split, the
00731          * key is unavailable.
00732          */
00733         off = parent->indx + O_INDX;
00734 
00735         /*
00736          * Calculate the space needed on the parent page.
00737          *
00738          * Prefix trees: space hack used when inserting into BINTERNAL pages.
00739          * Retain only what's needed to distinguish between the new entry and
00740          * the LAST entry on the page to its left.  If the keys compare equal,
00741          * retain the entire key.  We ignore overflow keys, and the entire key
00742          * must be retained for the next-to-leftmost key on the leftmost page
00743          * of each level, or the search will fail.  Applicable ONLY to internal
00744          * pages that have leaf pages as children.  Further reduction of the
00745          * key between pairs of internal pages loses too much information.
00746          */
00747         switch (TYPE(rchild)) {
00748         case P_IBTREE:
                      /*
                       * The right child is itself a btree internal page:
                       * promote a copy of its first internal key into the
                       * parent, no prefix compression (see comment above).
                       */
00749                 child_bi = GET_BINTERNAL(dbp, rchild, 0);
00750                 nbytes = BINTERNAL_PSIZE(child_bi->len);
00751 
00752                 if (P_FREESPACE(dbp, ppage) < nbytes)
00753                         return (DB_NEEDSPLIT);
00754                 if (LF_ISSET(BPI_SPACEONLY))
00755                         return (0);
00756 
00757                 /* Add a new record for the right page. */
00758                 memset(&bi, 0, sizeof(bi));
00759                 bi.len = child_bi->len;
00760                 B_TSET(bi.type, child_bi->type, 0);
00761                 bi.pgno = rchild->pgno;
00762                 bi.nrecs = nrecs;
00763                 memset(&hdr, 0, sizeof(hdr));
00764                 hdr.data = &bi;
00765                 hdr.size = SSZA(BINTERNAL, data);
00766                 memset(&data, 0, sizeof(data));
00767                 data.data = child_bi->data;
00768                 data.size = child_bi->len;
00769                 if ((ret = __db_pitem(dbc, ppage, off,
00770                     BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
00771                         return (ret);
00772 
                      /*
                       * The promoted key now has one more on-page reference
                       * to any overflow chain it names.
                       */
00773                 /* Increment the overflow ref count. */
00774                 if (B_TYPE(child_bi->type) == B_OVERFLOW)
00775                         if ((ret = __db_ovref(dbc,
00776                             ((BOVERFLOW *)(child_bi->data))->pgno, 1)) != 0)
00777                                 return (ret);
00778                 break;
00779         case P_LDUP:
00780         case P_LBTREE:
                      /* Right child is a leaf: promote its first key. */
00781                 child_bk = GET_BKEYDATA(dbp, rchild, 0);
00782                 switch (B_TYPE(child_bk->type)) {
00783                 case B_KEYDATA:
00784                         nbytes = BINTERNAL_PSIZE(child_bk->len);
00785                         nksize = child_bk->len;
00786 
00787                         /*
00788                          * Prefix compression:
00789                          * We set t->bt_prefix to NULL if we have a comparison
00790                          * callback but no prefix compression callback.  But,
00791                          * if we're splitting in an off-page duplicates tree,
00792                          * we still have to do some checking.  If using the
00793                          * default off-page duplicates comparison routine we
00794                          * can use the default prefix compression callback. If
00795                          * not using the default off-page duplicates comparison
00796                          * routine, we can't do any kind of prefix compression
00797                          * as there's no way for an application to specify a
00798                          * prefix compression callback that corresponds to its
00799                          * comparison callback.
00800                          *
00801                          * No prefix compression if we don't have a compression
00802                          * function, or the key we'd compress isn't a normal
00803                          * key (for example, it references an overflow page).
00804                          *
00805                          * Generate a parent page key for the right child page
00806                          * from a comparison of the last key on the left child
00807                          * page and the first key on the right child page.
00808                          */
00809                         if (F_ISSET(dbc, DBC_OPD)) {
00810                                 if (dbp->dup_compare == __bam_defcmp)
00811                                         func = __bam_defpfx;
00812                                 else
00813                                         func = NULL;
00814                         } else
00815                                 func = t->bt_prefix;
00816                         if (func == NULL)
00817                                 goto noprefix;
                              /*
                               * Last item on the left child: leaf btree pages
                               * hold key/data pairs (P_INDX apart), duplicate
                               * pages hold single items (O_INDX apart).
                               */
00818                         tmp_bk = GET_BKEYDATA(dbp, lchild, NUM_ENT(lchild) -
00819                             (TYPE(lchild) == P_LDUP ? O_INDX : P_INDX));
00820                         if (B_TYPE(tmp_bk->type) != B_KEYDATA)
00821                                 goto noprefix;
00822                         memset(&a, 0, sizeof(a));
00823                         a.size = tmp_bk->len;
00824                         a.data = tmp_bk->data;
00825                         memset(&b, 0, sizeof(b));
00826                         b.size = child_bk->len;
00827                         b.data = child_bk->data;
                              /*
                               * func returns the number of bytes of the new
                               * key needed to distinguish it from the last
                               * key on the left page; only keep the shorter
                               * form if it actually saves on-page space.
                               */
00828                         nksize = (u_int32_t)func(dbp, &a, &b);
00829                         if ((n = BINTERNAL_PSIZE(nksize)) < nbytes)
00830                                 nbytes = n;
00831                         else
00832                                 nksize = child_bk->len;
00833 
00834 noprefix:               if (P_FREESPACE(dbp, ppage) < nbytes)
00835                                 return (DB_NEEDSPLIT);
00836                         if (LF_ISSET(BPI_SPACEONLY))
00837                                 return (0);
00838 
00839                         memset(&bi, 0, sizeof(bi));
00840                         bi.len = nksize;
00841                         B_TSET(bi.type, child_bk->type, 0);
00842                         bi.pgno = rchild->pgno;
00843                         bi.nrecs = nrecs;
00844                         memset(&hdr, 0, sizeof(hdr));
00845                         hdr.data = &bi;
00846                         hdr.size = SSZA(BINTERNAL, data);
00847                         memset(&data, 0, sizeof(data));
00848                         data.data = child_bk->data;
00849                         data.size = nksize;
00850                         if ((ret = __db_pitem(dbc, ppage, off,
00851                             BINTERNAL_SIZE(nksize), &hdr, &data)) != 0)
00852                                 return (ret);
00853                         break;
00854                 case B_DUPLICATE:
00855                 case B_OVERFLOW:
                              /*
                               * The child's first item is itself an off-page
                               * reference, so the new parent key is a copy of
                               * the entire BOVERFLOW structure -- no prefix
                               * compression is possible.
                               */
00856                         nbytes = BINTERNAL_PSIZE(BOVERFLOW_SIZE);
00857 
00858                         if (P_FREESPACE(dbp, ppage) < nbytes)
00859                                 return (DB_NEEDSPLIT);
00860                         if (LF_ISSET(BPI_SPACEONLY))
00861                                 return (0);
00862 
00863                         memset(&bi, 0, sizeof(bi));
00864                         bi.len = BOVERFLOW_SIZE;
00865                         B_TSET(bi.type, child_bk->type, 0);
00866                         bi.pgno = rchild->pgno;
00867                         bi.nrecs = nrecs;
00868                         memset(&hdr, 0, sizeof(hdr));
00869                         hdr.data = &bi;
00870                         hdr.size = SSZA(BINTERNAL, data);
00871                         memset(&data, 0, sizeof(data));
00872                         data.data = child_bk;
00873                         data.size = BOVERFLOW_SIZE;
00874                         if ((ret = __db_pitem(dbc, ppage, off,
00875                             BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
00876                                 return (ret);
00877 
00878                         /* Increment the overflow ref count. */
00879                         if (B_TYPE(child_bk->type) == B_OVERFLOW)
00880                                 if ((ret = __db_ovref(dbc,
00881                                     ((BOVERFLOW *)child_bk)->pgno, 1)) != 0)
00882                                         return (ret);
00883                         break;
00884                 default:
00885                         return (__db_pgfmt(dbp->dbenv, rchild->pgno));
00886                 }
00887                 break;
00888         case P_IRECNO:
00889         case P_LRECNO:
                      /*
                       * Recno parents store fixed-size RINTERNAL entries with
                       * no key, hence the NULL data argument to __db_pitem.
                       */
00890                 nbytes = RINTERNAL_PSIZE;
00891 
00892                 if (P_FREESPACE(dbp, ppage) < nbytes)
00893                         return (DB_NEEDSPLIT);
00894                 if (LF_ISSET(BPI_SPACEONLY))
00895                         return (0);
00896 
00897                 /* Add a new record for the right page. */
00898                 memset(&hdr, 0, sizeof(hdr));
00899                 hdr.data = &ri;
00900                 hdr.size = RINTERNAL_SIZE;
00901                 ri.pgno = rchild->pgno;
00902                 ri.nrecs = nrecs;
00903                 if ((ret = __db_pitem(dbc,
00904                     ppage, off, RINTERNAL_SIZE, &hdr, NULL)) != 0)
00905                         return (ret);
00906                 break;
00907         default:
00908                 return (__db_pgfmt(dbp->dbenv, rchild->pgno));
00909         }
00910 
00911         /*
00912          * If a Recno or Btree with record numbers AM page, or an off-page
00913          * duplicates tree, adjust the parent page's left page record count.
00914          */
00915         if (F_ISSET(cp, C_RECNUM) && !LF_ISSET(BPI_NORECNUM)) {
00916                 /* Log the change. */
00917                 if (DBC_LOGGING(dbc)) {
00918                         if ((ret = __bam_cadjust_log(dbp, dbc->txn,
00919                             &LSN(ppage), 0, PGNO(ppage), &LSN(ppage),
00920                             parent->indx, -(int32_t)nrecs, 0)) != 0)
00921                                 return (ret);
00922                 } else
00923                         LSN_NOT_LOGGED(LSN(ppage));
00924 
                      /*
                       * Update the left page count: the nrecs records that
                       * moved to rchild no longer live under the left child.
                       */
00925                 /* Update the left page count. */
00926                 if (dbc->dbtype == DB_RECNO)
00927                         GET_RINTERNAL(dbp, ppage, parent->indx)->nrecs -= nrecs;
00928                 else
00929                         GET_BINTERNAL(dbp, ppage, parent->indx)->nrecs -= nrecs;
00930         }
00931 
00932         return (0);
00933 }
00934 
00935 /*
00936  * __bam_psplit --
00937  *      Do the real work of splitting the page.
00938  */
00939 static int
00940 __bam_psplit(dbc, cp, lp, rp, splitret)
00941         DBC *dbc;
00942         EPG *cp;
00943         PAGE *lp, *rp;
00944         db_indx_t *splitret;
00945 {
00946         DB *dbp;
00947         PAGE *pp;
00948         db_indx_t half, *inp, nbytes, off, splitp, top;
00949         int adjust, cnt, iflag, isbigkey, ret;
00950 
              /*
               * Distribute the entries of the page referenced by cp between
               * the empty pages lp (left) and rp (right); the chosen split
               * index is returned through *splitret.
               */
00951         dbp = dbc->dbp;
00952         pp = cp->page;
00953         inp = P_INP(dbp, pp);
              /*
               * Leaf btree pages store key/data pairs, so index arithmetic
               * moves in units of P_INDX; all other page types store single
               * entries (O_INDX).
               */
00954         adjust = TYPE(pp) == P_LBTREE ? P_INDX : O_INDX;
00955 
00956         /*
00957          * If we're splitting the first (last) page on a level because we're
00958          * inserting (appending) a key to it, it's likely that the data is
00959          * sorted.  Moving a single item to the new page is less work and can
00960          * push the fill factor higher than normal.  This is trivial when we
00961          * are splitting a new page before the beginning of the tree, all of
00962          * the interesting tests are against values of 0.
00963          *
00964          * Catching appends to the tree is harder.  In a simple append, we're
00965          * inserting an item that sorts past the end of the tree; the cursor
00966          * will point past the last element on the page.  But, in trees with
00967          * duplicates, the cursor may point to the last entry on the page --
00968          * in this case, the entry will also be the last element of a duplicate
00969          * set (the last because the search call specified the S_DUPLAST flag).
00970          * The only way to differentiate between an insert immediately before
00971          * the last item in a tree or an append after a duplicate set which is
00972          * also the last item in the tree is to call the comparison function.
00973          * When splitting internal pages during an append, the search code
00974          * guarantees the cursor always points to the largest page item less
00975          * than the new internal entry.  To summarize, we want to catch three
00976          * possible index values:
00977          *
00978          *      NUM_ENT(page)           Btree/Recno leaf insert past end-of-tree
00979          *      NUM_ENT(page) - O_INDX  Btree or Recno internal insert past EOT
00980          *      NUM_ENT(page) - P_INDX  Btree leaf insert past EOT after a set
00981          *                                  of duplicates
00982          *
00983          * two of which, (NUM_ENT(page) - O_INDX or P_INDX) might be an insert
00984          * near the end of the tree, and not after the end of the tree at all.
00985          * Do a simple test which might be wrong because calling the comparison
00986          * functions is expensive.  Regardless, it's not a big deal if we're
00987          * wrong, we'll do the split the right way next time.
00988          */
00989         off = 0;
00990         if (NEXT_PGNO(pp) == PGNO_INVALID && cp->indx >= NUM_ENT(pp) - adjust)
00991                 off = NUM_ENT(pp) - adjust;
00992         else if (PREV_PGNO(pp) == PGNO_INVALID && cp->indx == 0)
00993                 off = adjust;
00994         if (off != 0)
00995                 goto sort;
00996 
00997         /*
00998          * Split the data to the left and right pages.  Try not to split on
00999          * an overflow key.  (Overflow keys on internal pages will slow down
01000          * searches.)  Refuse to split in the middle of a set of duplicates.
01001          *
01002          * First, find the optimum place to split.
01003          *
01004          * It's possible to try and split past the last record on the page if
01005          * there's a very large record at the end of the page.  Make sure this
01006          * doesn't happen by bounding the check at the next-to-last entry on
01007          * the page.
01008          *
01009          * Note, we try and split half the data present on the page.  This is
01010          * because another process may have already split the page and left
01011          * it half empty.  We don't try and skip the split -- we don't know
01012          * how much space we're going to need on the page, and we may need up
01013          * to half the page for a big item, so there's no easy test to decide
01014          * if we need to split or not.  Besides, if two threads are inserting
01015          * data into the same place in the database, we're probably going to
01016          * need more space soon anyway.
01017          */
01018         top = NUM_ENT(pp) - adjust;
              /*
               * dbp->pgsize - HOFFSET(pp) is the number of bytes currently
               * occupied by items; aim the split at half of that.
               */
01019         half = (dbp->pgsize - HOFFSET(pp)) / 2;
              /* Walk entries, accumulating their on-page sizes by type. */
01020         for (nbytes = 0, off = 0; off < top && nbytes < half; ++off)
01021                 switch (TYPE(pp)) {
01022                 case P_IBTREE:
01023                         if (B_TYPE(
01024                             GET_BINTERNAL(dbp, pp, off)->type) == B_KEYDATA)
01025                                 nbytes += BINTERNAL_SIZE(
01026                                    GET_BINTERNAL(dbp, pp, off)->len);
01027                         else
01028                                 nbytes += BINTERNAL_SIZE(BOVERFLOW_SIZE);
01029                         break;
01030                 case P_LBTREE:
                              /* Count the key, then fall through for the data. */
01031                         if (B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
01032                             B_KEYDATA)
01033                                 nbytes += BKEYDATA_SIZE(GET_BKEYDATA(dbp,
01034                                     pp, off)->len);
01035                         else
01036                                 nbytes += BOVERFLOW_SIZE;
01037 
01038                         ++off;
01039                         /* FALLTHROUGH */
01040                 case P_LDUP:
01041                 case P_LRECNO:
01042                         if (B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
01043                             B_KEYDATA)
01044                                 nbytes += BKEYDATA_SIZE(GET_BKEYDATA(dbp,
01045                                     pp, off)->len);
01046                         else
01047                                 nbytes += BOVERFLOW_SIZE;
01048                         break;
01049                 case P_IRECNO:
01050                         nbytes += RINTERNAL_SIZE;
01051                         break;
01052                 default:
01053                         return (__db_pgfmt(dbp->dbenv, pp->pgno));
01054                 }
01055 sort:   splitp = off;
01056 
01057         /*
01058          * Splitp is either at or just past the optimum split point.  If the
01059          * tree type is such that we're going to promote a key to an internal
01060          * page, and our current choice is an overflow key, look for something
01061          * close by that's smaller.
01062          */
01063         switch (TYPE(pp)) {
01064         case P_IBTREE:
01065                 iflag = 1;
01066                 isbigkey =
01067                     B_TYPE(GET_BINTERNAL(dbp, pp, off)->type) != B_KEYDATA;
01068                 break;
01069         case P_LBTREE:
01070         case P_LDUP:
01071                 iflag = 0;
01072                 isbigkey = B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) !=
01073                     B_KEYDATA;
01074                 break;
01075         default:
01076                 iflag = isbigkey = 0;
01077         }
              /*
               * Probe up to three entries forward and backward from splitp
               * for a non-overflow key, preferring the forward direction.
               */
01078         if (isbigkey)
01079                 for (cnt = 1; cnt <= 3; ++cnt) {
01080                         off = splitp + cnt * adjust;
01081                         if (off < (db_indx_t)NUM_ENT(pp) &&
01082                             ((iflag && B_TYPE(
01083                             GET_BINTERNAL(dbp, pp,off)->type) == B_KEYDATA) ||
01084                             B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
01085                             B_KEYDATA)) {
01086                                 splitp = off;
01087                                 break;
01088                         }
                              /* Don't back up past the start of the page. */
01089                         if (splitp <= (db_indx_t)(cnt * adjust))
01090                                 continue;
01091                         off = splitp - cnt * adjust;
01092                         if (iflag ? B_TYPE(
01093                             GET_BINTERNAL(dbp, pp, off)->type) == B_KEYDATA :
01094                             B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
01095                             B_KEYDATA) {
01096                                 splitp = off;
01097                                 break;
01098                         }
01099                 }
01100 
01101         /*
01102          * We can't split in the middle a set of duplicates.  We know that
01103          * no duplicate set can take up more than about 25% of the page,
01104          * because that's the point where we push it off onto a duplicate
01105          * page set.  So, this loop can't be unbounded.
01106          */
              /*
               * Duplicate keys share a single on-page item, so equal index
               * entries (inp[i] == inp[j]) identify members of one set.
               */
01107         if (TYPE(pp) == P_LBTREE &&
01108             inp[splitp] == inp[splitp - adjust])
01109                 for (cnt = 1;; ++cnt) {
01110                         off = splitp + cnt * adjust;
01111                         if (off < NUM_ENT(pp) &&
01112                             inp[splitp] != inp[off]) {
01113                                 splitp = off;
01114                                 break;
01115                         }
01116                         if (splitp <= (db_indx_t)(cnt * adjust))
01117                                 continue;
01118                         off = splitp - cnt * adjust;
01119                         if (inp[splitp] != inp[off]) {
                                      /* off is the last entry of the previous
                                       * set; split just after it. */
01120                                 splitp = off + adjust;
01121                                 break;
01122                         }
01123                 }
01124 
01125         /* We're going to split at splitp. */
01126         if ((ret = __bam_copy(dbp, pp, lp, 0, splitp)) != 0)
01127                 return (ret);
01128         if ((ret = __bam_copy(dbp, pp, rp, splitp, NUM_ENT(pp))) != 0)
01129                 return (ret);
01130 
01131         *splitret = splitp;
01132         return (0);
01133 }
01134 
01135 /*
01136  * __bam_copy --
01137  *      Copy a set of records from one page to another.
01138  *
01139  * PUBLIC: int __bam_copy __P((DB *, PAGE *, PAGE *, u_int32_t, u_int32_t));
01140  */
01141 int
01142 __bam_copy(dbp, pp, cp, nxt, stop)
01143         DB *dbp;
01144         PAGE *pp, *cp;
01145         u_int32_t nxt, stop;
01146 {
01147         db_indx_t *cinp, nbytes, off, *pinp;
01148 
              /*
               * Copy entries [nxt, stop) from source page pp onto target page
               * cp, starting at target index 0.  cinp/pinp are the in-page
               * index (offset) arrays of the target and source pages.
               */
01149         cinp = P_INP(dbp, cp);
01150         pinp = P_INP(dbp, pp);
01151         /*
01152          * Nxt is the offset of the next record to be placed on the target page.
01153          */
01154         for (off = 0; nxt < stop; ++nxt, ++NUM_ENT(cp), ++off) {
                      /* Compute the on-page size of this entry by page type. */
01155                 switch (TYPE(pp)) {
01156                 case P_IBTREE:
01157                         if (B_TYPE(
01158                             GET_BINTERNAL(dbp, pp, nxt)->type) == B_KEYDATA)
01159                                 nbytes = BINTERNAL_SIZE(
01160                                     GET_BINTERNAL(dbp, pp, nxt)->len);
01161                         else
01162                                 nbytes = BINTERNAL_SIZE(BOVERFLOW_SIZE);
01163                         break;
01164                 case P_LBTREE:
01165                         /*
01166                          * If we're on a key and it's a duplicate, just copy
01167                          * the offset.
01168                          */
                              /*
                               * Duplicate keys share one physical item; the
                               * equal source offsets detect that, so only the
                               * index slot is duplicated on the target page.
                               */
01169                         if (off != 0 && (nxt % P_INDX) == 0 &&
01170                             pinp[nxt] == pinp[nxt - P_INDX]) {
01171                                 cinp[off] = cinp[off - P_INDX];
01172                                 continue;
01173                         }
01174                         /* FALLTHROUGH */
01175                 case P_LDUP:
01176                 case P_LRECNO:
01177                         if (B_TYPE(GET_BKEYDATA(dbp, pp, nxt)->type) ==
01178                             B_KEYDATA)
01179                                 nbytes = BKEYDATA_SIZE(GET_BKEYDATA(dbp,
01180                                     pp, nxt)->len);
01181                         else
01182                                 nbytes = BOVERFLOW_SIZE;
01183                         break;
01184                 case P_IRECNO:
01185                         nbytes = RINTERNAL_SIZE;
01186                         break;
01187                 default:
01188                         return (__db_pgfmt(dbp->dbenv, pp->pgno));
01189                 }
                      /*
                       * Item storage grows downward from the end of the page:
                       * carve nbytes off the target's free space (HOFFSET is
                       * the low boundary of stored items), record the new
                       * item's offset in the index array, and copy the bytes.
                       */
01190                 cinp[off] = HOFFSET(cp) -= nbytes;
01191                 memcpy(P_ENTRY(dbp, cp, off), P_ENTRY(dbp, pp, nxt), nbytes);
01192         }
01193         return (0);
01194 }

Generated on Sun Dec 25 12:14:13 2005 for Berkeley DB 4.4.16 by  doxygen 1.4.2