Main Page | Class Hierarchy | Data Structures | Directories | File List | Data Fields | Related Pages

bt_cursor.c

00001 /*-
00002  * See the file LICENSE for redistribution information.
00003  *
00004  * Copyright (c) 1996-2005
00005  *      Sleepycat Software.  All rights reserved.
00006  *
00007  * $Id: bt_cursor.c,v 12.7 2005/08/08 14:27:59 bostic Exp $
00008  */
00009 
00010 #include "db_config.h"
00011 
00012 #ifndef NO_SYSTEM_INCLUDES
00013 #include <sys/types.h>
00014 
00015 #include <string.h>
00016 #endif
00017 
00018 #include "db_int.h"
00019 #include "dbinc/db_page.h"
00020 #include "dbinc/db_shash.h"
00021 #include "dbinc/btree.h"
00022 #include "dbinc/lock.h"
00023 #include "dbinc/mp.h"
00024 
00025 static int  __bam_bulk __P((DBC *, DBT *, u_int32_t));
00026 static int  __bam_c_close __P((DBC *, db_pgno_t, int *));
00027 static int  __bam_c_del __P((DBC *));
00028 static int  __bam_c_destroy __P((DBC *));
00029 static int  __bam_c_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
00030 static int  __bam_c_getstack __P((DBC *));
00031 static int  __bam_c_next __P((DBC *, int, int));
00032 static int  __bam_c_physdel __P((DBC *));
00033 static int  __bam_c_prev __P((DBC *));
00034 static int  __bam_c_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
00035 static int  __bam_c_search __P((DBC *,
00036                 db_pgno_t, const DBT *, u_int32_t, int *));
00037 static int  __bam_c_writelock __P((DBC *));
00038 static int  __bam_getboth_finddatum __P((DBC *, DBT *, u_int32_t));
00039 static int  __bam_getbothc __P((DBC *, DBT *));
00040 static int  __bam_get_prev __P((DBC *));
00041 static int  __bam_isopd __P((DBC *, db_pgno_t *));
00042 
00043 /*
00044  * Acquire a new page/lock.  If we hold a page/lock, discard the page, and
00045  * lock-couple the lock.
00046  *
00047  * !!!
00048  * We have to handle both where we have a lock to lock-couple and where we
00049  * don't -- we don't duplicate locks when we duplicate cursors if we are
00050  * running in a transaction environment as there's no point if locks are
00051  * never discarded.  This means that the cursor may or may not hold a lock.
00052  * In the case where we are descending the tree we always want to unlock
00053  * the held interior page so we use ACQUIRE_COUPLE.
00054  */
00055 #undef  ACQUIRE
00056 #define ACQUIRE(dbc, mode, lpgno, lock, fpgno, pagep, ret) do {         \
00057         DB_MPOOLFILE *__mpf = (dbc)->dbp->mpf;                          \
00058         if ((pagep) != NULL) {                                          \
00059                 ret = __memp_fput(__mpf, pagep, 0);                     \
00060                 pagep = NULL;                                           \
00061         } else                                                          \
00062                 ret = 0;                                                \
00063         if ((ret) == 0 && STD_LOCKING(dbc))                             \
00064                 ret = __db_lget(dbc, LCK_COUPLE, lpgno, mode, 0, &(lock));\
00065         if ((ret) == 0)                                                 \
00066                 ret = __memp_fget(__mpf, &(fpgno), 0, &(pagep));        \
00067 } while (0)
00068 
00069 /* Acquire a new page/lock for a cursor. */
00070 #undef  ACQUIRE_CUR
00071 #define ACQUIRE_CUR(dbc, mode, p, ret) do {                             \
00072         BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal;           \
00073         if (p != __cp->pgno)                                            \
00074                 __cp->pgno = PGNO_INVALID;                              \
00075         ACQUIRE(dbc, mode, p, __cp->lock, p, __cp->page, ret);          \
00076         if ((ret) == 0) {                                               \
00077                 __cp->pgno = p;                                         \
00078                 __cp->lock_mode = (mode);                               \
00079         }                                                               \
00080 } while (0)
00081 
00082 /*
00083  * Acquire a write lock if we don't already have one.
00084  *
00085  * !!!
00086  * See ACQUIRE macro on why we handle cursors that don't have locks.
00087  */
00088 #undef  ACQUIRE_WRITE_LOCK
00089 #define ACQUIRE_WRITE_LOCK(dbc, ret) do {                               \
00090         BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal;           \
00091         ret = 0;                                                        \
00092         if (STD_LOCKING(dbc) &&                                         \
00093             __cp->lock_mode != DB_LOCK_WRITE &&                         \
00094             ((ret) = __db_lget(dbc,                                     \
00095             LOCK_ISSET(__cp->lock) ? LCK_COUPLE : 0,                    \
00096             __cp->pgno, DB_LOCK_WRITE, 0, &__cp->lock)) == 0)           \
00097                 __cp->lock_mode = DB_LOCK_WRITE;                        \
00098 } while (0)
00099 
00100 /* Discard the current page/lock for a cursor. */
00101 #undef  DISCARD_CUR
00102 #define DISCARD_CUR(dbc, ret) do {                                      \
00103         BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal;           \
00104         DB_MPOOLFILE *__mpf = (dbc)->dbp->mpf;                          \
00105         int __t_ret;                                                    \
00106         if ((__cp->page) != NULL) {                                     \
00107                 __t_ret = __memp_fput(__mpf, __cp->page, 0);            \
00108                 __cp->page = NULL;                                      \
00109         } else                                                          \
00110                 __t_ret = 0;                                            \
00111         if (__t_ret != 0 && (ret) == 0)                                 \
00112                 ret = __t_ret;                                          \
00113         __t_ret = __TLPUT((dbc), __cp->lock);                           \
00114         if (__t_ret != 0 && (ret) == 0)                                 \
00115                 ret = __t_ret;                                          \
00116         if ((ret) == 0 && !LOCK_ISSET(__cp->lock))                      \
00117                 __cp->lock_mode = DB_LOCK_NG;                           \
00118 } while (0)
00119 
00120 /* If on-page item is a deleted record. */
00121 #undef  IS_DELETED
00122 #define IS_DELETED(dbp, page, indx)                                     \
00123         B_DISSET(GET_BKEYDATA(dbp, page,                                \
00124             (indx) + (TYPE(page) == P_LBTREE ? O_INDX : 0))->type)
00125 #undef  IS_CUR_DELETED
00126 #define IS_CUR_DELETED(dbc)                                             \
00127         IS_DELETED((dbc)->dbp, (dbc)->internal->page, (dbc)->internal->indx)
00128 
00129 /*
00130  * Test to see if two cursors could point to duplicates of the same key.
00131  * In the case of off-page duplicates they are they same, as the cursors
00132  * will be in the same off-page duplicate tree.  In the case of on-page
00133  * duplicates, the key index offsets must be the same.  For the last test,
00134  * as the original cursor may not have a valid page pointer, we use the
00135  * current cursor's.
00136  */
00137 #undef  IS_DUPLICATE
00138 #define IS_DUPLICATE(dbc, i1, i2)                                       \
00139             (P_INP((dbc)->dbp,((PAGE *)(dbc)->internal->page))[i1] ==   \
00140              P_INP((dbc)->dbp,((PAGE *)(dbc)->internal->page))[i2])
00141 #undef  IS_CUR_DUPLICATE
00142 #define IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx)                     \
00143         (F_ISSET(dbc, DBC_OPD) ||                                       \
00144             (orig_pgno == (dbc)->internal->pgno &&                      \
00145             IS_DUPLICATE(dbc, (dbc)->internal->indx, orig_indx)))
00146 
00147 /*
00148  * __bam_c_init --
00149  *      Initialize the access private portion of a cursor
00150  *
00151  * PUBLIC: int __bam_c_init __P((DBC *, DBTYPE));
00152  */
00153 int
00154 __bam_c_init(dbc, dbtype)
00155         DBC *dbc;
00156         DBTYPE dbtype;
00157 {
00158         DB_ENV *dbenv;
00159         int ret;
00160 
00161         dbenv = dbc->dbp->dbenv;
00162 
00163         /* Allocate/initialize the internal structure. */
00164         if (dbc->internal == NULL && (ret =
00165             __os_calloc(dbenv, 1, sizeof(BTREE_CURSOR), &dbc->internal)) != 0)
00166                 return (ret);
00167 
00168         /* Initialize methods. */
00169         dbc->c_close = __db_c_close_pp;
00170         dbc->c_count = __db_c_count_pp;
00171         dbc->c_del = __db_c_del_pp;
00172         dbc->c_dup = __db_c_dup_pp;
00173         dbc->c_get = __db_c_get_pp;
00174         dbc->c_pget = __db_c_pget_pp;
00175         dbc->c_put = __db_c_put_pp;
00176         if (dbtype == DB_BTREE) {
00177                 dbc->c_am_bulk = __bam_bulk;
00178                 dbc->c_am_close = __bam_c_close;
00179                 dbc->c_am_del = __bam_c_del;
00180                 dbc->c_am_destroy = __bam_c_destroy;
00181                 dbc->c_am_get = __bam_c_get;
00182                 dbc->c_am_put = __bam_c_put;
00183                 dbc->c_am_writelock = __bam_c_writelock;
00184         } else {
00185                 dbc->c_am_bulk = __bam_bulk;
00186                 dbc->c_am_close = __bam_c_close;
00187                 dbc->c_am_del = __ram_c_del;
00188                 dbc->c_am_destroy = __bam_c_destroy;
00189                 dbc->c_am_get = __ram_c_get;
00190                 dbc->c_am_put = __ram_c_put;
00191                 dbc->c_am_writelock = __bam_c_writelock;
00192         }
00193 
00194         return (0);
00195 }
00196 
00197 /*
00198  * __bam_c_refresh
00199  *      Set things up properly for cursor re-use.
00200  *
00201  * PUBLIC: int __bam_c_refresh __P((DBC *));
00202  */
00203 int
00204 __bam_c_refresh(dbc)
00205         DBC *dbc;
00206 {
00207         BTREE *t;
00208         BTREE_CURSOR *cp;
00209         DB *dbp;
00210 
00211         dbp = dbc->dbp;
00212         t = dbp->bt_internal;
00213         cp = (BTREE_CURSOR *)dbc->internal;
00214 
00215         /*
00216          * If our caller set the root page number, it's because the root was
00217          * known.  This is always the case for off page dup cursors.  Else,
00218          * pull it out of our internal information.
00219          */
00220         if (cp->root == PGNO_INVALID)
00221                 cp->root = t->bt_root;
00222 
00223         LOCK_INIT(cp->lock);
00224         cp->lock_mode = DB_LOCK_NG;
00225 
00226         if (cp->sp == NULL) {
00227                 cp->sp = cp->stack;
00228                 cp->esp = cp->stack + sizeof(cp->stack) / sizeof(cp->stack[0]);
00229         }
00230         BT_STK_CLR(cp);
00231 
00232         /*
00233          * The btree leaf page data structures require that two key/data pairs
00234          * (or four items) fit on a page, but other than that there's no fixed
00235          * requirement.  The btree off-page duplicates only require two items,
00236          * to be exact, but requiring four for them as well seems reasonable.
00237          *
00238          * Recno uses the btree bt_ovflsize value -- it's close enough.
00239          */
00240         cp->ovflsize = B_MINKEY_TO_OVFLSIZE(
00241             dbp,  F_ISSET(dbc, DBC_OPD) ? 2 : t->bt_minkey, dbp->pgsize);
00242 
00243         cp->recno = RECNO_OOB;
00244         cp->order = INVALID_ORDER;
00245         cp->flags = 0;
00246 
00247         /* Initialize for record numbers. */
00248         if (F_ISSET(dbc, DBC_OPD) ||
00249             dbc->dbtype == DB_RECNO || F_ISSET(dbp, DB_AM_RECNUM)) {
00250                 F_SET(cp, C_RECNUM);
00251 
00252                 /*
00253                  * All btrees that support record numbers, optionally standard
00254                  * recno trees, and all off-page duplicate recno trees have
00255                  * mutable record numbers.
00256                  */
00257                 if ((F_ISSET(dbc, DBC_OPD) && dbc->dbtype == DB_RECNO) ||
00258                     F_ISSET(dbp, DB_AM_RECNUM | DB_AM_RENUMBER))
00259                         F_SET(cp, C_RENUMBER);
00260         }
00261 
00262         return (0);
00263 }
00264 
00265 /*
00266  * __bam_c_close --
00267  *      Close down the cursor.
00268  */
00269 static int
00270 __bam_c_close(dbc, root_pgno, rmroot)
00271         DBC *dbc;
00272         db_pgno_t root_pgno;
00273         int *rmroot;
00274 {
00275         BTREE_CURSOR *cp, *cp_opd, *cp_c;
00276         DB *dbp;
00277         DBC *dbc_opd, *dbc_c;
00278         DB_MPOOLFILE *mpf;
00279         PAGE *h;
00280         int cdb_lock, count, ret;
00281 
00282         dbp = dbc->dbp;
00283         mpf = dbp->mpf;
00284         cp = (BTREE_CURSOR *)dbc->internal;
00285         cp_opd = (dbc_opd = cp->opd) == NULL ?
00286             NULL : (BTREE_CURSOR *)dbc_opd->internal;
00287         cdb_lock = ret = 0;
00288 
00289         /*
00290          * There are 3 ways this function is called:
00291          *
00292          * 1. Closing a primary cursor: we get called with a pointer to a
00293          *    primary cursor that has a NULL opd field.  This happens when
00294          *    closing a btree/recno database cursor without an associated
00295          *    off-page duplicate tree.
00296          *
00297          * 2. Closing a primary and an off-page duplicate cursor stack: we
00298          *    get called with a pointer to the primary cursor which has a
00299          *    non-NULL opd field.  This happens when closing a btree cursor
00300          *    into database with an associated off-page btree/recno duplicate
00301          *    tree. (It can't be a primary recno database, recno databases
00302          *    don't support duplicates.)
00303          *
00304          * 3. Closing an off-page duplicate cursor stack: we get called with
00305          *    a pointer to the off-page duplicate cursor.  This happens when
00306          *    closing a non-btree database that has an associated off-page
00307          *    btree/recno duplicate tree or for a btree database when the
00308          *    opd tree is not empty (root_pgno == PGNO_INVALID).
00309          *
00310          * If either the primary or off-page duplicate cursor deleted a btree
00311          * key/data pair, check to see if the item is still referenced by a
00312          * different cursor.  If it is, confirm that cursor's delete flag is
00313          * set and leave it to that cursor to do the delete.
00314          *
00315          * NB: The test for == 0 below is correct.  Our caller already removed
00316          * our cursor argument from the active queue, we won't find it when we
00317          * search the queue in __bam_ca_delete().
00318          * NB: It can't be true that both the primary and off-page duplicate
00319          * cursors have deleted a btree key/data pair.  Either the primary
00320          * cursor may have deleted an item and there's no off-page duplicate
00321          * cursor, or there's an off-page duplicate cursor and it may have
00322          * deleted an item.
00323          *
00324          * Primary recno databases aren't an issue here.  Recno keys are either
00325          * deleted immediately or never deleted, and do not have to be handled
00326          * here.
00327          *
00328          * Off-page duplicate recno databases are an issue here, cases #2 and
00329          * #3 above can both be off-page recno databases.  The problem is the
00330          * same as the final problem for off-page duplicate btree databases.
00331          * If we no longer need the off-page duplicate tree, we want to remove
00332          * it.  For off-page duplicate btrees, we are done with the tree when
00333          * we delete the last item it contains, i.e., there can be no further
00334          * references to it when it's empty.  For off-page duplicate recnos,
00335          * we remove items from the tree as the application calls the remove
00336          * function, so we are done with the tree when we close the last cursor
00337          * that references it.
00338          *
00339          * We optionally take the root page number from our caller.  If the
00340          * primary database is a btree, we can get it ourselves because dbc
00341          * is the primary cursor.  If the primary database is not a btree,
00342          * the problem is that we may be dealing with a stack of pages.  The
00343          * cursor we're using to do the delete points at the bottom of that
00344          * stack and we need the top of the stack.
00345          */
00346         if (F_ISSET(cp, C_DELETED)) {
00347                 dbc_c = dbc;
00348                 switch (dbc->dbtype) {
00349                 case DB_BTREE:                          /* Case #1, #3. */
00350                         if ((ret = __bam_ca_delete(
00351                             dbp, cp->pgno, cp->indx, 1, &count)) != 0)
00352                                 goto err;
00353                         if (count == 0)
00354                                 goto lock;
00355                         goto done;
00356                 case DB_RECNO:
00357                         if (!F_ISSET(dbc, DBC_OPD))     /* Case #1. */
00358                                 goto done;
00359                                                         /* Case #3. */
00360                         if ((ret = __ram_ca_delete(dbp, cp->root, &count)) != 0)
00361                                 goto err;
00362                         if (count == 0)
00363                                 goto lock;
00364                         goto done;
00365                 case DB_HASH:
00366                 case DB_QUEUE:
00367                 case DB_UNKNOWN:
00368                 default:
00369                         ret = __db_unknown_type(dbp->dbenv,
00370                                 "__bam_c_close", dbc->dbtype);
00371                         goto err;
00372                 }
00373         }
00374 
00375         if (dbc_opd == NULL)
00376                 goto done;
00377 
00378         if (F_ISSET(cp_opd, C_DELETED)) {               /* Case #2. */
00379                 /*
00380                  * We will not have been provided a root page number.  Acquire
00381                  * one from the primary database.
00382                  */
00383                 if ((ret = __memp_fget(mpf, &cp->pgno, 0, &h)) != 0)
00384                         goto err;
00385                 root_pgno = GET_BOVERFLOW(dbp, h, cp->indx + O_INDX)->pgno;
00386                 if ((ret = __memp_fput(mpf, h, 0)) != 0)
00387                         goto err;
00388 
00389                 dbc_c = dbc_opd;
00390                 switch (dbc_opd->dbtype) {
00391                 case DB_BTREE:
00392                         if ((ret = __bam_ca_delete(
00393                             dbp, cp_opd->pgno, cp_opd->indx, 1, &count)) != 0)
00394                                 goto err;
00395                         if (count == 0)
00396                                 goto lock;
00397                         goto done;
00398                 case DB_RECNO:
00399                         if ((ret =
00400                             __ram_ca_delete(dbp, cp_opd->root, &count)) != 0)
00401                                 goto err;
00402                         if (count == 0)
00403                                 goto lock;
00404                         goto done;
00405                 case DB_HASH:
00406                 case DB_QUEUE:
00407                 case DB_UNKNOWN:
00408                 default:
00409                         ret = __db_unknown_type(
00410                            dbp->dbenv, "__bam_c_close", dbc->dbtype);
00411                         goto err;
00412                 }
00413         }
00414         goto done;
00415 
00416 lock:   cp_c = (BTREE_CURSOR *)dbc_c->internal;
00417 
00418         /*
00419          * If this is CDB, upgrade the lock if necessary.  While we acquired
00420          * the write lock to logically delete the record, we released it when
00421          * we returned from that call, and so may not be holding a write lock
00422          * at the moment.
00423          */
00424         if (CDB_LOCKING(dbp->dbenv)) {
00425                 if (F_ISSET(dbc, DBC_WRITECURSOR)) {
00426                         if ((ret = __lock_get(dbp->dbenv,
00427                             dbc->locker, DB_LOCK_UPGRADE, &dbc->lock_dbt,
00428                             DB_LOCK_WRITE, &dbc->mylock)) != 0)
00429                                 goto err;
00430                         cdb_lock = 1;
00431                 }
00432                 goto delete;
00433         }
00434 
00435         /*
00436          * The variable dbc_c has been initialized to reference the cursor in
00437          * which we're going to do the delete.  Initialize the cursor's lock
00438          * structures as necessary.
00439          *
00440          * First, we may not need to acquire any locks.  If we're in case #3,
00441          * that is, the primary database isn't a btree database, our caller
00442          * is responsible for acquiring any necessary locks before calling us.
00443          */
00444         if (F_ISSET(dbc, DBC_OPD))
00445                 goto delete;
00446 
00447         /*
00448          * Otherwise, acquire a write lock on the primary database's page.
00449          *
00450          * Lock the primary database page, regardless of whether we're deleting
00451          * an item on a primary database page or an off-page duplicates page.
00452          *
00453          * If the cursor that did the initial logical deletion (and had a write
00454          * lock) is not the same cursor doing the physical deletion (which may
00455          * have only ever had a read lock on the item), we need to upgrade to a
00456          * write lock.  The confusion comes as follows:
00457          *
00458          *      C1      created, acquires item read lock
00459          *      C2      dup C1, create C2, also has item read lock.
00460          *      C1      acquire write lock, delete item
00461          *      C1      close
00462          *      C2      close, needs a write lock to physically delete item.
00463          *
00464          * If we're in a TXN, we know that C2 will be able to acquire the write
00465          * lock, because no locker other than the one shared by C1 and C2 can
00466          * acquire a write lock -- the original write lock C1 acquired was never
00467          * discarded.
00468          *
00469          * If we're not in a TXN, it's nastier.  Other cursors might acquire
00470          * read locks on the item after C1 closed, discarding its write lock,
00471          * and such locks would prevent C2 from acquiring a read lock.  That's
00472          * OK, though, we'll simply wait until we can acquire a write lock, or
00473          * we'll deadlock.  (Which better not happen, since we're not in a TXN.)
00474          *
00475          * There are similar scenarios with dirty reads, where the cursor may
00476          * have downgraded its write lock to a was-write lock.
00477          */
00478         if (STD_LOCKING(dbc))
00479                 if ((ret = __db_lget(dbc,
00480                     LCK_COUPLE, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0)
00481                         goto err;
00482 
00483 delete: /*
00484          * If the delete occurred in a Btree, we're going to look at the page
00485          * to see if the item has to be physically deleted.  Otherwise, we do
00486          * not need the actual page (and it may not even exist, it might have
00487          * been truncated from the file after an allocation aborted).
00488          *
00489          * Delete the on-page physical item referenced by the cursor.
00490          */
00491         if (dbc_c->dbtype == DB_BTREE) {
00492                 if ((ret = __memp_fget(mpf, &cp_c->pgno, 0, &cp_c->page)) != 0)
00493                         goto err;
00494                 if ((ret = __bam_c_physdel(dbc_c)) != 0)
00495                         goto err;
00496         }
00497 
00498         /*
00499          * If we're not working in an off-page duplicate tree, then we're
00500          * done.
00501          */
00502         if (!F_ISSET(dbc_c, DBC_OPD) || root_pgno == PGNO_INVALID)
00503                 goto done;
00504 
00505         /*
00506          * We may have just deleted the last element in the off-page duplicate
00507          * tree, and closed the last cursor in the tree.  For an off-page btree
00508          * there are no other cursors in the tree by definition, if the tree is
00509          * empty.  For an off-page recno we know we have closed the last cursor
00510          * in the tree because the __ram_ca_delete call above returned 0 only
00511          * in that case.  So, if the off-page duplicate tree is empty at this
00512          * point, we want to remove it.
00513          */
00514         if ((ret = __memp_fget(mpf, &root_pgno, 0, &h)) != 0)
00515                 goto err;
00516         if (NUM_ENT(h) == 0) {
00517                 DISCARD_CUR(dbc_c, ret);
00518                 if (ret != 0)
00519                         goto err;
00520                 if ((ret = __db_free(dbc, h)) != 0)
00521                         goto err;
00522         } else {
00523                 if ((ret = __memp_fput(mpf, h, 0)) != 0)
00524                         goto err;
00525                 goto done;
00526         }
00527 
00528         /*
00529          * When removing the tree, we have to do one of two things.  If this is
00530          * case #2, that is, the primary tree is a btree, delete the key that's
00531          * associated with the tree from the btree leaf page.  We know we are
00532          * the only reference to it and we already have the correct lock.  We
00533          * detect this case because the cursor that was passed to us references
00534          * an off-page duplicate cursor.
00535          *
00536          * If this is case #3, that is, the primary tree isn't a btree, pass
00537          * the information back to our caller, it's their job to do cleanup on
00538          * the primary page.
00539          */
00540         if (dbc_opd != NULL) {
00541                 if ((ret = __memp_fget(mpf, &cp->pgno, 0, &cp->page)) != 0)
00542                         goto err;
00543                 if ((ret = __bam_c_physdel(dbc)) != 0)
00544                         goto err;
00545         } else
00546                 *rmroot = 1;
00547 err:
00548 done:   /*
00549          * Discard the page references and locks, and confirm that the stack
00550          * has been emptied.
00551          */
00552         if (dbc_opd != NULL)
00553                 DISCARD_CUR(dbc_opd, ret);
00554         DISCARD_CUR(dbc, ret);
00555 
00556         /* Downgrade any CDB lock we acquired. */
00557         if (cdb_lock)
00558                 (void)__lock_downgrade(
00559                     dbp->dbenv, &dbc->mylock, DB_LOCK_IWRITE, 0);
00560 
00561         return (ret);
00562 }
00563 
00564 /*
00565  * __bam_c_destroy --
00566  *      Close a single cursor -- internal version.
00567  */
00568 static int
00569 __bam_c_destroy(dbc)
00570         DBC *dbc;
00571 {
00572         BTREE_CURSOR *cp;
00573 
00574         cp = (BTREE_CURSOR *)dbc->internal;
00575 
00576         /* Discard the structures. */
00577         if (cp->sp != cp->stack)
00578                 __os_free(dbc->dbp->dbenv, cp->sp);
00579         __os_free(dbc->dbp->dbenv, cp);
00580 
00581         return (0);
00582 }
00583 
00584 /*
00585  * __bam_c_count --
00586  *      Return a count of on and off-page duplicates.
00587  *
00588  * PUBLIC: int __bam_c_count __P((DBC *, db_recno_t *));
00589  */
00590 int
00591 __bam_c_count(dbc, recnop)
00592         DBC *dbc;
00593         db_recno_t *recnop;
00594 {
00595         BTREE_CURSOR *cp;
00596         DB *dbp;
00597         DB_MPOOLFILE *mpf;
00598         db_indx_t indx, top;
00599         db_recno_t recno;
00600         int ret;
00601 
00602         dbp = dbc->dbp;
00603         mpf = dbp->mpf;
00604         cp = (BTREE_CURSOR *)dbc->internal;
00605 
00606         /*
00607          * Called with the top-level cursor that may reference an off-page
00608          * duplicates tree.  We don't have to acquire any new locks, we have
00609          * to have a read lock to even get here.
00610          */
00611         if (cp->opd == NULL) {
00612                 /*
00613                  * On-page duplicates, get the page and count.
00614                  */
00615                 if ((ret = __memp_fget(mpf, &cp->pgno, 0, &cp->page)) != 0)
00616                         return (ret);
00617 
00618                 /*
00619                  * Move back to the beginning of the set of duplicates and
00620                  * then count forward.
00621                  */
00622                 for (indx = cp->indx;; indx -= P_INDX)
00623                         if (indx == 0 ||
00624                             !IS_DUPLICATE(dbc, indx, indx - P_INDX))
00625                                 break;
00626                 for (recno = 0,
00627                     top = NUM_ENT(cp->page) - P_INDX;; indx += P_INDX) {
00628                         if (!IS_DELETED(dbp, cp->page, indx))
00629                                 ++recno;
00630                         if (indx == top ||
00631                             !IS_DUPLICATE(dbc, indx, indx + P_INDX))
00632                                 break;
00633                 }
00634         } else {
00635                 /*
00636                  * Off-page duplicates tree, get the root page of the off-page
00637                  * duplicate tree.
00638                  */
00639                 if ((ret = __memp_fget(
00640                     mpf, &cp->opd->internal->root, 0, &cp->page)) != 0)
00641                         return (ret);
00642 
00643                 /*
00644                  * If the page is an internal page use the page's count as it's
00645                  * up-to-date and reflects the status of cursors in the tree.
00646                  * If the page is a leaf page for unsorted duplicates, use the
00647                  * page's count as cursors don't mark items deleted on the page
00648                  * and wait, cursor delete items immediately.
00649                  * If the page is a leaf page for sorted duplicates, there may
00650                  * be cursors on the page marking deleted items -- count.
00651                  */
00652                 if (TYPE(cp->page) == P_LDUP)
00653                         for (recno = 0, indx = 0,
00654                             top = NUM_ENT(cp->page) - O_INDX;; indx += O_INDX) {
00655                                 if (!IS_DELETED(dbp, cp->page, indx))
00656                                         ++recno;
00657                                 if (indx == top)
00658                                         break;
00659                         }
00660                 else
00661                         recno = RE_NREC(cp->page);
00662         }
00663 
00664         *recnop = recno;
00665 
00666         ret = __memp_fput(mpf, cp->page, 0);
00667         cp->page = NULL;
00668 
00669         return (ret);
00670 }
00671 
00672 /*
00673  * __bam_c_del --
00674  *      Delete using a cursor.
00675  */
00676 static int
00677 __bam_c_del(dbc)
00678         DBC *dbc;
00679 {
00680         BTREE_CURSOR *cp;
00681         DB *dbp;
00682         DB_MPOOLFILE *mpf;
00683         int count, ret, t_ret;
00684 
00685         dbp = dbc->dbp;
00686         mpf = dbp->mpf;
00687         cp = (BTREE_CURSOR *)dbc->internal;
00688         ret = 0;
00689 
00690         /* If the item was already deleted, return failure. */
00691         if (F_ISSET(cp, C_DELETED))
00692                 return (DB_KEYEMPTY);
00693 
00694         /*
00695          * This code is always called with a page lock but no page.
00696          */
00697         DB_ASSERT(cp->page == NULL);
00698 
00699         /*
00700          * We don't physically delete the record until the cursor moves, so
00701          * we have to have a long-lived write lock on the page instead of a
00702          * a long-lived read lock.  Note, we have to have a read lock to even
00703          * get here.
00704          *
00705          * If we're maintaining record numbers, we lock the entire tree, else
00706          * we lock the single page.
00707          */
00708         if (F_ISSET(cp, C_RECNUM)) {
00709                 if ((ret = __bam_c_getstack(dbc)) != 0)
00710                         goto err;
00711                 cp->page = cp->csp->page;
00712         } else {
00713                 ACQUIRE_CUR(dbc, DB_LOCK_WRITE, cp->pgno, ret);
00714                 if (ret != 0)
00715                         goto err;
00716         }
00717 
00718         /* Log the change. */
00719         if (DBC_LOGGING(dbc)) {
00720                 if ((ret = __bam_cdel_log(dbp, dbc->txn, &LSN(cp->page), 0,
00721                     PGNO(cp->page), &LSN(cp->page), cp->indx)) != 0)
00722                         goto err;
00723         } else
00724                 LSN_NOT_LOGGED(LSN(cp->page));
00725 
00726         /* Set the intent-to-delete flag on the page. */
00727         if (TYPE(cp->page) == P_LBTREE)
00728                 B_DSET(GET_BKEYDATA(dbp, cp->page, cp->indx + O_INDX)->type);
00729         else
00730                 B_DSET(GET_BKEYDATA(dbp, cp->page, cp->indx)->type);
00731 
00732         /* Mark the page dirty. */
00733         ret = __memp_fset(mpf, cp->page, DB_MPOOL_DIRTY);
00734 
00735 err:    /*
00736          * If we've been successful so far and the tree has record numbers,
00737          * adjust the record counts.  Either way, release acquired page(s).
00738          */
00739         if (F_ISSET(cp, C_RECNUM)) {
00740                 if (ret == 0)
00741                         ret = __bam_adjust(dbc, -1);
00742                 (void)__bam_stkrel(dbc, 0);
00743         } else
00744                 if (cp->page != NULL &&
00745                     (t_ret = __memp_fput(mpf, cp->page, 0)) != 0 && ret == 0)
00746                         ret = t_ret;
00747 
00748         cp->page = NULL;
00749 
00750         /*
00751          * Update the cursors last, after all chance of recoverable failure
00752          * is past.
00753          */
00754         if (ret == 0)
00755                 ret = __bam_ca_delete(dbp, cp->pgno, cp->indx, 1, &count);
00756 
00757         return (ret);
00758 }
00759 
00760 /*
00761  * __bam_c_dup --
00762  *      Duplicate a btree cursor, such that the new one holds appropriate
00763  *      locks for the position of the original.
00764  *
00765  * PUBLIC: int __bam_c_dup __P((DBC *, DBC *));
00766  */
00767 int
00768 __bam_c_dup(orig_dbc, new_dbc)
00769         DBC *orig_dbc, *new_dbc;
00770 {
00771         BTREE_CURSOR *orig, *new;
00772         int ret;
00773 
00774         orig = (BTREE_CURSOR *)orig_dbc->internal;
00775         new = (BTREE_CURSOR *)new_dbc->internal;
00776 
00777         /*
00778          * If we're holding a lock we need to acquire a copy of it, unless
00779          * we're in a transaction.  We don't need to copy any lock we're
00780          * holding inside a transaction because all the locks are retained
00781          * until the transaction commits or aborts.
00782          */
00783         if (orig_dbc->txn == NULL && LOCK_ISSET(orig->lock))
00784                 if ((ret = __db_lget(new_dbc,
00785                     0, new->pgno, new->lock_mode, 0, &new->lock)) != 0)
00786                         return (ret);
00787 
00788         new->ovflsize = orig->ovflsize;
00789         new->recno = orig->recno;
00790         new->flags = orig->flags;
00791 
00792         return (0);
00793 }
00794 
00795 /*
00796  * __bam_c_get --
00797  *      Get using a cursor (btree).
00798  */
00799 static int
00800 __bam_c_get(dbc, key, data, flags, pgnop)
00801         DBC *dbc;
00802         DBT *key, *data;
00803         u_int32_t flags;
00804         db_pgno_t *pgnop;
00805 {
00806         BTREE_CURSOR *cp;
00807         DB *dbp;
00808         DB_MPOOLFILE *mpf;
00809         db_pgno_t orig_pgno;
00810         db_indx_t orig_indx;
00811         int exact, newopd, ret;
00812 
00813         dbp = dbc->dbp;
00814         mpf = dbp->mpf;
00815         cp = (BTREE_CURSOR *)dbc->internal;
00816         orig_pgno = cp->pgno;
00817         orig_indx = cp->indx;
00818 
00819         newopd = 0;
00820         switch (flags) {
00821         case DB_CURRENT:
00822                 /* It's not possible to return a deleted record. */
00823                 if (F_ISSET(cp, C_DELETED)) {
00824                         ret = DB_KEYEMPTY;
00825                         goto err;
00826                 }
00827 
00828                 /*
00829                  * Acquire the current page.  We have at least a read-lock
00830                  * already.  The caller may have set DB_RMW asking for a
00831                  * write lock, but upgrading to a write lock has no better
00832                  * chance of succeeding now instead of later, so don't try.
00833                  */
00834                 if ((ret = __memp_fget(mpf, &cp->pgno, 0, &cp->page)) != 0)
00835                         goto err;
00836                 break;
00837         case DB_FIRST:
00838                 newopd = 1;
00839                 if ((ret = __bam_c_search(dbc,
00840                      PGNO_INVALID, NULL, flags, &exact)) != 0)
00841                         goto err;
00842                 break;
00843         case DB_GET_BOTH:
00844         case DB_GET_BOTH_RANGE:
00845                 /*
00846                  * There are two ways to get here based on DBcursor->c_get
00847                  * with the DB_GET_BOTH/DB_GET_BOTH_RANGE flags set:
00848                  *
00849                  * 1. Searching a sorted off-page duplicate tree: do a tree
00850                  * search.
00851                  *
00852                  * 2. Searching btree: do a tree search.  If it returns a
00853                  * reference to off-page duplicate tree, return immediately
00854                  * and let our caller deal with it.  If the search doesn't
00855                  * return a reference to off-page duplicate tree, continue
00856                  * with an on-page search.
00857                  */
00858                 if (F_ISSET(dbc, DBC_OPD)) {
00859                         if ((ret = __bam_c_search(
00860                             dbc, PGNO_INVALID, data, flags, &exact)) != 0)
00861                                 goto err;
00862                         if (flags == DB_GET_BOTH) {
00863                                 if (!exact) {
00864                                         ret = DB_NOTFOUND;
00865                                         goto err;
00866                                 }
00867                                 break;
00868                         }
00869 
00870                         /*
00871                          * We didn't require an exact match, so the search may
00872                          * may have returned an entry past the end of the page,
00873                          * or we may be referencing a deleted record.  If so,
00874                          * move to the next entry.
00875                          */
00876                         if ((cp->indx == NUM_ENT(cp->page) ||
00877                             IS_CUR_DELETED(dbc)) &&
00878                             (ret = __bam_c_next(dbc, 1, 0)) != 0)
00879                                 goto err;
00880                 } else {
00881                         if ((ret = __bam_c_search(
00882                             dbc, PGNO_INVALID, key, flags, &exact)) != 0)
00883                                 return (ret);
00884                         if (!exact) {
00885                                 ret = DB_NOTFOUND;
00886                                 goto err;
00887                         }
00888 
00889                         if (pgnop != NULL && __bam_isopd(dbc, pgnop)) {
00890                                 newopd = 1;
00891                                 break;
00892                         }
00893                         if ((ret =
00894                             __bam_getboth_finddatum(dbc, data, flags)) != 0)
00895                                 goto err;
00896                 }
00897                 break;
00898         case DB_GET_BOTHC:
00899                 if ((ret = __bam_getbothc(dbc, data)) != 0)
00900                         goto err;
00901                 break;
00902         case DB_LAST:
00903                 newopd = 1;
00904                 if ((ret = __bam_c_search(dbc,
00905                      PGNO_INVALID, NULL, flags, &exact)) != 0)
00906                         goto err;
00907                 break;
00908         case DB_NEXT:
00909                 newopd = 1;
00910                 if (cp->pgno == PGNO_INVALID) {
00911                         if ((ret = __bam_c_search(dbc,
00912                              PGNO_INVALID, NULL, DB_FIRST, &exact)) != 0)
00913                                 goto err;
00914                 } else
00915                         if ((ret = __bam_c_next(dbc, 1, 0)) != 0)
00916                                 goto err;
00917                 break;
00918         case DB_NEXT_DUP:
00919                 if ((ret = __bam_c_next(dbc, 1, 0)) != 0)
00920                         goto err;
00921                 if (!IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx)) {
00922                         ret = DB_NOTFOUND;
00923                         goto err;
00924                 }
00925                 break;
00926         case DB_NEXT_NODUP:
00927                 newopd = 1;
00928                 if (cp->pgno == PGNO_INVALID) {
00929                         if ((ret = __bam_c_search(dbc,
00930                              PGNO_INVALID, NULL, DB_FIRST, &exact)) != 0)
00931                                 goto err;
00932                 } else
00933                         do {
00934                                 if ((ret = __bam_c_next(dbc, 1, 0)) != 0)
00935                                         goto err;
00936                         } while (IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx));
00937                 break;
00938         case DB_PREV:
00939                 newopd = 1;
00940                 if (cp->pgno == PGNO_INVALID) {
00941                         if ((ret = __bam_c_search(dbc,
00942                              PGNO_INVALID, NULL, DB_LAST, &exact)) != 0)
00943                                 goto err;
00944                 } else
00945                         if ((ret = __bam_c_prev(dbc)) != 0)
00946                                 goto err;
00947                 break;
00948         case DB_PREV_NODUP:
00949                 newopd = 1;
00950                 if (cp->pgno == PGNO_INVALID) {
00951                         if ((ret = __bam_c_search(dbc,
00952                              PGNO_INVALID, NULL, DB_LAST, &exact)) != 0)
00953                                 goto err;
00954                 } else
00955                         do {
00956                                 if ((ret = __bam_c_prev(dbc)) != 0)
00957                                         goto err;
00958                         } while (IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx));
00959                 break;
00960         case DB_SET:
00961         case DB_SET_RECNO:
00962                 newopd = 1;
00963                 if ((ret = __bam_c_search(dbc,
00964                     PGNO_INVALID, key, flags, &exact)) != 0)
00965                         goto err;
00966                 break;
00967         case DB_SET_RANGE:
00968                 newopd = 1;
00969                 if ((ret = __bam_c_search(dbc,
00970                     PGNO_INVALID, key, flags, &exact)) != 0)
00971                         goto err;
00972 
00973                 /*
00974                  * As we didn't require an exact match, the search function
00975                  * may have returned an entry past the end of the page.  Or,
00976                  * we may be referencing a deleted record.  If so, move to
00977                  * the next entry.
00978                  */
00979                 if (cp->indx == NUM_ENT(cp->page) || IS_CUR_DELETED(dbc))
00980                         if ((ret = __bam_c_next(dbc, 0, 0)) != 0)
00981                                 goto err;
00982                 break;
00983         default:
00984                 ret = __db_unknown_flag(dbp->dbenv, "__bam_c_get", flags);
00985                 goto err;
00986         }
00987 
00988         /*
00989          * We may have moved to an off-page duplicate tree.  Return that
00990          * information to our caller.
00991          */
00992         if (newopd && pgnop != NULL)
00993                 (void)__bam_isopd(dbc, pgnop);
00994 
00995         /*
00996          * Don't return the key, it was passed to us (this is true even if the
00997          * application defines a compare function returning equality for more
00998          * than one key value, since in that case which actual value we store
00999          * in the database is undefined -- and particularly true in the case of
01000          * duplicates where we only store one key value).
01001          */
01002         if (flags == DB_GET_BOTH ||
01003             flags == DB_GET_BOTH_RANGE || flags == DB_SET)
01004                 F_SET(key, DB_DBT_ISSET);
01005 
01006 err:    /*
01007          * Regardless of whether we were successful or not, if the cursor
01008          * moved, clear the delete flag, DBcursor->c_get never references
01009          * a deleted key, if it moved at all.
01010          */
01011         if (F_ISSET(cp, C_DELETED) &&
01012             (cp->pgno != orig_pgno || cp->indx != orig_indx))
01013                 F_CLR(cp, C_DELETED);
01014 
01015         return (ret);
01016 }
01017 
01018 static int
01019 __bam_get_prev(dbc)
01020         DBC *dbc;
01021 {
01022         BTREE_CURSOR *cp;
01023         DBT key, data;
01024         db_pgno_t pgno;
01025         int ret;
01026 
01027         if ((ret = __bam_c_prev(dbc)) != 0)
01028                 return (ret);
01029 
01030         if (__bam_isopd(dbc, &pgno)) {
01031                 cp = (BTREE_CURSOR *)dbc->internal;
01032                 if ((ret = __db_c_newopd(dbc, pgno, cp->opd, &cp->opd)) != 0)
01033                         return (ret);
01034                 if ((ret = cp->opd->c_am_get(cp->opd,
01035                     &key, &data, DB_LAST, NULL)) != 0)
01036                         return (ret);
01037         }
01038 
01039         return (0);
01040 }
01041 
01042 /*
01043  * __bam_bulk -- Return bulk data from a btree.
01044  */
01045 static int
01046 __bam_bulk(dbc, data, flags)
01047         DBC *dbc;
01048         DBT *data;
01049         u_int32_t flags;
01050 {
01051         BKEYDATA *bk;
01052         BOVERFLOW *bo;
01053         BTREE_CURSOR *cp;
01054         PAGE *pg;
01055         db_indx_t *inp, indx, pg_keyoff;
01056         int32_t  *endp, key_off, *offp, *saveoffp;
01057         u_int8_t *dbuf, *dp, *np;
01058         u_int32_t key_size, pagesize, size, space;
01059         int adj, is_key, need_pg, next_key, no_dup, rec_key, ret;
01060 
01061         ret = 0;
01062         key_off = 0;
01063         size = 0;
01064         pagesize = dbc->dbp->pgsize;
01065         cp = (BTREE_CURSOR *)dbc->internal;
01066 
01067         /*
01068          * dp tracks the beginning of the page in the buffer.
01069          * np is the next place to copy things into the buffer.
01070          * dbuf always stays at the beginning of the buffer.
01071          */
01072         dbuf = data->data;
01073         np = dp = dbuf;
01074 
01075         /* Keep track of space that is left.  There is a termination entry */
01076         space = data->ulen;
01077         space -= sizeof(*offp);
01078 
01079         /* Build the offset/size table from the end up. */
01080         endp = (int32_t *)((u_int8_t *)dbuf + data->ulen);
01081         endp--;
01082         offp = endp;
01083 
01084         key_size = 0;
01085 
01086         /*
01087          * Distinguish between BTREE and RECNO.
01088          * There are no keys in RECNO.  If MULTIPLE_KEY is specified
01089          * then we return the record numbers.
01090          * is_key indicates that multiple btree keys are returned.
01091          * rec_key is set if we are returning record numbers.
01092          * next_key is set if we are going after the next key rather than dup.
01093          */
01094         if (dbc->dbtype == DB_BTREE) {
01095                 is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1: 0;
01096                 rec_key = 0;
01097                 next_key = is_key && LF_ISSET(DB_OPFLAGS_MASK) != DB_NEXT_DUP;
01098                 adj = 2;
01099         } else {
01100                 is_key = 0;
01101                 rec_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0;
01102                 next_key = LF_ISSET(DB_OPFLAGS_MASK) != DB_NEXT_DUP;
01103                 adj = 1;
01104         }
01105         no_dup = LF_ISSET(DB_OPFLAGS_MASK) == DB_NEXT_NODUP;
01106 
01107 next_pg:
01108         indx = cp->indx;
01109         pg = cp->page;
01110 
01111         inp = P_INP(dbc->dbp, pg);
01112         /* The current page is not yet in the buffer. */
01113         need_pg = 1;
01114 
01115         /*
01116          * Keep track of the offset of the current key on the page.
01117          * If we are returning keys, set it to 0 first so we force
01118          * the copy of the key to the buffer.
01119          */
01120         pg_keyoff = 0;
01121         if (is_key == 0)
01122                 pg_keyoff = inp[indx];
01123 
01124         do {
01125                 if (IS_DELETED(dbc->dbp, pg, indx)) {
01126                         if (dbc->dbtype != DB_RECNO)
01127                                 continue;
01128 
01129                         cp->recno++;
01130                         /*
01131                          * If we are not returning recnos then we
01132                          * need to fill in every slot so the user
01133                          * can calculate the record numbers.
01134                          */
01135                         if (rec_key != 0)
01136                                 continue;
01137 
01138                         space -= 2 * sizeof(*offp);
01139                         /* Check if space as underflowed. */
01140                         if (space > data->ulen)
01141                                 goto back_up;
01142 
01143                         /* Just mark the empty recno slots. */
01144                         *offp-- = 0;
01145                         *offp-- = 0;
01146                         continue;
01147                 }
01148 
01149                 /*
01150                  * Check to see if we have a new key.
01151                  * If so, then see if we need to put the
01152                  * key on the page.  If its already there
01153                  * then we just point to it.
01154                  */
01155                 if (is_key && pg_keyoff != inp[indx]) {
01156                         bk = GET_BKEYDATA(dbc->dbp, pg, indx);
01157                         if (B_TYPE(bk->type) == B_OVERFLOW) {
01158                                 bo = (BOVERFLOW *)bk;
01159                                 size = key_size = bo->tlen;
01160                                 if (key_size > space)
01161                                         goto get_key_space;
01162                                 if ((ret = __bam_bulk_overflow(dbc,
01163                                     bo->tlen, bo->pgno, np)) != 0)
01164                                         return (ret);
01165                                 space -= key_size;
01166                                 key_off = (int32_t)(np - dbuf);
01167                                 np += key_size;
01168                         } else {
01169                                 if (need_pg) {
01170                                         dp = np;
01171                                         size = pagesize - HOFFSET(pg);
01172                                         if (space < size) {
01173 get_key_space:
01174                                                 /* Nothing added, then error. */
01175                                                 if (offp == endp) {
01176                                                         data->size = (u_int32_t)
01177                                                             DB_ALIGN(size +
01178                                                             pagesize, 1024);
01179                                                         return
01180                                                             (DB_BUFFER_SMALL);
01181                                                 }
01182                                                 /*
01183                                                  * We need to back up to the
01184                                                  * last record put into the
01185                                                  * buffer so that it is
01186                                                  * CURRENT.
01187                                                  */
01188                                                 if (indx != 0)
01189                                                         indx -= P_INDX;
01190                                                 else {
01191                                                         if ((ret =
01192                                                             __bam_get_prev(
01193                                                             dbc)) != 0)
01194                                                                 return (ret);
01195                                                         indx = cp->indx;
01196                                                         pg = cp->page;
01197                                                 }
01198                                                 break;
01199                                         }
01200                                         /*
01201                                          * Move the data part of the page
01202                                          * to the buffer.
01203                                          */
01204                                         memcpy(dp,
01205                                            (u_int8_t *)pg + HOFFSET(pg), size);
01206                                         need_pg = 0;
01207                                         space -= size;
01208                                         np += size;
01209                                 }
01210                                 key_size = bk->len;
01211                                 key_off = (int32_t)((inp[indx] - HOFFSET(pg))
01212                                     + (dp - dbuf) + SSZA(BKEYDATA, data));
01213                                 pg_keyoff = inp[indx];
01214                         }
01215                 }
01216 
01217                 /*
01218                  * Reserve space for the pointers and sizes.
01219                  * Either key/data pair or just for a data item.
01220                  */
01221                 space -= (is_key ? 4 : 2) * sizeof(*offp);
01222                 if (rec_key)
01223                         space -= sizeof(*offp);
01224 
01225                 /* Check to see if space has underflowed. */
01226                 if (space > data->ulen)
01227                         goto back_up;
01228 
01229                 /*
01230                  * Determine if the next record is in the
01231                  * buffer already or if it needs to be copied in.
01232                  * If we have an off page dup, then copy as many
01233                  * as will fit into the buffer.
01234                  */
01235                 bk = GET_BKEYDATA(dbc->dbp, pg, indx + adj - 1);
01236                 if (B_TYPE(bk->type) == B_DUPLICATE) {
01237                         bo = (BOVERFLOW *)bk;
01238                         if (is_key) {
01239                                 *offp-- = (int32_t)key_off;
01240                                 *offp-- = (int32_t)key_size;
01241                         }
01242                         /*
01243                          * We pass the offset of the current key.
01244                          * On return we check to see if offp has
01245                          * moved to see if any data fit.
01246                          */
01247                         saveoffp = offp;
01248                         if ((ret = __bam_bulk_duplicates(dbc, bo->pgno,
01249                             dbuf, is_key ? offp + P_INDX : NULL,
01250                             &offp, &np, &space, no_dup)) != 0) {
01251                                 if (ret == DB_BUFFER_SMALL) {
01252                                         size = space;
01253                                         space = 0;
01254                                         /* If nothing was added, then error. */
01255                                         if (offp == saveoffp) {
01256                                                 offp += 2;
01257                                                 goto back_up;
01258                                         }
01259                                         goto get_space;
01260                                 }
01261                                 return (ret);
01262                         }
01263                 } else if (B_TYPE(bk->type) == B_OVERFLOW) {
01264                         bo = (BOVERFLOW *)bk;
01265                         size = bo->tlen;
01266                         if (size > space)
01267                                 goto back_up;
01268                         if ((ret =
01269                             __bam_bulk_overflow(dbc,
01270                                 bo->tlen, bo->pgno, np)) != 0)
01271                                 return (ret);
01272                         space -= size;
01273                         if (is_key) {
01274                                 *offp-- = (int32_t)key_off;
01275                                 *offp-- = (int32_t)key_size;
01276                         } else if (rec_key)
01277                                 *offp-- = (int32_t)cp->recno;
01278                         *offp-- = (int32_t)(np - dbuf);
01279                         np += size;
01280                         *offp-- = (int32_t)size;
01281                 } else {
01282                         if (need_pg) {
01283                                 dp = np;
01284                                 size = pagesize - HOFFSET(pg);
01285                                 if (space < size) {
01286 back_up:
01287                                         /*
01288                                          * Back up the index so that the
01289                                          * last record in the buffer is CURRENT
01290                                          */
01291                                         if (indx >= adj)
01292                                                 indx -= adj;
01293                                         else {
01294                                                 if ((ret =
01295                                                     __bam_get_prev(dbc)) != 0 &&
01296                                                     ret != DB_NOTFOUND)
01297                                                         return (ret);
01298                                                 indx = cp->indx;
01299                                                 pg = cp->page;
01300                                         }
01301                                         if (dbc->dbtype == DB_RECNO)
01302                                                 cp->recno--;
01303 get_space:
01304                                         /*
01305                                          * See if we put anything in the
01306                                          * buffer or if we are doing a DBP->get
01307                                          * did we get all of the data.
01308                                          */
01309                                         if (offp >=
01310                                             (is_key ? &endp[-1] : endp) ||
01311                                             F_ISSET(dbc, DBC_TRANSIENT)) {
01312                                                 data->size = (u_int32_t)
01313                                                     DB_ALIGN(size +
01314                                                     data->ulen - space, 1024);
01315                                                 return (DB_BUFFER_SMALL);
01316                                         }
01317                                         break;
01318                                 }
01319                                 memcpy(dp, (u_int8_t *)pg + HOFFSET(pg), size);
01320                                 need_pg = 0;
01321                                 space -= size;
01322                                 np += size;
01323                         }
01324                         /*
01325                          * Add the offsets and sizes to the end of the buffer.
01326                          * First add the key info then the data info.
01327                          */
01328                         if (is_key) {
01329                                 *offp-- = (int32_t)key_off;
01330                                 *offp-- = (int32_t)key_size;
01331                         } else if (rec_key)
01332                                 *offp-- = (int32_t)cp->recno;
01333                         *offp-- = (int32_t)((inp[indx + adj - 1] - HOFFSET(pg))
01334                             + (dp - dbuf) + SSZA(BKEYDATA, data));
01335                         *offp-- = bk->len;
01336                 }
01337                 if (dbc->dbtype == DB_RECNO)
01338                         cp->recno++;
01339                 else if (no_dup) {
01340                         while (indx + adj < NUM_ENT(pg) &&
01341                             pg_keyoff == inp[indx + adj])
01342                                 indx += adj;
01343                 }
01344         /*
01345          * Stop when we either run off the page or we move to the next key and
01346          * we are not returning multiple keys.
01347          */
01348         } while ((indx += adj) < NUM_ENT(pg) &&
01349             (next_key || pg_keyoff == inp[indx]));
01350 
01351         /* If we are off the page then try to the next page. */
01352         if (ret == 0 && next_key && indx >= NUM_ENT(pg)) {
01353                 cp->indx = indx;
01354                 ret = __bam_c_next(dbc, 0, 1);
01355                 if (ret == 0)
01356                         goto next_pg;
01357                 if (ret != DB_NOTFOUND)
01358                         return (ret);
01359         }
01360 
01361         /*
01362          * If we did a DBP->get we must error if we did not return
01363          * all the data for the current key because there is
01364          * no way to know if we did not get it all, nor any
01365          * interface to fetch the balance.
01366          */
01367 
01368         if (ret == 0 && indx < pg->entries &&
01369             F_ISSET(dbc, DBC_TRANSIENT) && pg_keyoff == inp[indx]) {
01370                 data->size = (data->ulen - space) + size;
01371                 return (DB_BUFFER_SMALL);
01372         }
01373         /*
01374          * Must leave the index pointing at the last record fetched.
01375          * If we are not fetching keys, we may have stepped to the
01376          * next key.
01377          */
01378         if (ret == DB_BUFFER_SMALL || next_key || pg_keyoff == inp[indx])
01379                 cp->indx = indx;
01380         else
01381                 cp->indx = indx - P_INDX;
01382 
01383         if (rec_key == 1)
01384                 *offp = RECNO_OOB;
01385         else
01386                 *offp = -1;
01387         return (0);
01388 }
01389 
01390 /*
01391  * __bam_bulk_overflow --
01392  *      Dump overflow record into the buffer.
01393  *      The space requirements have already been checked.
01394  * PUBLIC: int __bam_bulk_overflow
01395  * PUBLIC:    __P((DBC *, u_int32_t, db_pgno_t, u_int8_t *));
01396  */
01397 int
01398 __bam_bulk_overflow(dbc, len, pgno, dp)
01399         DBC *dbc;
01400         u_int32_t len;
01401         db_pgno_t pgno;
01402         u_int8_t *dp;
01403 {
01404         DBT dbt;
01405 
01406         memset(&dbt, 0, sizeof(dbt));
01407         F_SET(&dbt, DB_DBT_USERMEM);
01408         dbt.ulen = len;
01409         dbt.data = (void *)dp;
01410         return (__db_goff(dbc->dbp, &dbt, len, pgno, NULL, NULL));
01411 }
01412 
01413 /*
01414  * __bam_bulk_duplicates --
01415  *      Put as many off page duplicates as will fit into the buffer.
01416  * This routine will adjust the cursor to reflect the position in
01417  * the overflow tree.
01418  * PUBLIC: int __bam_bulk_duplicates __P((DBC *,
01419  * PUBLIC:       db_pgno_t, u_int8_t *, int32_t *,
01420  * PUBLIC:       int32_t **, u_int8_t **, u_int32_t *, int));
01421  */
01422 int
01423 __bam_bulk_duplicates(dbc, pgno, dbuf, keyoff, offpp, dpp, spacep, no_dup)
01424         DBC *dbc;
01425         db_pgno_t pgno;
01426         u_int8_t *dbuf;
01427         int32_t *keyoff, **offpp;
01428         u_int8_t **dpp;
01429         u_int32_t *spacep;
01430         int no_dup;
01431 {
01432         DB *dbp;
01433         BKEYDATA *bk;
01434         BOVERFLOW *bo;
01435         BTREE_CURSOR *cp;
01436         DBC *opd;
01437         DBT key, data;
01438         PAGE *pg;
01439         db_indx_t indx, *inp;
01440         int32_t *offp;
01441         u_int32_t pagesize, size, space;
01442         u_int8_t *dp, *np;
01443         int first, need_pg, ret, t_ret;
01444 
01445         ret = 0;
01446 
01447         dbp = dbc->dbp;
01448         cp = (BTREE_CURSOR *)dbc->internal;
01449         opd = cp->opd;
01450 
01451         if (opd == NULL) {
01452                 if ((ret = __db_c_newopd(dbc, pgno, NULL, &opd)) != 0)
01453                         return (ret);
01454                 cp->opd = opd;
01455                 if ((ret = opd->c_am_get(opd,
01456                     &key, &data, DB_FIRST, NULL)) != 0)
01457                         goto close_opd;
01458         }
01459 
01460         pagesize = opd->dbp->pgsize;
01461         cp = (BTREE_CURSOR *)opd->internal;
01462         space = *spacep;
01463         /* Get current offset slot. */
01464         offp = *offpp;
01465 
01466         /*
01467          * np is the next place to put data.
01468          * dp is the beginning of the current page in the buffer.
01469          */
01470         np = dp = *dpp;
01471         first = 1;
01472         indx = cp->indx;
01473 
01474         do {
01475                 /* Fetch the current record.  No initial move. */
01476                 if ((ret = __bam_c_next(opd, 0, 0)) != 0)
01477                         break;
01478                 pg = cp->page;
01479                 indx = cp->indx;
01480                 inp = P_INP(dbp, pg);
01481                 /* We need to copy the page to the buffer. */
01482                 need_pg = 1;
01483 
01484                 do {
01485                         if (IS_DELETED(dbp, pg, indx))
01486                                 goto contin;
01487                         bk = GET_BKEYDATA(dbp, pg, indx);
01488                         space -= 2 * sizeof(*offp);
01489                         /* Allocate space for key if needed. */
01490                         if (first == 0 && keyoff != NULL)
01491                                 space -= 2 * sizeof(*offp);
01492 
01493                         /* Did space underflow? */
01494                         if (space > *spacep) {
01495                                 ret = DB_BUFFER_SMALL;
01496                                 if (first == 1) {
01497                                         /* Get the absolute value. */
01498                                         space = -(int32_t)space;
01499                                         space = *spacep + space;
01500                                         if (need_pg)
01501                                                 space += pagesize - HOFFSET(pg);
01502                                 }
01503                                 break;
01504                         }
01505                         if (B_TYPE(bk->type) == B_OVERFLOW) {
01506                                 bo = (BOVERFLOW *)bk;
01507                                 size = bo->tlen;
01508                                 if (size > space) {
01509                                         ret = DB_BUFFER_SMALL;
01510                                         space = *spacep + size;
01511                                         break;
01512                                 }
01513                                 if (first == 0 && keyoff != NULL) {
01514                                         *offp-- = keyoff[0];
01515                                         *offp-- = keyoff[-1];
01516                                 }
01517                                 if ((ret = __bam_bulk_overflow(dbc,
01518                                     bo->tlen, bo->pgno, np)) != 0)
01519                                         return (ret);
01520                                 space -= size;
01521                                 *offp-- = (int32_t)(np - dbuf);
01522                                 np += size;
01523                         } else {
01524                                 if (need_pg) {
01525                                         dp = np;
01526                                         size = pagesize - HOFFSET(pg);
01527                                         if (space < size) {
01528                                                 ret = DB_BUFFER_SMALL;
01529                                                 /* Return space required. */
01530                                                 space = *spacep + size;
01531                                                 break;
01532                                         }
01533                                         memcpy(dp,
01534                                             (u_int8_t *)pg + HOFFSET(pg), size);
01535                                         need_pg = 0;
01536                                         space -= size;
01537                                         np += size;
01538                                 }
01539                                 if (first == 0 && keyoff != NULL) {
01540                                         *offp-- = keyoff[0];
01541                                         *offp-- = keyoff[-1];
01542                                 }
01543                                 size = bk->len;
01544                                 *offp-- = (int32_t)((inp[indx] - HOFFSET(pg))
01545                                     + (dp - dbuf) + SSZA(BKEYDATA, data));
01546                         }
01547                         *offp-- = (int32_t)size;
01548                         first = 0;
01549                         if (no_dup)
01550                                 break;
01551 contin:
01552                         indx++;
01553                         if (opd->dbtype == DB_RECNO)
01554                                 cp->recno++;
01555                 } while (indx < NUM_ENT(pg));
01556                 if (no_dup)
01557                         break;
01558                 cp->indx = indx;
01559 
01560         } while (ret == 0);
01561 
01562         /* Return the updated information. */
01563         *spacep = space;
01564         *offpp = offp;
01565         *dpp = np;
01566 
01567         /*
01568          * If we ran out of space back up the pointer.
01569          * If we did not return any dups or reached the end, close the opd.
01570          */
01571         if (ret == DB_BUFFER_SMALL) {
01572                 if (opd->dbtype == DB_RECNO) {
01573                         if (--cp->recno == 0)
01574                                 goto close_opd;
01575                 } else if (indx != 0)
01576                         cp->indx--;
01577                 else {
01578                         t_ret = __bam_c_prev(opd);
01579                         if (t_ret == DB_NOTFOUND)
01580                                 goto close_opd;
01581                         if (t_ret != 0)
01582                                 ret = t_ret;
01583                 }
01584         } else if (keyoff == NULL && ret == DB_NOTFOUND) {
01585                 cp->indx--;
01586                 if (opd->dbtype == DB_RECNO)
01587                         --cp->recno;
01588         } else if (indx == 0 || ret == DB_NOTFOUND) {
01589 close_opd:
01590                 if (ret == DB_NOTFOUND)
01591                         ret = 0;
01592                 if ((t_ret = __db_c_close(opd)) != 0 && ret == 0)
01593                         ret = t_ret;
01594                 ((BTREE_CURSOR *)dbc->internal)->opd = NULL;
01595         }
01596         if (ret == DB_NOTFOUND)
01597                 ret = 0;
01598 
01599         return (ret);
01600 }
01601 
01602 /*
01603  * __bam_getbothc --
01604  *      Search for a matching data item on a join.
01605  */
01606 static int
01607 __bam_getbothc(dbc, data)
01608         DBC *dbc;
01609         DBT *data;
01610 {
01611         BTREE_CURSOR *cp;
01612         DB *dbp;
01613         DB_MPOOLFILE *mpf;
01614         int cmp, exact, ret;
01615 
01616         dbp = dbc->dbp;
01617         mpf = dbp->mpf;
01618         cp = (BTREE_CURSOR *)dbc->internal;
01619 
01620         /*
01621          * Acquire the current page.  We have at least a read-lock
01622          * already.  The caller may have set DB_RMW asking for a
01623          * write lock, but upgrading to a write lock has no better
01624          * chance of succeeding now instead of later, so don't try.
01625          */
01626         if ((ret = __memp_fget(mpf, &cp->pgno, 0, &cp->page)) != 0)
01627                 return (ret);
01628 
01629         /*
01630          * An off-page duplicate cursor.  Search the remaining duplicates
01631          * for one which matches (do a normal btree search, then verify
01632          * that the retrieved record is greater than the original one).
01633          */
01634         if (F_ISSET(dbc, DBC_OPD)) {
01635                 /*
01636                  * Check to make sure the desired item comes strictly after
01637                  * the current position;  if it doesn't, return DB_NOTFOUND.
01638                  */
01639                 if ((ret = __bam_cmp(dbp, data, cp->page, cp->indx,
01640                     dbp->dup_compare == NULL ? __bam_defcmp : dbp->dup_compare,
01641                     &cmp)) != 0)
01642                         return (ret);
01643 
01644                 if (cmp <= 0)
01645                         return (DB_NOTFOUND);
01646 
01647                 /* Discard the current page, we're going to do a full search. */
01648                 if ((ret = __memp_fput(mpf, cp->page, 0)) != 0)
01649                         return (ret);
01650                 cp->page = NULL;
01651 
01652                 return (__bam_c_search(dbc,
01653                     PGNO_INVALID, data, DB_GET_BOTH, &exact));
01654         }
01655 
01656         /*
01657          * We're doing a DBC->c_get(DB_GET_BOTHC) and we're already searching
01658          * a set of on-page duplicates (either sorted or unsorted).  Continue
01659          * a linear search from after the current position.
01660          *
01661          * (Note that we could have just finished a "set" of one duplicate,
01662          * i.e. not a duplicate at all, but the following check will always
01663          * return DB_NOTFOUND in this case, which is the desired behavior.)
01664          */
01665         if (cp->indx + P_INDX >= NUM_ENT(cp->page) ||
01666             !IS_DUPLICATE(dbc, cp->indx, cp->indx + P_INDX))
01667                 return (DB_NOTFOUND);
01668         cp->indx += P_INDX;
01669 
01670         return (__bam_getboth_finddatum(dbc, data, DB_GET_BOTH));
01671 }
01672 
01673 /*
01674  * __bam_getboth_finddatum --
01675  *      Find a matching on-page data item.
01676  */
01677 static int
01678 __bam_getboth_finddatum(dbc, data, flags)
01679         DBC *dbc;
01680         DBT *data;
01681         u_int32_t flags;
01682 {
01683         BTREE_CURSOR *cp;
01684         DB *dbp;
01685         db_indx_t base, lim, top;
01686         int cmp, ret;
01687 
01688         COMPQUIET(cmp, 0);
01689 
01690         dbp = dbc->dbp;
01691         cp = (BTREE_CURSOR *)dbc->internal;
01692 
01693         /*
01694          * Called (sometimes indirectly) from DBC->get to search on-page data
01695          * item(s) for a matching value.  If the original flag was DB_GET_BOTH
01696          * or DB_GET_BOTH_RANGE, the cursor is set to the first undeleted data
01697          * item for the key.  If the original flag was DB_GET_BOTHC, the cursor
01698          * argument is set to the first data item we can potentially return.
01699          * In both cases, there may or may not be additional duplicate data
01700          * items to search.
01701          *
01702          * If the duplicates are not sorted, do a linear search.
01703          */
01704         if (dbp->dup_compare == NULL) {
01705                 for (;; cp->indx += P_INDX) {
01706                         if (!IS_CUR_DELETED(dbc) &&
01707                             (ret = __bam_cmp(dbp, data, cp->page,
01708                             cp->indx + O_INDX, __bam_defcmp, &cmp)) != 0)
01709                                 return (ret);
01710                         if (cmp == 0)
01711                                 return (0);
01712 
01713                         if (cp->indx + P_INDX >= NUM_ENT(cp->page) ||
01714                             !IS_DUPLICATE(dbc, cp->indx, cp->indx + P_INDX))
01715                                 break;
01716                 }
01717                 return (DB_NOTFOUND);
01718         }
01719 
01720         /*
01721          * If the duplicates are sorted, do a binary search.  The reason for
01722          * this is that large pages and small key/data pairs result in large
01723          * numbers of on-page duplicates before they get pushed off-page.
01724          *
01725          * Find the top and bottom of the duplicate set.  Binary search
01726          * requires at least two items, don't loop if there's only one.
01727          */
01728         for (base = top = cp->indx; top < NUM_ENT(cp->page); top += P_INDX)
01729                 if (!IS_DUPLICATE(dbc, cp->indx, top))
01730                         break;
01731         if (base == (top - P_INDX)) {
01732                 if  ((ret = __bam_cmp(dbp, data,
01733                     cp->page, cp->indx + O_INDX, dbp->dup_compare, &cmp)) != 0)
01734                         return (ret);
01735                 return (cmp == 0 ||
01736                     (cmp < 0 && flags == DB_GET_BOTH_RANGE) ? 0 : DB_NOTFOUND);
01737         }
01738 
01739         for (lim = (top - base) / (db_indx_t)P_INDX; lim != 0; lim >>= 1) {
01740                 cp->indx = base + ((lim >> 1) * P_INDX);
01741                 if ((ret = __bam_cmp(dbp, data, cp->page,
01742                     cp->indx + O_INDX, dbp->dup_compare, &cmp)) != 0)
01743                         return (ret);
01744                 if (cmp == 0) {
01745                         /*
01746                          * XXX
01747                          * No duplicate duplicates in sorted duplicate sets,
01748                          * so there can be only one.
01749                          */
01750                         if (!IS_CUR_DELETED(dbc))
01751                                 return (0);
01752                         break;
01753                 }
01754                 if (cmp > 0) {
01755                         base = cp->indx + P_INDX;
01756                         --lim;
01757                 }
01758         }
01759 
01760         /* No match found; if we're looking for an exact match, we're done. */
01761         if (flags == DB_GET_BOTH)
01762                 return (DB_NOTFOUND);
01763 
01764         /*
01765          * Base is the smallest index greater than the data item, may be zero
01766          * or a last + O_INDX index, and may be deleted.  Find an undeleted
01767          * item.
01768          */
01769         cp->indx = base;
01770         while (cp->indx < top && IS_CUR_DELETED(dbc))
01771                 cp->indx += P_INDX;
01772         return (cp->indx < top ? 0 : DB_NOTFOUND);
01773 }
01774 
01775 /*
01776  * __bam_c_put --
01777  *      Put using a cursor.
01778  */
01779 static int
01780 __bam_c_put(dbc, key, data, flags, pgnop)
01781         DBC *dbc;
01782         DBT *key, *data;
01783         u_int32_t flags;
01784         db_pgno_t *pgnop;
01785 {
01786         BTREE *t;
01787         BTREE_CURSOR *cp;
01788         DB *dbp;
01789         DBT dbt;
01790         DB_MPOOLFILE *mpf;
01791         db_pgno_t root_pgno;
01792         u_int32_t iiop;
01793         int cmp, exact, own, ret, stack;
01794         void *arg;
01795 
01796         dbp = dbc->dbp;
01797         mpf = dbp->mpf;
01798         cp = (BTREE_CURSOR *)dbc->internal;
01799         root_pgno = cp->root;
01800 
01801 split:  ret = stack = 0;
01802         switch (flags) {
01803         case DB_CURRENT:
01804                 if (F_ISSET(cp, C_DELETED))
01805                         return (DB_NOTFOUND);
01806                 /* FALLTHROUGH */
01807 
01808         case DB_AFTER:
01809         case DB_BEFORE:
01810                 iiop = flags;
01811                 own = 1;
01812 
01813                 /* Acquire the current page with a write lock. */
01814                 ACQUIRE_WRITE_LOCK(dbc, ret);
01815                 if (ret != 0)
01816                         goto err;
01817                 if ((ret = __memp_fget(mpf, &cp->pgno, 0, &cp->page)) != 0)
01818                         goto err;
01819                 break;
01820         case DB_KEYFIRST:
01821         case DB_KEYLAST:
01822         case DB_NODUPDATA:
01823                 own = 0;
01824                 /*
01825                  * Searching off-page, sorted duplicate tree: do a tree search
01826                  * for the correct item; __bam_c_search returns the smallest
01827                  * slot greater than the key, use it.
01828                  *
01829                  * See comment below regarding where we can start the search.
01830                  */
01831                 if (F_ISSET(dbc, DBC_OPD)) {
01832                         if ((ret = __bam_c_search(dbc,
01833                             F_ISSET(cp, C_RECNUM) ? cp->root : root_pgno,
01834                             data, flags, &exact)) != 0)
01835                                 goto err;
01836                         stack = 1;
01837 
01838                         /* Disallow "sorted" duplicate duplicates. */
01839                         if (exact) {
01840                                 if (IS_DELETED(dbp, cp->page, cp->indx)) {
01841                                         iiop = DB_CURRENT;
01842                                         break;
01843                                 }
01844                                 ret = __db_duperr(dbp, flags);
01845                                 goto err;
01846                         }
01847                         iiop = DB_BEFORE;
01848                         break;
01849                 }
01850 
01851                 /*
01852                  * Searching a btree.
01853                  *
01854                  * If we've done a split, we can start the search from the
01855                  * parent of the split page, which __bam_split returned
01856                  * for us in root_pgno, unless we're in a Btree with record
01857                  * numbering.  In that case, we'll need the true root page
01858                  * in order to adjust the record count.
01859                  */
01860                 if ((ret = __bam_c_search(dbc,
01861                     F_ISSET(cp, C_RECNUM) ? cp->root : root_pgno, key,
01862                     flags == DB_KEYFIRST || dbp->dup_compare != NULL ?
01863                     DB_KEYFIRST : DB_KEYLAST, &exact)) != 0)
01864                         goto err;
01865                 stack = 1;
01866 
01867                 /*
01868                  * If we don't have an exact match, __bam_c_search returned
01869                  * the smallest slot greater than the key, use it.
01870                  */
01871                 if (!exact) {
01872                         iiop = DB_KEYFIRST;
01873                         break;
01874                 }
01875 
01876                 /*
01877                  * If duplicates aren't supported, replace the current item.
01878                  * (If implementing the DB->put function, our caller already
01879                  * checked the DB_NOOVERWRITE flag.)
01880                  */
01881                 if (!F_ISSET(dbp, DB_AM_DUP)) {
01882                         iiop = DB_CURRENT;
01883                         break;
01884                 }
01885 
01886                 /*
01887                  * If we find a matching entry, it may be an off-page duplicate
01888                  * tree.  Return the page number to our caller, we need a new
01889                  * cursor.
01890                  */
01891                 if (pgnop != NULL && __bam_isopd(dbc, pgnop))
01892                         goto done;
01893 
01894                 /* If the duplicates aren't sorted, move to the right slot. */
01895                 if (dbp->dup_compare == NULL) {
01896                         if (flags == DB_KEYFIRST)
01897                                 iiop = DB_BEFORE;
01898                         else
01899                                 for (;; cp->indx += P_INDX)
01900                                         if (cp->indx + P_INDX >=
01901                                             NUM_ENT(cp->page) ||
01902                                             !IS_DUPLICATE(dbc, cp->indx,
01903                                             cp->indx + P_INDX)) {
01904                                                 iiop = DB_AFTER;
01905                                                 break;
01906                                         }
01907                         break;
01908                 }
01909 
01910                 /*
01911                  * We know that we're looking at the first of a set of sorted
01912                  * on-page duplicates.  Walk the list to find the right slot.
01913                  */
01914                 for (;; cp->indx += P_INDX) {
01915                         if ((ret = __bam_cmp(dbp, data, cp->page,
01916                             cp->indx + O_INDX, dbp->dup_compare, &cmp)) != 0)
01917                                 goto err;
01918                         if (cmp < 0) {
01919                                 iiop = DB_BEFORE;
01920                                 break;
01921                         }
01922 
01923                         /* Disallow "sorted" duplicate duplicates. */
01924                         if (cmp == 0) {
01925                                 if (IS_DELETED(dbp, cp->page, cp->indx)) {
01926                                         iiop = DB_CURRENT;
01927                                         break;
01928                                 }
01929                                 ret = __db_duperr(dbp, flags);
01930                                 goto err;
01931                         }
01932 
01933                         if (cp->indx + P_INDX >= NUM_ENT(cp->page) ||
01934                             P_INP(dbp, ((PAGE *)cp->page))[cp->indx] !=
01935                             P_INP(dbp, ((PAGE *)cp->page))[cp->indx + P_INDX]) {
01936                                 iiop = DB_AFTER;
01937                                 break;
01938                         }
01939                 }
01940                 break;
01941         default:
01942                 ret = __db_unknown_flag(dbp->dbenv, "__bam_c_put", flags);
01943                 goto err;
01944         }
01945 
01946         switch (ret = __bam_iitem(dbc, key, data, iiop, 0)) {
01947         case 0:
01948                 break;
01949         case DB_NEEDSPLIT:
01950                 /*
01951                  * To split, we need a key for the page.  Either use the key
01952                  * argument or get a copy of the key from the page.
01953                  */
01954                 if (flags == DB_AFTER ||
01955                     flags == DB_BEFORE || flags == DB_CURRENT) {
01956                         memset(&dbt, 0, sizeof(DBT));
01957                         if ((ret = __db_ret(dbp, cp->page, 0, &dbt,
01958                             &dbc->my_rkey.data, &dbc->my_rkey.ulen)) != 0)
01959                                 goto err;
01960                         arg = &dbt;
01961                 } else
01962                         arg = F_ISSET(dbc, DBC_OPD) ? data : key;
01963 
01964                 /*
01965                  * Discard any locks and pinned pages (the locks are discarded
01966                  * even if we're running with transactions, as they lock pages
01967                  * that we're sorry we ever acquired).  If stack is set and the
01968                  * cursor entries are valid, they point to the same entries as
01969                  * the stack, don't free them twice.
01970                  */
01971                 if (stack)
01972                         ret = __bam_stkrel(dbc, STK_CLRDBC | STK_NOLOCK);
01973                 else
01974                         DISCARD_CUR(dbc, ret);
01975                 if (ret != 0)
01976                         goto err;
01977 
01978                 /*
01979                  * SR [#6059]
01980                  * If we do not own a lock on the page any more, then clear the
01981                  * cursor so we don't point at it.  Even if we call __bam_stkrel
01982                  * above we still may have entered the routine with the cursor
01983                  * positioned to a particular record.  This is in the case
01984                  * where C_RECNUM is set.
01985                  */
01986                 if (own == 0) {
01987                         cp->pgno = PGNO_INVALID;
01988                         cp->indx = 0;
01989                 }
01990 
01991                 /* Split the tree. */
01992                 if ((ret = __bam_split(dbc, arg, &root_pgno)) != 0)
01993                         return (ret);
01994 
01995                 goto split;
01996         default:
01997                 goto err;
01998         }
01999 
02000 err:
02001 done:   /*
02002          * If we inserted a key into the first or last slot of the tree,
02003          * remember where it was so we can do it more quickly next time.
02004          * If the tree has record numbers, we need a complete stack so
02005          * that we can adjust the record counts, so skipping the tree search
02006          * isn't possible.  For subdatabases we need to be careful that the
02007          * page does not move from one db to another, so we track its LSN.
02008          *
02009          * If there are duplicates and we are inserting into the last slot,
02010          * the cursor will point _to_ the last item, not after it, which
02011          * is why we subtract P_INDX below.
02012          */
02013 
02014         t = dbp->bt_internal;
02015         if (ret == 0 && TYPE(cp->page) == P_LBTREE &&
02016             (flags == DB_KEYFIRST || flags == DB_KEYLAST) &&
02017             !F_ISSET(cp, C_RECNUM) &&
02018             (!F_ISSET(dbp, DB_AM_SUBDB) ||
02019             (LOGGING_ON(dbp->dbenv) && !F_ISSET(dbp, DB_AM_NOT_DURABLE))) &&
02020             ((NEXT_PGNO(cp->page) == PGNO_INVALID &&
02021             cp->indx >= NUM_ENT(cp->page) - P_INDX) ||
02022             (PREV_PGNO(cp->page) == PGNO_INVALID && cp->indx == 0))) {
02023                 t->bt_lpgno = cp->pgno;
02024                 if (F_ISSET(dbp, DB_AM_SUBDB))
02025                         t->bt_llsn = LSN(cp->page);
02026         } else
02027                 t->bt_lpgno = PGNO_INVALID;
02028         /*
02029          * Discard any pages pinned in the tree and their locks, except for
02030          * the leaf page.  Note, the leaf page participated in any stack we
02031          * acquired, and so we have to adjust the stack as necessary.  If
02032          * there was only a single page on the stack, we don't have to free
02033          * further stack pages.
02034          */
02035         if (stack && BT_STK_POP(cp) != NULL)
02036                 (void)__bam_stkrel(dbc, 0);
02037 
02038         /*
02039          * Regardless of whether we were successful or not, clear the delete
02040          * flag.  If we're successful, we either moved the cursor or the item
02041          * is no longer deleted.  If we're not successful, then we're just a
02042          * copy, no need to have the flag set.
02043          *
02044          * We may have instantiated off-page duplicate cursors during the put,
02045          * so clear the deleted bit from the off-page duplicate cursor as well.
02046          */
02047         F_CLR(cp, C_DELETED);
02048         if (cp->opd != NULL) {
02049                 cp = (BTREE_CURSOR *)cp->opd->internal;
02050                 F_CLR(cp, C_DELETED);
02051         }
02052 
02053         return (ret);
02054 }
02055 
02056 /*
02057  * __bam_c_rget --
02058  *      Return the record number for a cursor.
02059  *
02060  * PUBLIC: int __bam_c_rget __P((DBC *, DBT *));
02061  */
02062 int
02063 __bam_c_rget(dbc, data)
02064         DBC *dbc;
02065         DBT *data;
02066 {
02067         BTREE_CURSOR *cp;
02068         DB *dbp;
02069         DBT dbt;
02070         DB_MPOOLFILE *mpf;
02071         db_recno_t recno;
02072         int exact, ret, t_ret;
02073 
02074         dbp = dbc->dbp;
02075         mpf = dbp->mpf;
02076         cp = (BTREE_CURSOR *)dbc->internal;
02077 
02078         /*
02079          * Get the page with the current item on it.
02080          * Get a copy of the key.
02081          * Release the page, making sure we don't release it twice.
02082          */
02083         if ((ret = __memp_fget(mpf, &cp->pgno, 0, &cp->page)) != 0)
02084                 return (ret);
02085         memset(&dbt, 0, sizeof(DBT));
02086         if ((ret = __db_ret(dbp, cp->page,
02087             cp->indx, &dbt, &dbc->my_rkey.data, &dbc->my_rkey.ulen)) != 0)
02088                 goto err;
02089         ret = __memp_fput(mpf, cp->page, 0);
02090         cp->page = NULL;
02091         if (ret != 0)
02092                 return (ret);
02093 
02094         if ((ret = __bam_search(dbc, PGNO_INVALID, &dbt,
02095             F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND,
02096             1, &recno, &exact)) != 0)
02097                 goto err;
02098 
02099         ret = __db_retcopy(dbp->dbenv, data,
02100             &recno, sizeof(recno), &dbc->rdata->data, &dbc->rdata->ulen);
02101 
02102         /* Release the stack. */
02103 err:    if ((t_ret = __bam_stkrel(dbc, 0)) != 0 && ret == 0)
02104                 ret = t_ret;
02105 
02106         return (ret);
02107 }
02108 
02109 /*
02110  * __bam_c_writelock --
02111  *      Upgrade the cursor to a write lock.
02112  */
02113 static int
02114 __bam_c_writelock(dbc)
02115         DBC *dbc;
02116 {
02117         BTREE_CURSOR *cp;
02118         int ret;
02119 
02120         cp = (BTREE_CURSOR *)dbc->internal;
02121 
02122         if (cp->lock_mode == DB_LOCK_WRITE)
02123                 return (0);
02124 
02125         /*
02126          * When writing to an off-page duplicate tree, we need to have the
02127          * appropriate page in the primary tree locked.  The general DBC
02128          * code calls us first with the primary cursor so we can acquire the
02129          * appropriate lock.
02130          */
02131         ACQUIRE_WRITE_LOCK(dbc, ret);
02132         return (ret);
02133 }
02134 
02135 /*
02136  * __bam_c_next --
02137  *      Move to the next record.
02138  */
02139 static int
02140 __bam_c_next(dbc, initial_move, deleted_okay)
02141         DBC *dbc;
02142         int initial_move, deleted_okay;
02143 {
02144         BTREE_CURSOR *cp;
02145         db_indx_t adjust;
02146         db_lockmode_t lock_mode;
02147         db_pgno_t pgno;
02148         int ret;
02149 
02150         cp = (BTREE_CURSOR *)dbc->internal;
02151         ret = 0;
02152 
02153         /*
02154          * We're either moving through a page of duplicates or a btree leaf
02155          * page.
02156          *
02157          * !!!
02158          * This code handles empty pages and pages with only deleted entries.
02159          */
02160         if (F_ISSET(dbc, DBC_OPD)) {
02161                 adjust = O_INDX;
02162                 lock_mode = DB_LOCK_NG;
02163         } else {
02164                 adjust = dbc->dbtype == DB_BTREE ? P_INDX : O_INDX;
02165                 lock_mode =
02166                     F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ;
02167         }
02168         if (cp->page == NULL) {
02169                 ACQUIRE_CUR(dbc, lock_mode, cp->pgno, ret);
02170                 if (ret != 0)
02171                         return (ret);
02172         }
02173 
02174         if (initial_move)
02175                 cp->indx += adjust;
02176 
02177         for (;;) {
02178                 /*
02179                  * If at the end of the page, move to a subsequent page.
02180                  *
02181                  * !!!
02182                  * Check for >= NUM_ENT.  If the original search landed us on
02183                  * NUM_ENT, we may have incremented indx before the test.
02184                  */
02185                 if (cp->indx >= NUM_ENT(cp->page)) {
02186                         if ((pgno
02187                             = NEXT_PGNO(cp->page)) == PGNO_INVALID)
02188                                 return (DB_NOTFOUND);
02189 
02190                         ACQUIRE_CUR(dbc, lock_mode, pgno, ret);
02191                         if (ret != 0)
02192                                 return (ret);
02193                         cp->indx = 0;
02194                         continue;
02195                 }
02196                 if (!deleted_okay && IS_CUR_DELETED(dbc)) {
02197                         cp->indx += adjust;
02198                         continue;
02199                 }
02200                 break;
02201         }
02202         return (0);
02203 }
02204 
02205 /*
02206  * __bam_c_prev --
02207  *      Move to the previous record.
02208  */
02209 static int
02210 __bam_c_prev(dbc)
02211         DBC *dbc;
02212 {
02213         BTREE_CURSOR *cp;
02214         db_indx_t adjust;
02215         db_lockmode_t lock_mode;
02216         db_pgno_t pgno;
02217         int ret;
02218 
02219         cp = (BTREE_CURSOR *)dbc->internal;
02220         ret = 0;
02221 
02222         /*
02223          * We're either moving through a page of duplicates or a btree leaf
02224          * page.
02225          *
02226          * !!!
02227          * This code handles empty pages and pages with only deleted entries.
02228          */
02229         if (F_ISSET(dbc, DBC_OPD)) {
02230                 adjust = O_INDX;
02231                 lock_mode = DB_LOCK_NG;
02232         } else {
02233                 adjust = dbc->dbtype == DB_BTREE ? P_INDX : O_INDX;
02234                 lock_mode =
02235                     F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ;
02236         }
02237         if (cp->page == NULL) {
02238                 ACQUIRE_CUR(dbc, lock_mode, cp->pgno, ret);
02239                 if (ret != 0)
02240                         return (ret);
02241         }
02242 
02243         for (;;) {
02244                 /* If at the beginning of the page, move to a previous one. */
02245                 if (cp->indx == 0) {
02246                         if ((pgno =
02247                             PREV_PGNO(cp->page)) == PGNO_INVALID)
02248                                 return (DB_NOTFOUND);
02249 
02250                         ACQUIRE_CUR(dbc, lock_mode, pgno, ret);
02251                         if (ret != 0)
02252                                 return (ret);
02253 
02254                         if ((cp->indx = NUM_ENT(cp->page)) == 0)
02255                                 continue;
02256                 }
02257 
02258                 /* Ignore deleted records. */
02259                 cp->indx -= adjust;
02260                 if (IS_CUR_DELETED(dbc))
02261                         continue;
02262 
02263                 break;
02264         }
02265         return (0);
02266 }
02267 
02268 /*
02269  * __bam_c_search --
02270  *      Move to a specified record.
02271  */
02272 static int
02273 __bam_c_search(dbc, root_pgno, key, flags, exactp)
02274         DBC *dbc;
02275         db_pgno_t root_pgno;
02276         const DBT *key;
02277         u_int32_t flags;
02278         int *exactp;
02279 {
02280         BTREE *t;
02281         BTREE_CURSOR *cp;
02282         DB *dbp;
02283         PAGE *h;
02284         db_indx_t indx, *inp;
02285         db_pgno_t bt_lpgno;
02286         db_recno_t recno;
02287         u_int32_t sflags;
02288         int cmp, ret, t_ret;
02289 
02290         dbp = dbc->dbp;
02291         cp = (BTREE_CURSOR *)dbc->internal;
02292         t = dbp->bt_internal;
02293         ret = 0;
02294 
02295         /*
02296          * Find an entry in the database.  Discard any lock we currently hold,
02297          * we're going to search the tree.
02298          */
02299         DISCARD_CUR(dbc, ret);
02300         if (ret != 0)
02301                 return (ret);
02302 
02303         switch (flags) {
02304         case DB_FIRST:
02305                 sflags = (F_ISSET(dbc, DBC_RMW) ? S_WRITE : S_READ) | S_MIN;
02306                 goto search;
02307         case DB_LAST:
02308                 sflags = (F_ISSET(dbc, DBC_RMW) ? S_WRITE : S_READ) | S_MAX;
02309                 goto search;
02310         case DB_SET_RECNO:
02311                 if ((ret = __ram_getno(dbc, key, &recno, 0)) != 0)
02312                         return (ret);
02313                 sflags = (F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND) | S_EXACT;
02314                 if ((ret = __bam_rsearch(dbc, &recno, sflags, 1, exactp)) != 0)
02315                         return (ret);
02316                 break;
02317         case DB_SET:
02318         case DB_GET_BOTH:
02319                 sflags = (F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND) | S_EXACT;
02320                 goto search;
02321         case DB_GET_BOTH_RANGE:
02322                 sflags = (F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND);
02323                 goto search;
02324         case DB_SET_RANGE:
02325                 sflags =
02326                     (F_ISSET(dbc, DBC_RMW) ? S_WRITE : S_READ) | S_DUPFIRST;
02327                 goto search;
02328         case DB_KEYFIRST:
02329                 sflags = S_KEYFIRST;
02330                 goto fast_search;
02331         case DB_KEYLAST:
02332         case DB_NODUPDATA:
02333                 sflags = S_KEYLAST;
02334 fast_search:    /*
02335                  * If the application has a history of inserting into the first
02336                  * or last pages of the database, we check those pages first to
02337                  * avoid doing a full search.
02338                  */
02339                 if (F_ISSET(dbc, DBC_OPD))
02340                         goto search;
02341 
02342                 /*
02343                  * !!!
02344                  * We do not mutex protect the t->bt_lpgno field, which means
02345                  * that it can only be used in an advisory manner.  If we find
02346                  * page we can use, great.  If we don't, we don't care, we do
02347                  * it the slow way instead.  Regardless, copy it into a local
02348                  * variable, otherwise we might acquire a lock for a page and
02349                  * then read a different page because it changed underfoot.
02350                  */
02351                 bt_lpgno = t->bt_lpgno;
02352 
02353                 /*
02354                  * If the tree has no history of insertion, do it the slow way.
02355                  */
02356                 if (bt_lpgno == PGNO_INVALID)
02357                         goto search;
02358 
02359                 /*
02360                  * Lock and retrieve the page on which we last inserted.
02361                  *
02362                  * The page may not exist: if a transaction created the page
02363                  * and then aborted, the page might have been truncated from
02364                  * the end of the file.
02365                  */
02366                 h = NULL;
02367                 ACQUIRE_CUR(dbc, DB_LOCK_WRITE, bt_lpgno, ret);
02368                 if (ret != 0) {
02369                         if (ret == DB_PAGE_NOTFOUND)
02370                                 ret = 0;
02371                         goto fast_miss;
02372                 }
02373 
02374                 h = cp->page;
02375                 inp = P_INP(dbp, h);
02376 
02377                 /*
02378                  * It's okay if the page type isn't right or it's empty, it
02379                  * just means that the world changed.
02380                  */
02381                 if (TYPE(h) != P_LBTREE || NUM_ENT(h) == 0)
02382                         goto fast_miss;
02383 
02384                 /* Verify that this page cannot have moved to another db. */
02385                 if (F_ISSET(dbp, DB_AM_SUBDB) &&
02386                     log_compare(&t->bt_llsn, &LSN(h)) != 0)
02387                         goto fast_miss;
02388                 /*
02389                  * What we do here is test to see if we're at the beginning or
02390                  * end of the tree and if the new item sorts before/after the
02391                  * first/last page entry.  We don't try and catch inserts into
02392                  * the middle of the tree (although we could, as long as there
02393                  * were two keys on the page and we saved both the index and
02394                  * the page number of the last insert).
02395                  */
02396                 if (h->next_pgno == PGNO_INVALID) {
02397                         indx = NUM_ENT(h) - P_INDX;
02398                         if ((ret = __bam_cmp(dbp,
02399                             key, h, indx, t->bt_compare, &cmp)) != 0)
02400                                 return (ret);
02401 
02402                         if (cmp < 0)
02403                                 goto try_begin;
02404                         if (cmp > 0) {
02405                                 indx += P_INDX;
02406                                 goto fast_hit;
02407                         }
02408 
02409                         /*
02410                          * Found a duplicate.  If doing DB_KEYLAST, we're at
02411                          * the correct position, otherwise, move to the first
02412                          * of the duplicates.  If we're looking at off-page
02413                          * duplicates, duplicate duplicates aren't permitted,
02414                          * so we're done.
02415                          */
02416                         if (flags == DB_KEYLAST)
02417                                 goto fast_hit;
02418                         for (;
02419                             indx > 0 && inp[indx - P_INDX] == inp[indx];
02420                             indx -= P_INDX)
02421                                 ;
02422                         goto fast_hit;
02423                 }
02424 try_begin:      if (h->prev_pgno == PGNO_INVALID) {
02425                         indx = 0;
02426                         if ((ret = __bam_cmp(dbp,
02427                             key, h, indx, t->bt_compare, &cmp)) != 0)
02428                                 return (ret);
02429 
02430                         if (cmp > 0)
02431                                 goto fast_miss;
02432                         if (cmp < 0)
02433                                 goto fast_hit;
02434 
02435                         /*
02436                          * Found a duplicate.  If doing DB_KEYFIRST, we're at
02437                          * the correct position, otherwise, move to the last
02438                          * of the duplicates.  If we're looking at off-page
02439                          * duplicates, duplicate duplicates aren't permitted,
02440                          * so we're done.
02441                          */
02442                         if (flags == DB_KEYFIRST)
02443                                 goto fast_hit;
02444                         for (;
02445                             indx < (db_indx_t)(NUM_ENT(h) - P_INDX) &&
02446                             inp[indx] == inp[indx + P_INDX];
02447                             indx += P_INDX)
02448                                 ;
02449                         goto fast_hit;
02450                 }
02451                 goto fast_miss;
02452 
02453 fast_hit:       /* Set the exact match flag, we may have found a duplicate. */
02454                 *exactp = cmp == 0;
02455 
02456                 /*
02457                  * Insert the entry in the stack.  (Our caller is likely to
02458                  * call __bam_stkrel() after our return.)
02459                  */
02460                 BT_STK_CLR(cp);
02461                 BT_STK_ENTER(dbp->dbenv,
02462                     cp, h, indx, cp->lock, cp->lock_mode, ret);
02463                 if (ret != 0)
02464                         return (ret);
02465                 break;
02466 
02467 fast_miss:      /*
02468                  * This was not the right page, so we do not need to retain
02469                  * the lock even in the presence of transactions.
02470                  *
02471                  * This is also an error path, so ret may have been set.
02472                  */
02473                 DISCARD_CUR(dbc, ret);
02474                 cp->pgno = PGNO_INVALID;
02475                 if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
02476                         ret = t_ret;
02477                 if (ret != 0)
02478                         return (ret);
02479 
02480 search:         if ((ret = __bam_search(dbc, root_pgno,
02481                     key, sflags, 1, NULL, exactp)) != 0)
02482                         return (ret);
02483                 break;
02484         default:
02485                 return (__db_unknown_flag(dbp->dbenv, "__bam_c_search", flags));
02486         }
02487         /* Initialize the cursor from the stack. */
02488         cp->page = cp->csp->page;
02489         cp->pgno = cp->csp->page->pgno;
02490         cp->indx = cp->csp->indx;
02491         cp->lock = cp->csp->lock;
02492         cp->lock_mode = cp->csp->lock_mode;
02493 
02494         /* If on an empty page or a deleted record, move to the next one. */
02495         if (flags == DB_FIRST &&
02496             (NUM_ENT(cp->page) == 0 || IS_CUR_DELETED(dbc)))
02497                 if ((ret = __bam_c_next(dbc, 0, 0)) != 0)
02498                         return (ret);
02499         if (flags == DB_LAST &&
02500             (NUM_ENT(cp->page) == 0 || IS_CUR_DELETED(dbc)))
02501                 if ((ret = __bam_c_prev(dbc)) != 0)
02502                         return (ret);
02503 
02504         return (0);
02505 }
02506 
02507 /*
02508  * __bam_c_physdel --
02509  *      Physically remove an item from the page.
02510  */
02511 static int
02512 __bam_c_physdel(dbc)
02513         DBC *dbc;
02514 {
02515         BTREE_CURSOR *cp;
02516         DB *dbp;
02517         DBT key;
02518         int delete_page, empty_page, exact, ret;
02519 
02520         dbp = dbc->dbp;
02521         memset(&key, 0, sizeof(DBT));
02522         cp = (BTREE_CURSOR *)dbc->internal;
02523         delete_page = empty_page = ret = 0;
02524 
02525         /* If the page is going to be emptied, consider deleting it. */
02526         delete_page = empty_page =
02527             NUM_ENT(cp->page) == (TYPE(cp->page) == P_LBTREE ? 2 : 1);
02528 
02529         /*
02530          * Check if the application turned off reverse splits.  Applications
02531          * can't turn off reverse splits in off-page duplicate trees, that
02532          * space will never be reused unless the exact same key is specified.
02533          */
02534         if (delete_page &&
02535             !F_ISSET(dbc, DBC_OPD) && F_ISSET(dbp, DB_AM_REVSPLITOFF))
02536                 delete_page = 0;
02537 
02538         /*
02539          * We never delete the last leaf page.  (Not really true -- we delete
02540          * the last leaf page of off-page duplicate trees, but that's handled
02541          * by our caller, not down here.)
02542          */
02543         if (delete_page && cp->pgno == cp->root)
02544                 delete_page = 0;
02545 
02546         /*
02547          * To delete a leaf page other than an empty root page, we need a
02548          * copy of a key from the page.  Use the 0th page index since it's
02549          * the last key the page held.
02550          *
02551          * !!!
02552          * Note that because __bam_c_physdel is always called from a cursor
02553          * close, it should be safe to use the cursor's own "my_rkey" memory
02554          * to temporarily hold this key.  We shouldn't own any returned-data
02555          * memory of interest--if we do, we're in trouble anyway.
02556          */
02557         if (delete_page)
02558                 if ((ret = __db_ret(dbp, cp->page,
02559                     0, &key, &dbc->my_rkey.data, &dbc->my_rkey.ulen)) != 0)
02560                         return (ret);
02561 
02562         /*
02563          * Delete the items.  If page isn't empty, we adjust the cursors.
02564          *
02565          * !!!
02566          * The following operations to delete a page may deadlock.  The easy
02567          * scenario is if we're deleting an item because we're closing cursors
02568          * because we've already deadlocked and want to call txn->abort.  If
02569          * we fail due to deadlock, we'll leave a locked, possibly empty page
02570          * in the tree, which won't be empty long because we'll undo the delete
02571          * when we undo the transaction's modifications.
02572          *
02573          * !!!
02574          * Delete the key item first, otherwise the on-page duplicate checks
02575          * in __bam_ditem() won't work!
02576          */
02577         if (TYPE(cp->page) == P_LBTREE) {
02578                 if ((ret = __bam_ditem(dbc, cp->page, cp->indx)) != 0)
02579                         return (ret);
02580                 if (!empty_page)
02581                         if ((ret = __bam_ca_di(dbc,
02582                             PGNO(cp->page), cp->indx, -1)) != 0)
02583                                 return (ret);
02584         }
02585         if ((ret = __bam_ditem(dbc, cp->page, cp->indx)) != 0)
02586                 return (ret);
02587 
02588         /* Clear the deleted flag, the item is gone. */
02589         F_CLR(cp, C_DELETED);
02590 
02591         if (!empty_page)
02592                 if ((ret = __bam_ca_di(dbc, PGNO(cp->page), cp->indx, -1)) != 0)
02593                         return (ret);
02594 
02595         /* If we're not going to try and delete the page, we're done. */
02596         if (!delete_page)
02597                 return (0);
02598 
02599         ret = __bam_search(dbc, PGNO_INVALID, &key, S_DEL, 0, NULL, &exact);
02600 
02601         /*
02602          * If everything worked, delete the stack, otherwise, release the
02603          * stack and page locks without further damage.
02604          */
02605         if (ret == 0)
02606                 DISCARD_CUR(dbc, ret);
02607         if (ret == 0)
02608                 ret = __bam_dpages(dbc, 1, 0);
02609         else
02610                 (void)__bam_stkrel(dbc, 0);
02611 
02612         return (ret);
02613 }
02614 
02615 /*
02616  * __bam_c_getstack --
02617  *      Acquire a full stack for a cursor.
02618  */
02619 static int
02620 __bam_c_getstack(dbc)
02621         DBC *dbc;
02622 {
02623         BTREE_CURSOR *cp;
02624         DB *dbp;
02625         DBT dbt;
02626         DB_MPOOLFILE *mpf;
02627         PAGE *h;
02628         int exact, ret, t_ret;
02629 
02630         dbp = dbc->dbp;
02631         mpf = dbp->mpf;
02632         cp = (BTREE_CURSOR *)dbc->internal;
02633 
02634         /*
02635          * Get the page with the current item on it.  The caller of this
02636          * routine has to already hold a read lock on the page, so there
02637          * is no additional lock to acquire.
02638          */
02639         if ((ret = __memp_fget(mpf, &cp->pgno, 0, &h)) != 0)
02640                 return (ret);
02641 
02642         /* Get a copy of a key from the page. */
02643         memset(&dbt, 0, sizeof(DBT));
02644         if ((ret = __db_ret(dbp,
02645             h, 0, &dbt, &dbc->my_rkey.data, &dbc->my_rkey.ulen)) != 0)
02646                 goto err;
02647 
02648         /* Get a write-locked stack for the page. */
02649         exact = 0;
02650         ret = __bam_search(dbc, PGNO_INVALID,
02651             &dbt, S_KEYFIRST, 1, NULL, &exact);
02652 
02653 err:    /* Discard the key and the page. */
02654         if ((t_ret = __memp_fput(mpf, h, 0)) != 0 && ret == 0)
02655                 ret = t_ret;
02656 
02657         return (ret);
02658 }
02659 
02660 /*
02661  * __bam_isopd --
02662  *      Return if the cursor references an off-page duplicate tree via its
02663  *      page number.
02664  */
02665 static int
02666 __bam_isopd(dbc, pgnop)
02667         DBC *dbc;
02668         db_pgno_t *pgnop;
02669 {
02670         BOVERFLOW *bo;
02671 
02672         if (TYPE(dbc->internal->page) != P_LBTREE)
02673                 return (0);
02674 
02675         bo = GET_BOVERFLOW(dbc->dbp,
02676             dbc->internal->page, dbc->internal->indx + O_INDX);
02677         if (B_TYPE(bo->type) == B_DUPLICATE) {
02678                 *pgnop = bo->pgno;
02679                 return (1);
02680         }
02681         return (0);
02682 }

Generated on Sun Dec 25 12:14:12 2005 for Berkeley DB 4.4.16 by  doxygen 1.4.2