Main Page | Class Hierarchy | Data Structures | Directories | File List | Data Fields | Related Pages

db_cam.c

00001 /*-
00002  * See the file LICENSE for redistribution information.
00003  *
00004  * Copyright (c) 2000-2005
00005  *      Sleepycat Software.  All rights reserved.
00006  *
00007  * $Id: db_cam.c,v 12.21 2005/10/07 20:21:22 ubell Exp $
00008  */
00009 
00010 #include "db_config.h"
00011 
00012 #ifndef NO_SYSTEM_INCLUDES
00013 #include <sys/types.h>
00014 
00015 #include <string.h>
00016 #endif
00017 
00018 #include "db_int.h"
00019 #include "dbinc/db_page.h"
00020 #include "dbinc/db_shash.h"
00021 #include "dbinc/btree.h"
00022 #include "dbinc/hash.h"
00023 #include "dbinc/lock.h"
00024 #include "dbinc/mp.h"
00025 #include "dbinc/qam.h"
00026 
00027 static int __db_buildpartial __P((DB *, DBT *, DBT *, DBT *));
00028 static int __db_c_cleanup __P((DBC *, DBC *, int));
00029 static int __db_c_del_secondary __P((DBC *));
00030 static int __db_c_pget_recno __P((DBC *, DBT *, DBT *, u_int32_t));
00031 static int __db_wrlock_err __P((DB_ENV *));
00032 
/*
 * CDB_LOCKING_INIT --
 *      Prepare a cursor for a write operation when running CDB.
 *
 *      If we are running CDB, this had better be either a write cursor
 *      or an immediate writer; otherwise the enclosing function returns
 *      the result of __db_wrlock_err.  If it's a write cursor, that means
 *      we hold an IWRITE lock and need to upgrade it to a write lock; a
 *      failure of that upgrade is also returned from the enclosing
 *      function.
 *
 *      The macro assigns to a variable named "ret", which must be in scope
 *      at the point of use, and it contains return statements, so it may
 *      only be used inside a function returning int.
 */
#define CDB_LOCKING_INIT(dbp, dbc)                                      \
        if (CDB_LOCKING((dbp)->dbenv)) {                                \
                if (!F_ISSET(dbc, DBC_WRITECURSOR | DBC_WRITER))        \
                        return (__db_wrlock_err((dbp)->dbenv));         \
                                                                        \
                if (F_ISSET(dbc, DBC_WRITECURSOR) &&                    \
                    (ret = __lock_get((dbp)->dbenv,                     \
                    (dbc)->locker, DB_LOCK_UPGRADE, &(dbc)->lock_dbt,   \
                    DB_LOCK_WRITE, &(dbc)->mylock)) != 0)               \
                        return (ret);                                   \
        }

/*
 * CDB_LOCKING_DONE --
 *      Release the upgraded lock: for a write cursor, downgrade the
 *      cursor's lock back to IWRITE.  The result of the downgrade is
 *      deliberately discarded.
 *
 *      The expansion is a bare "if" statement that already includes its
 *      terminating semicolon, so do not use it as the unbraced body of an
 *      if/else.
 */
#define CDB_LOCKING_DONE(dbp, dbc)                                      \
        if (F_ISSET(dbc, DBC_WRITECURSOR))                              \
                (void)__lock_downgrade(                                 \
                    (dbp)->dbenv, &(dbc)->mylock, DB_LOCK_IWRITE, 0);
00055 
00056 /*
00057  * __db_c_close --
00058  *      DBC->c_close.
00059  *
00060  * PUBLIC: int __db_c_close __P((DBC *));
00061  */
00062 int
00063 __db_c_close(dbc)
00064         DBC *dbc;
00065 {
00066         DB *dbp;
00067         DBC *opd;
00068         DBC_INTERNAL *cp;
00069         DB_ENV *dbenv;
00070         int ret, t_ret;
00071 
00072         dbp = dbc->dbp;
00073         dbenv = dbp->dbenv;
00074         cp = dbc->internal;
00075         opd = cp->opd;
00076         ret = 0;
00077 
00078         /*
00079          * Remove the cursor(s) from the active queue.  We may be closing two
00080          * cursors at once here, a top-level one and a lower-level, off-page
00081          * duplicate one.  The access-method specific cursor close routine must
00082          * close both of them in a single call.
00083          *
00084          * !!!
00085          * Cursors must be removed from the active queue before calling the
00086          * access specific cursor close routine, btree depends on having that
00087          * order of operations.
00088          */
00089         MUTEX_LOCK(dbenv, dbp->mutex);
00090 
00091         if (opd != NULL) {
00092                 DB_ASSERT(F_ISSET(opd, DBC_ACTIVE));
00093                 F_CLR(opd, DBC_ACTIVE);
00094                 TAILQ_REMOVE(&dbp->active_queue, opd, links);
00095         }
00096         DB_ASSERT(F_ISSET(dbc, DBC_ACTIVE));
00097         F_CLR(dbc, DBC_ACTIVE);
00098         TAILQ_REMOVE(&dbp->active_queue, dbc, links);
00099 
00100         MUTEX_UNLOCK(dbenv, dbp->mutex);
00101 
00102         /* Call the access specific cursor close routine. */
00103         if ((t_ret =
00104             dbc->c_am_close(dbc, PGNO_INVALID, NULL)) != 0 && ret == 0)
00105                 ret = t_ret;
00106 
00107         /*
00108          * Release the lock after calling the access method specific close
00109          * routine, a Btree cursor may have had pending deletes.
00110          */
00111         if (CDB_LOCKING(dbenv)) {
00112                 /*
00113                  * Also, be sure not to free anything if mylock.off is
00114                  * INVALID;  in some cases, such as idup'ed read cursors
00115                  * and secondary update cursors, a cursor in a CDB
00116                  * environment may not have a lock at all.
00117                  */
00118                 if ((t_ret = __LPUT(dbc, dbc->mylock)) != 0 && ret == 0)
00119                         ret = t_ret;
00120 
00121                 /* For safety's sake, since this is going on the free queue. */
00122                 memset(&dbc->mylock, 0, sizeof(dbc->mylock));
00123                 if (opd != NULL)
00124                         memset(&opd->mylock, 0, sizeof(opd->mylock));
00125         }
00126 
00127         if (dbc->txn != NULL)
00128                 dbc->txn->cursors--;
00129 
00130         /* Move the cursor(s) to the free queue. */
00131         MUTEX_LOCK(dbenv, dbp->mutex);
00132         if (opd != NULL) {
00133                 if (dbc->txn != NULL)
00134                         dbc->txn->cursors--;
00135                 TAILQ_INSERT_TAIL(&dbp->free_queue, opd, links);
00136                 opd = NULL;
00137         }
00138         TAILQ_INSERT_TAIL(&dbp->free_queue, dbc, links);
00139         MUTEX_UNLOCK(dbenv, dbp->mutex);
00140 
00141         return (ret);
00142 }
00143 
00144 /*
00145  * __db_c_destroy --
00146  *      Destroy the cursor, called after DBC->c_close.
00147  *
00148  * PUBLIC: int __db_c_destroy __P((DBC *));
00149  */
00150 int
00151 __db_c_destroy(dbc)
00152         DBC *dbc;
00153 {
00154         DB *dbp;
00155         DB_ENV *dbenv;
00156         int ret, t_ret;
00157 
00158         dbp = dbc->dbp;
00159         dbenv = dbp->dbenv;
00160 
00161         /* Remove the cursor from the free queue. */
00162         MUTEX_LOCK(dbenv, dbp->mutex);
00163         TAILQ_REMOVE(&dbp->free_queue, dbc, links);
00164         MUTEX_UNLOCK(dbenv, dbp->mutex);
00165 
00166         /* Free up allocated memory. */
00167         if (dbc->my_rskey.data != NULL)
00168                 __os_free(dbenv, dbc->my_rskey.data);
00169         if (dbc->my_rkey.data != NULL)
00170                 __os_free(dbenv, dbc->my_rkey.data);
00171         if (dbc->my_rdata.data != NULL)
00172                 __os_free(dbenv, dbc->my_rdata.data);
00173 
00174         /* Call the access specific cursor destroy routine. */
00175         ret = dbc->c_am_destroy == NULL ? 0 : dbc->c_am_destroy(dbc);
00176 
00177         /*
00178          * Release the lock id for this cursor.
00179          */
00180         if (LOCKING_ON(dbenv) &&
00181             F_ISSET(dbc, DBC_OWN_LID) &&
00182             (t_ret = __lock_id_free(dbenv,
00183             ((DB_LOCKER *)dbc->lref)->id)) != 0 && ret == 0)
00184                 ret = t_ret;
00185 
00186         __os_free(dbenv, dbc);
00187 
00188         return (ret);
00189 }
00190 
00191 /*
00192  * __db_c_count --
00193  *      Return a count of duplicate data items.
00194  *
00195  * PUBLIC: int __db_c_count __P((DBC *, db_recno_t *));
00196  */
00197 int
00198 __db_c_count(dbc, recnop)
00199         DBC *dbc;
00200         db_recno_t *recnop;
00201 {
00202         DB_ENV *dbenv;
00203         int ret;
00204 
00205         dbenv = dbc->dbp->dbenv;
00206 
00207         /*
00208          * Cursor Cleanup Note:
00209          * All of the cursors passed to the underlying access methods by this
00210          * routine are not duplicated and will not be cleaned up on return.
00211          * So, pages/locks that the cursor references must be resolved by the
00212          * underlying functions.
00213          */
00214         switch (dbc->dbtype) {
00215         case DB_QUEUE:
00216         case DB_RECNO:
00217                 *recnop = 1;
00218                 break;
00219         case DB_HASH:
00220                 if (dbc->internal->opd == NULL) {
00221                         if ((ret = __ham_c_count(dbc, recnop)) != 0)
00222                                 return (ret);
00223                         break;
00224                 }
00225                 /* FALLTHROUGH */
00226         case DB_BTREE:
00227                 if ((ret = __bam_c_count(dbc, recnop)) != 0)
00228                         return (ret);
00229                 break;
00230         case DB_UNKNOWN:
00231         default:
00232                 return (__db_unknown_type(dbenv, "__db_c_count", dbc->dbtype));
00233         }
00234         return (0);
00235 }
00236 
00237 /*
00238  * __db_c_del --
00239  *      DBC->c_del.
00240  *
00241  * PUBLIC: int __db_c_del __P((DBC *, u_int32_t));
00242  */
00243 int
00244 __db_c_del(dbc, flags)
00245         DBC *dbc;
00246         u_int32_t flags;
00247 {
00248         DB *dbp;
00249         DBC *opd;
00250         int ret, t_ret;
00251 
00252         dbp = dbc->dbp;
00253 
00254         /*
00255          * Cursor Cleanup Note:
00256          * All of the cursors passed to the underlying access methods by this
00257          * routine are not duplicated and will not be cleaned up on return.
00258          * So, pages/locks that the cursor references must be resolved by the
00259          * underlying functions.
00260          */
00261 
00262         CDB_LOCKING_INIT(dbp, dbc);
00263 
00264         /*
00265          * If we're a secondary index, and DB_UPDATE_SECONDARY isn't set
00266          * (which it only is if we're being called from a primary update),
00267          * then we need to call through to the primary and delete the item.
00268          *
00269          * Note that this will delete the current item;  we don't need to
00270          * delete it ourselves as well, so we can just goto done.
00271          */
00272         if (flags != DB_UPDATE_SECONDARY && F_ISSET(dbp, DB_AM_SECONDARY)) {
00273                 ret = __db_c_del_secondary(dbc);
00274                 goto done;
00275         }
00276 
00277         /*
00278          * If we are a primary and have secondary indices, go through
00279          * and delete any secondary keys that point at the current record.
00280          */
00281         if (LIST_FIRST(&dbp->s_secondaries) != NULL &&
00282             (ret = __db_c_del_primary(dbc)) != 0)
00283                 goto done;
00284 
00285         /*
00286          * Off-page duplicate trees are locked in the primary tree, that is,
00287          * we acquire a write lock in the primary tree and no locks in the
00288          * off-page dup tree.  If the del operation is done in an off-page
00289          * duplicate tree, call the primary cursor's upgrade routine first.
00290          */
00291         opd = dbc->internal->opd;
00292         if (opd == NULL)
00293                 ret = dbc->c_am_del(dbc);
00294         else
00295                 if ((ret = dbc->c_am_writelock(dbc)) == 0)
00296                         ret = opd->c_am_del(opd);
00297 
00298         /*
00299          * If this was an update that is supporting dirty reads
00300          * then we may have just swapped our read for a write lock
00301          * which is held by the surviving cursor.  We need
00302          * to explicitly downgrade this lock.  The closed cursor
00303          * may only have had a read lock.
00304          */
00305         if (F_ISSET(dbc->dbp, DB_AM_READ_UNCOMMITTED) &&
00306             dbc->internal->lock_mode == DB_LOCK_WRITE) {
00307                 if ((t_ret =
00308                     __TLPUT(dbc, dbc->internal->lock)) != 0 && ret == 0)
00309                         ret = t_ret;
00310                 if (t_ret == 0)
00311                         dbc->internal->lock_mode = DB_LOCK_WWRITE;
00312         }
00313 
00314 done:   CDB_LOCKING_DONE(dbp, dbc);
00315 
00316         return (ret);
00317 }
00318 
00319 /*
00320  * __db_c_dup --
00321  *      Duplicate a cursor
00322  *
00323  * PUBLIC: int __db_c_dup __P((DBC *, DBC **, u_int32_t));
00324  */
00325 int
00326 __db_c_dup(dbc_orig, dbcp, flags)
00327         DBC *dbc_orig;
00328         DBC **dbcp;
00329         u_int32_t flags;
00330 {
00331         DBC *dbc_n, *dbc_nopd;
00332         int ret;
00333 
00334         dbc_n = dbc_nopd = NULL;
00335 
00336         /* Allocate a new cursor and initialize it. */
00337         if ((ret = __db_c_idup(dbc_orig, &dbc_n, flags)) != 0)
00338                 goto err;
00339         *dbcp = dbc_n;
00340 
00341         /*
00342          * If the cursor references an off-page duplicate tree, allocate a
00343          * new cursor for that tree and initialize it.
00344          */
00345         if (dbc_orig->internal->opd != NULL) {
00346                 if ((ret =
00347                    __db_c_idup(dbc_orig->internal->opd, &dbc_nopd, flags)) != 0)
00348                         goto err;
00349                 dbc_n->internal->opd = dbc_nopd;
00350         }
00351         return (0);
00352 
00353 err:    if (dbc_n != NULL)
00354                 (void)__db_c_close(dbc_n);
00355         if (dbc_nopd != NULL)
00356                 (void)__db_c_close(dbc_nopd);
00357 
00358         return (ret);
00359 }
00360 
00361 /*
00362  * __db_c_idup --
00363  *      Internal version of __db_c_dup.
00364  *
00365  * PUBLIC: int __db_c_idup __P((DBC *, DBC **, u_int32_t));
00366  */
00367 int
00368 __db_c_idup(dbc_orig, dbcp, flags)
00369         DBC *dbc_orig, **dbcp;
00370         u_int32_t flags;
00371 {
00372         DB *dbp;
00373         DBC *dbc_n;
00374         DBC_INTERNAL *int_n, *int_orig;
00375         int ret;
00376 
00377         dbp = dbc_orig->dbp;
00378         dbc_n = *dbcp;
00379 
00380         if ((ret = __db_cursor_int(dbp, dbc_orig->txn, dbc_orig->dbtype,
00381             dbc_orig->internal->root, F_ISSET(dbc_orig, DBC_OPD),
00382             dbc_orig->locker, &dbc_n)) != 0)
00383                 return (ret);
00384 
00385         /* Position the cursor if requested, acquiring the necessary locks. */
00386         if (flags == DB_POSITION) {
00387                 int_n = dbc_n->internal;
00388                 int_orig = dbc_orig->internal;
00389 
00390                 dbc_n->flags |= dbc_orig->flags & ~DBC_OWN_LID;
00391 
00392                 int_n->indx = int_orig->indx;
00393                 int_n->pgno = int_orig->pgno;
00394                 int_n->root = int_orig->root;
00395                 int_n->lock_mode = int_orig->lock_mode;
00396 
00397                 switch (dbc_orig->dbtype) {
00398                 case DB_QUEUE:
00399                         if ((ret = __qam_c_dup(dbc_orig, dbc_n)) != 0)
00400                                 goto err;
00401                         break;
00402                 case DB_BTREE:
00403                 case DB_RECNO:
00404                         if ((ret = __bam_c_dup(dbc_orig, dbc_n)) != 0)
00405                                 goto err;
00406                         break;
00407                 case DB_HASH:
00408                         if ((ret = __ham_c_dup(dbc_orig, dbc_n)) != 0)
00409                                 goto err;
00410                         break;
00411                 case DB_UNKNOWN:
00412                 default:
00413                         ret = __db_unknown_type(dbp->dbenv,
00414                             "__db_c_idup", dbc_orig->dbtype);
00415                         goto err;
00416                 }
00417         }
00418 
00419         /* Copy the locking flags to the new cursor. */
00420         F_SET(dbc_n, F_ISSET(dbc_orig,
00421             DBC_READ_COMMITTED | DBC_READ_UNCOMMITTED | DBC_WRITECURSOR));
00422 
00423         /*
00424          * If we're in CDB and this isn't an offpage dup cursor, then
00425          * we need to get a lock for the duplicated cursor.
00426          */
00427         if (CDB_LOCKING(dbp->dbenv) && !F_ISSET(dbc_n, DBC_OPD) &&
00428             (ret = __lock_get(dbp->dbenv, dbc_n->locker, 0,
00429             &dbc_n->lock_dbt, F_ISSET(dbc_orig, DBC_WRITECURSOR) ?
00430             DB_LOCK_IWRITE : DB_LOCK_READ, &dbc_n->mylock)) != 0)
00431                 goto err;
00432 
00433         *dbcp = dbc_n;
00434         return (0);
00435 
00436 err:    (void)__db_c_close(dbc_n);
00437         return (ret);
00438 }
00439 
00440 /*
00441  * __db_c_newopd --
00442  *      Create a new off-page duplicate cursor.
00443  *
00444  * PUBLIC: int __db_c_newopd __P((DBC *, db_pgno_t, DBC *, DBC **));
00445  */
00446 int
00447 __db_c_newopd(dbc_parent, root, oldopd, dbcp)
00448         DBC *dbc_parent;
00449         db_pgno_t root;
00450         DBC *oldopd;
00451         DBC **dbcp;
00452 {
00453         DB *dbp;
00454         DBC *opd;
00455         DBTYPE dbtype;
00456         int ret;
00457 
00458         dbp = dbc_parent->dbp;
00459         dbtype = (dbp->dup_compare == NULL) ? DB_RECNO : DB_BTREE;
00460 
00461         /*
00462          * On failure, we want to default to returning the old off-page dup
00463          * cursor, if any;  our caller can't be left with a dangling pointer
00464          * to a freed cursor.  On error the only allowable behavior is to
00465          * close the cursor (and the old OPD cursor it in turn points to), so
00466          * this should be safe.
00467          */
00468         *dbcp = oldopd;
00469 
00470         if ((ret = __db_cursor_int(dbp,
00471             dbc_parent->txn, dbtype, root, 1, dbc_parent->locker, &opd)) != 0)
00472                 return (ret);
00473 
00474         *dbcp = opd;
00475 
00476         /*
00477          * Check to see if we already have an off-page dup cursor that we've
00478          * passed in.  If we do, close it.  It'd be nice to use it again
00479          * if it's a cursor belonging to the right tree, but if we're doing
00480          * a cursor-relative operation this might not be safe, so for now
00481          * we'll take the easy way out and always close and reopen.
00482          *
00483          * Note that under no circumstances do we want to close the old
00484          * cursor without returning a valid new one;  we don't want to
00485          * leave the main cursor in our caller with a non-NULL pointer
00486          * to a freed off-page dup cursor.
00487          */
00488         if (oldopd != NULL && (ret = __db_c_close(oldopd)) != 0)
00489                 return (ret);
00490 
00491         return (0);
00492 }
00493 
00494 /*
00495  * __db_c_get --
00496  *      Get using a cursor.
00497  *
00498  * PUBLIC: int __db_c_get __P((DBC *, DBT *, DBT *, u_int32_t));
00499  */
00500 int
00501 __db_c_get(dbc_arg, key, data, flags)
00502         DBC *dbc_arg;
00503         DBT *key, *data;
00504         u_int32_t flags;
00505 {
00506         DB *dbp;
00507         DBC *dbc, *dbc_n, *opd;
00508         DBC_INTERNAL *cp, *cp_n;
00509         DB_MPOOLFILE *mpf;
00510         db_pgno_t pgno;
00511         u_int32_t multi, orig_ulen, tmp_flags, tmp_read_uncommitted, tmp_rmw;
00512         u_int8_t type;
00513         int key_small, ret, t_ret;
00514 
00515         COMPQUIET(orig_ulen, 0);
00516 
00517         key_small = 0;
00518 
00519         /*
00520          * Cursor Cleanup Note:
00521          * All of the cursors passed to the underlying access methods by this
00522          * routine are duplicated cursors.  On return, any referenced pages
00523          * will be discarded, and, if the cursor is not intended to be used
00524          * again, the close function will be called.  So, pages/locks that
00525          * the cursor references do not need to be resolved by the underlying
00526          * functions.
00527          */
00528         dbp = dbc_arg->dbp;
00529         mpf = dbp->mpf;
00530         dbc_n = NULL;
00531         opd = NULL;
00532 
00533         /* Clear OR'd in additional bits so we can check for flag equality. */
00534         tmp_rmw = LF_ISSET(DB_RMW);
00535         LF_CLR(DB_RMW);
00536 
00537         tmp_read_uncommitted =
00538             LF_ISSET(DB_READ_UNCOMMITTED) &&
00539             !F_ISSET(dbc_arg, DBC_READ_UNCOMMITTED);
00540         LF_CLR(DB_READ_UNCOMMITTED);
00541 
00542         multi = LF_ISSET(DB_MULTIPLE|DB_MULTIPLE_KEY);
00543         LF_CLR(DB_MULTIPLE|DB_MULTIPLE_KEY);
00544 
00545         /*
00546          * Return a cursor's record number.  It has nothing to do with the
00547          * cursor get code except that it was put into the interface.
00548          */
00549         if (flags == DB_GET_RECNO) {
00550                 if (tmp_rmw)
00551                         F_SET(dbc_arg, DBC_RMW);
00552                 if (tmp_read_uncommitted)
00553                         F_SET(dbc_arg, DBC_READ_UNCOMMITTED);
00554                 ret = __bam_c_rget(dbc_arg, data);
00555                 if (tmp_rmw)
00556                         F_CLR(dbc_arg, DBC_RMW);
00557                 if (tmp_read_uncommitted)
00558                         F_CLR(dbc_arg, DBC_READ_UNCOMMITTED);
00559                 return (ret);
00560         }
00561 
00562         if (flags == DB_CONSUME || flags == DB_CONSUME_WAIT)
00563                 CDB_LOCKING_INIT(dbp, dbc_arg);
00564 
00565         /*
00566          * If we have an off-page duplicates cursor, and the operation applies
00567          * to it, perform the operation.  Duplicate the cursor and call the
00568          * underlying function.
00569          *
00570          * Off-page duplicate trees are locked in the primary tree, that is,
00571          * we acquire a write lock in the primary tree and no locks in the
00572          * off-page dup tree.  If the DB_RMW flag was specified and the get
00573          * operation is done in an off-page duplicate tree, call the primary
00574          * cursor's upgrade routine first.
00575          */
00576         cp = dbc_arg->internal;
00577         if (cp->opd != NULL &&
00578             (flags == DB_CURRENT || flags == DB_GET_BOTHC ||
00579             flags == DB_NEXT || flags == DB_NEXT_DUP || flags == DB_PREV)) {
00580                 if (tmp_rmw && (ret = dbc_arg->c_am_writelock(dbc_arg)) != 0)
00581                         return (ret);
00582                 if ((ret = __db_c_idup(cp->opd, &opd, DB_POSITION)) != 0)
00583                         return (ret);
00584 
00585                 switch (ret =
00586                     opd->c_am_get(opd, key, data, flags, NULL)) {
00587                 case 0:
00588                         goto done;
00589                 case DB_NOTFOUND:
00590                         /*
00591                          * Translate DB_NOTFOUND failures for the DB_NEXT and
00592                          * DB_PREV operations into a subsequent operation on
00593                          * the parent cursor.
00594                          */
00595                         if (flags == DB_NEXT || flags == DB_PREV) {
00596                                 if ((ret = __db_c_close(opd)) != 0)
00597                                         goto err;
00598                                 opd = NULL;
00599                                 break;
00600                         }
00601                         goto err;
00602                 default:
00603                         goto err;
00604                 }
00605         }
00606 
00607         /*
00608          * Perform an operation on the main cursor.  Duplicate the cursor,
00609          * upgrade the lock as required, and call the underlying function.
00610          */
00611         switch (flags) {
00612         case DB_CURRENT:
00613         case DB_GET_BOTHC:
00614         case DB_NEXT:
00615         case DB_NEXT_DUP:
00616         case DB_NEXT_NODUP:
00617         case DB_PREV:
00618         case DB_PREV_NODUP:
00619                 tmp_flags = DB_POSITION;
00620                 break;
00621         default:
00622                 tmp_flags = 0;
00623                 break;
00624         }
00625 
00626         if (tmp_read_uncommitted)
00627                 F_SET(dbc_arg, DBC_READ_UNCOMMITTED);
00628 
00629         /*
00630          * If this cursor is going to be closed immediately, we don't
00631          * need to take precautions to clean it up on error.
00632          */
00633         if (F_ISSET(dbc_arg, DBC_TRANSIENT))
00634                 dbc_n = dbc_arg;
00635         else {
00636                 ret = __db_c_idup(dbc_arg, &dbc_n, tmp_flags);
00637                 if (tmp_read_uncommitted)
00638                         F_CLR(dbc_arg, DBC_READ_UNCOMMITTED);
00639 
00640                 if (ret != 0)
00641                         goto err;
00642                 COPY_RET_MEM(dbc_arg, dbc_n);
00643         }
00644 
00645         if (tmp_rmw)
00646                 F_SET(dbc_n, DBC_RMW);
00647 
00648         switch (multi) {
00649         case DB_MULTIPLE:
00650                 F_SET(dbc_n, DBC_MULTIPLE);
00651                 break;
00652         case DB_MULTIPLE_KEY:
00653                 F_SET(dbc_n, DBC_MULTIPLE_KEY);
00654                 break;
00655         case DB_MULTIPLE | DB_MULTIPLE_KEY:
00656                 F_SET(dbc_n, DBC_MULTIPLE|DBC_MULTIPLE_KEY);
00657                 break;
00658         case 0:
00659         default:
00660                 break;
00661         }
00662 
00663         pgno = PGNO_INVALID;
00664         ret = dbc_n->c_am_get(dbc_n, key, data, flags, &pgno);
00665         if (tmp_rmw)
00666                 F_CLR(dbc_n, DBC_RMW);
00667         if (tmp_read_uncommitted)
00668                 F_CLR(dbc_arg, DBC_READ_UNCOMMITTED);
00669         F_CLR(dbc_n, DBC_MULTIPLE|DBC_MULTIPLE_KEY);
00670         if (ret != 0)
00671                 goto err;
00672 
00673         cp_n = dbc_n->internal;
00674 
00675         /*
00676          * We may be referencing a new off-page duplicates tree.  Acquire
00677          * a new cursor and call the underlying function.
00678          */
00679         if (pgno != PGNO_INVALID) {
00680                 if ((ret = __db_c_newopd(dbc_arg,
00681                     pgno, cp_n->opd, &cp_n->opd)) != 0)
00682                         goto err;
00683 
00684                 switch (flags) {
00685                 case DB_FIRST:
00686                 case DB_NEXT:
00687                 case DB_NEXT_NODUP:
00688                 case DB_SET:
00689                 case DB_SET_RECNO:
00690                 case DB_SET_RANGE:
00691                         tmp_flags = DB_FIRST;
00692                         break;
00693                 case DB_LAST:
00694                 case DB_PREV:
00695                 case DB_PREV_NODUP:
00696                         tmp_flags = DB_LAST;
00697                         break;
00698                 case DB_GET_BOTH:
00699                 case DB_GET_BOTHC:
00700                 case DB_GET_BOTH_RANGE:
00701                         tmp_flags = flags;
00702                         break;
00703                 default:
00704                         ret =
00705                             __db_unknown_flag(dbp->dbenv, "__db_c_get", flags);
00706                         goto err;
00707                 }
00708                 if ((ret = cp_n->opd->c_am_get(
00709                     cp_n->opd, key, data, tmp_flags, NULL)) != 0)
00710                         goto err;
00711         }
00712 
00713 done:   /*
00714          * Return a key/data item.  The only exception is that we don't return
00715          * a key if the user already gave us one, that is, if the DB_SET flag
00716          * was set.  The DB_SET flag is necessary.  In a Btree, the user's key
00717          * doesn't have to be the same as the key stored the tree, depending on
00718          * the magic performed by the comparison function.  As we may not have
00719          * done any key-oriented operation here, the page reference may not be
00720          * valid.  Fill it in as necessary.  We don't have to worry about any
00721          * locks, the cursor must already be holding appropriate locks.
00722          *
00723          * XXX
00724          * If not a Btree and DB_SET_RANGE is set, we shouldn't return a key
00725          * either, should we?
00726          */
00727         cp_n = dbc_n == NULL ? dbc_arg->internal : dbc_n->internal;
00728         if (!F_ISSET(key, DB_DBT_ISSET)) {
00729                 if (cp_n->page == NULL && (ret =
00730                     __memp_fget(mpf, &cp_n->pgno, 0, &cp_n->page)) != 0)
00731                         goto err;
00732 
00733                 if ((ret = __db_ret(dbp, cp_n->page, cp_n->indx,
00734                     key, &dbc_arg->rkey->data, &dbc_arg->rkey->ulen)) != 0) {
00735                         /*
00736                          * If the key DBT is too small, we still want to return
00737                          * the size of the data.  Otherwise applications are
00738                          * forced to check each one with a separate call.  We
00739                          * don't want to copy the data, so we set the ulen to
00740                          * zero before calling __db_ret.
00741                          */
00742                         if (ret == DB_BUFFER_SMALL &&
00743                             F_ISSET(data, DB_DBT_USERMEM)) {
00744                                 key_small = 1;
00745                                 orig_ulen = data->ulen;
00746                                 data->ulen = 0;
00747                         } else
00748                                 goto err;
00749                 }
00750         }
00751         if (multi != 0) {
00752                 /*
00753                  * Even if fetching from the OPD cursor we need a duplicate
00754                  * primary cursor if we are going after multiple keys.
00755                  */
00756                 if (dbc_n == NULL) {
00757                         /*
00758                          * Non-"_KEY" DB_MULTIPLE doesn't move the main cursor,
00759                          * so it's safe to just use dbc_arg, unless dbc_arg
00760                          * has an open OPD cursor whose state might need to
00761                          * be preserved.
00762                          */
00763                         if ((!(multi & DB_MULTIPLE_KEY) &&
00764                             dbc_arg->internal->opd == NULL) ||
00765                             F_ISSET(dbc_arg, DBC_TRANSIENT))
00766                                 dbc_n = dbc_arg;
00767                         else {
00768                                 if ((ret = __db_c_idup(dbc_arg,
00769                                     &dbc_n, DB_POSITION)) != 0)
00770                                         goto err;
00771                                 if ((ret = dbc_n->c_am_get(dbc_n,
00772                                     key, data, DB_CURRENT, &pgno)) != 0)
00773                                         goto err;
00774                         }
00775                         cp_n = dbc_n->internal;
00776                 }
00777 
00778                 /*
00779                  * If opd is set then we dupped the opd that we came in with.
00780                  * When we return we may have a new opd if we went to another
00781                  * key.
00782                  */
00783                 if (opd != NULL) {
00784                         DB_ASSERT(cp_n->opd == NULL);
00785                         cp_n->opd = opd;
00786                         opd = NULL;
00787                 }
00788 
00789                 /*
00790                  * Bulk get doesn't use __db_retcopy, so data.size won't
00791                  * get set up unless there is an error.  Assume success
00792                  * here.  This is the only call to c_am_bulk, and it avoids
00793                  * setting it exactly the same everywhere.  If we have an
00794                  * DB_BUFFER_SMALL error, it'll get overwritten with the
00795                  * needed value.
00796                  */
00797                 data->size = data->ulen;
00798                 ret = dbc_n->c_am_bulk(dbc_n, data, flags | multi);
00799         } else if (!F_ISSET(data, DB_DBT_ISSET)) {
00800                 dbc = opd != NULL ? opd : cp_n->opd != NULL ? cp_n->opd : dbc_n;
00801                 type = TYPE(dbc->internal->page);
00802                 ret = __db_ret(dbp, dbc->internal->page, dbc->internal->indx +
00803                     (type == P_LBTREE || type == P_HASH ? O_INDX : 0),
00804                     data, &dbc_arg->rdata->data, &dbc_arg->rdata->ulen);
00805         }
00806 
00807 err:    /* Don't pass DB_DBT_ISSET back to application level, error or no. */
00808         F_CLR(key, DB_DBT_ISSET);
00809         F_CLR(data, DB_DBT_ISSET);
00810 
00811         /* Cleanup and cursor resolution. */
00812         if (opd != NULL) {
00813                 /*
00814                  * To support dirty reads we must reget the write lock
00815                  * if we have just stepped off a deleted record.
00816                  * Since the OPD cursor does not know anything
00817                  * about the referencing page or cursor we need
00818                  * to peek at the OPD cursor and get the lock here.
00819                  */
00820                 if (F_ISSET(dbc_arg->dbp, DB_AM_READ_UNCOMMITTED) &&
00821                      F_ISSET((BTREE_CURSOR *)
00822                      dbc_arg->internal->opd->internal, C_DELETED))
00823                         if ((t_ret =
00824                             dbc_arg->c_am_writelock(dbc_arg)) != 0 && ret == 0)
00825                                 ret = t_ret;
00826                 if ((t_ret = __db_c_cleanup(
00827                     dbc_arg->internal->opd, opd, ret)) != 0 && ret == 0)
00828                         ret = t_ret;
00829 
00830         }
00831 
00832         if ((t_ret = __db_c_cleanup(dbc_arg, dbc_n, ret)) != 0 && ret == 0)
00833                 ret = t_ret;
00834 
00835         if (key_small) {
00836                 data->ulen = orig_ulen;
00837                 if (ret == 0)
00838                         ret = DB_BUFFER_SMALL;
00839         }
00840 
00841         if (flags == DB_CONSUME || flags == DB_CONSUME_WAIT)
00842                 CDB_LOCKING_DONE(dbp, dbc_arg);
00843         return (ret);
00844 }
00845 
00846 /*
00847  * __db_c_put --
00848  *      Put using a cursor.
00849  *
00850  * PUBLIC: int __db_c_put __P((DBC *, DBT *, DBT *, u_int32_t));
00851  */
00852 int
00853 __db_c_put(dbc_arg, key, data, flags)
00854         DBC *dbc_arg;
00855         DBT *key, *data;
00856         u_int32_t flags;
00857 {
00858         DB_ENV *dbenv;
00859         DB *dbp, *sdbp;
00860         DBC *dbc_n, *oldopd, *opd, *sdbc, *pdbc;
00861         DBT olddata, oldpkey, oldskey, newdata, pkey, skey, temppkey, tempskey;
00862         db_pgno_t pgno;
00863         int cmp, have_oldrec, ispartial, nodel, re_pad, ret, rmw, t_ret;
00864         u_int32_t re_len, size, tmp_flags;
00865 
00866         /*
00867          * Cursor Cleanup Note:
00868          * All of the cursors passed to the underlying access methods by this
00869          * routine are duplicated cursors.  On return, any referenced pages
00870          * will be discarded, and, if the cursor is not intended to be used
00871          * again, the close function will be called.  So, pages/locks that
00872          * the cursor references do not need to be resolved by the underlying
00873          * functions.
00874          */
00875         dbp = dbc_arg->dbp;
00876         dbenv = dbp->dbenv;
00877         sdbp = NULL;
00878         pdbc = dbc_n = NULL;
00879         memset(&newdata, 0, sizeof(DBT));
00880         ret = 0;
00881 
00882         /*
00883          * We do multiple cursor operations in some cases and subsequently
00884          * access the data DBT information.  Set DB_DBT_MALLOC so we don't risk
00885          * modification of the data between our uses of it.
00886          */
00887         memset(&olddata, 0, sizeof(DBT));
00888         F_SET(&olddata, DB_DBT_MALLOC);
00889 
00890         /*
00891          * Putting to secondary indices is forbidden;  when we need
00892          * to internally update one, we'll call this with a private
00893          * synonym for DB_KEYLAST, DB_UPDATE_SECONDARY, which does
00894          * the right thing but won't return an error from cputchk().
00895          */
00896         if (flags == DB_UPDATE_SECONDARY)
00897                 flags = DB_KEYLAST;
00898 
00899         CDB_LOCKING_INIT(dbp, dbc_arg);
00900 
00901         /*
00902          * Check to see if we are a primary and have secondary indices.
00903          * If we are not, we save ourselves a good bit of trouble and
00904          * just skip to the "normal" put.
00905          */
00906         if (LIST_FIRST(&dbp->s_secondaries) == NULL)
00907                 goto skip_s_update;
00908 
00909         /*
00910          * We have at least one secondary which we may need to update.
00911          *
00912          * There is a rather vile locking issue here.  Secondary gets
00913          * will always involve acquiring a read lock in the secondary,
00914          * then acquiring a read lock in the primary.  Ideally, we
00915          * would likewise perform puts by updating all the secondaries
00916          * first, then doing the actual put in the primary, to avoid
00917          * deadlock (since having multiple threads doing secondary
00918          * gets and puts simultaneously is probably a common case).
00919          *
00920          * However, if this put is a put-overwrite--and we have no way to
00921          * tell in advance whether it will be--we may need to delete
00922          * an outdated secondary key.  In order to find that old
00923          * secondary key, we need to get the record we're overwriting,
00924          * before we overwrite it.
00925          *
00926          * (XXX: It would be nice to avoid this extra get, and have the
00927          * underlying put routines somehow pass us the old record
00928          * since they need to traverse the tree anyway.  I'm saving
00929          * this optimization for later, as it's a lot of work, and it
00930          * would be hard to fit into this locking paradigm anyway.)
00931          *
00932          * The simple thing to do would be to go get the old record before
00933          * we do anything else.  Unfortunately, though, doing so would
00934          * violate our "secondary, then primary" lock acquisition
00935          * ordering--even in the common case where no old primary record
00936          * exists, we'll still acquire and keep a lock on the page where
00937          * we're about to do the primary insert.
00938          *
00939          * To get around this, we do the following gyrations, which
00940          * hopefully solve this problem in the common case:
00941          *
00942          * 1) If this is a c_put(DB_CURRENT), go ahead and get the
00943          *    old record.  We already hold the lock on this page in
00944          *    the primary, so no harm done, and we'll need the primary
00945          *    key (which we weren't passed in this case) to do any
00946          *    secondary puts anyway.
00947          *
00948          * 2) If we're doing a partial put, we need to perform the
00949          *    get on the primary key right away, since we don't have
00950          *    the whole datum that the secondary key is based on.
00951          *    We may also need to pad out the record if the primary
00952          *    has a fixed record length.
00953          *
00954          * 3) Loop through the secondary indices, putting into each a
00955          *    new secondary key that corresponds to the new record.
00956          *
00957          * 4) If we haven't done so in (1) or (2), get the old primary
00958          *    key/data pair.  If one does not exist--the common case--we're
00959          *    done with secondary indices, and can go straight on to the
00960          *    primary put.
00961          *
00962          * 5) If we do have an old primary key/data pair, however, we need
00963          *    to loop through all the secondaries a second time and delete
00964          *    the old secondary in each.
00965          */
00966         memset(&pkey, 0, sizeof(DBT));
00967         have_oldrec = nodel = 0;
00968 
00969         /*
00970          * Primary indices can't have duplicates, so only DB_CURRENT,
00971          * DB_KEYFIRST, and DB_KEYLAST make any sense.  Other flags
00972          * should have been caught by the checking routine, but
00973          * add a sprinkling of paranoia.
00974          */
00975         DB_ASSERT(flags == DB_CURRENT ||
00976             flags == DB_KEYFIRST || flags == DB_KEYLAST);
00977 
00978         /*
00979          * We'll want to use DB_RMW in a few places, but it's only legal
00980          * when locking is on.
00981          */
00982         rmw = STD_LOCKING(dbc_arg) ? DB_RMW : 0;
00983 
00984         if (flags == DB_CURRENT) {              /* Step 1. */
00985                 /*
00986                  * This is safe to do on the cursor we already have;
00987                  * error or no, it won't move.
00988                  *
00989                  * We use DB_RMW for all of these gets because we'll be
00990                  * writing soon enough in the "normal" put code.  In
00991                  * transactional databases we'll hold those write locks
00992                  * even if we close the cursor we're reading with.
00993                  *
00994                  * The DB_KEYEMPTY return needs special handling -- if the
00995                  * cursor is on a deleted key, we return DB_NOTFOUND.
00996                  */
00997                 ret = __db_c_get(dbc_arg, &pkey, &olddata, rmw | DB_CURRENT);
00998                 if (ret == DB_KEYEMPTY)
00999                         ret = DB_NOTFOUND;
01000                 if (ret != 0)
01001                         goto err;
01002 
01003                 have_oldrec = 1; /* We've looked for the old record. */
01004         } else {
01005                 /*
01006                  * Set pkey so we can use &pkey everywhere instead of key.
01007                  * If DB_CURRENT is set and there is a key at the current
01008                  * location, pkey will be overwritten before it's used.
01009                  */
01010                 pkey.data = key->data;
01011                 pkey.size = key->size;
01012         }
01013 
01014         /*
01015          * Check for partial puts (step 2).
01016          */
01017         if (F_ISSET(data, DB_DBT_PARTIAL)) {
01018                 if (!have_oldrec && !nodel) {
01019                         /*
01020                          * We're going to have to search the tree for the
01021                          * specified key.  Dup a cursor (so we have the same
01022                          * locking info) and do a c_get.
01023                          */
01024                         if ((ret = __db_c_idup(dbc_arg, &pdbc, 0)) != 0)
01025                                 goto err;
01026 
01027                         /* We should have gotten DB_CURRENT in step 1. */
01028                         DB_ASSERT(flags != DB_CURRENT);
01029 
01030                         ret = __db_c_get(pdbc, &pkey, &olddata, rmw | DB_SET);
01031                         if (ret == DB_KEYEMPTY || ret == DB_NOTFOUND) {
01032                                 nodel = 1;
01033                                 ret = 0;
01034                         }
01035                         if ((t_ret = __db_c_close(pdbc)) != 0)
01036                                 ret = t_ret;
01037                         if (ret != 0)
01038                                 goto err;
01039 
01040                         have_oldrec = 1;
01041                 }
01042 
01043                 /*
01044                  * Now build the new datum from olddata and the partial data we
01045                  * were given.  It's okay to do this if no record was returned
01046                  * above: a partial put on an empty record is allowed, if a
01047                  * little strange.  The data is zero-padded.
01048                  */
01049                 if ((ret =
01050                     __db_buildpartial(dbp, &olddata, data, &newdata)) != 0)
01051                         goto err;
01052                 ispartial = 1;
01053         } else
01054                 ispartial = 0;
01055 
01056         /*
01057          * Handle fixed-length records.  If the primary database has
01058          * fixed-length records, we need to pad out the datum before
01059          * we pass it into the callback function;  we always index the
01060          * "real" record.
01061          */
01062         if ((dbp->type == DB_RECNO && F_ISSET(dbp, DB_AM_FIXEDLEN)) ||
01063             (dbp->type == DB_QUEUE)) {
01064                 if (dbp->type == DB_QUEUE) {
01065                         re_len = ((QUEUE *)dbp->q_internal)->re_len;
01066                         re_pad = ((QUEUE *)dbp->q_internal)->re_pad;
01067                 } else {
01068                         re_len = ((BTREE *)dbp->bt_internal)->re_len;
01069                         re_pad = ((BTREE *)dbp->bt_internal)->re_pad;
01070                 }
01071 
01072                 size = ispartial ? newdata.size : data->size;
01073                 if (size > re_len) {
01074                         ret = __db_rec_toobig(dbenv, size, re_len);
01075                         goto err;
01076                 } else if (size < re_len) {
01077                         /*
01078                          * If we're not doing a partial put, copy
01079                          * data->data into newdata.data, then pad out
01080                          * newdata.data.
01081                          *
01082                          * If we're doing a partial put, the data
01083                          * we want are already in newdata.data;  we
01084                          * just need to pad.
01085                          *
01086                          * Either way, realloc is safe.
01087                          */
01088                         if ((ret =
01089                             __os_realloc(dbenv, re_len, &newdata.data)) != 0)
01090                                 goto err;
01091                         if (!ispartial)
01092                                 memcpy(newdata.data, data->data, size);
01093                         memset((u_int8_t *)newdata.data + size, re_pad,
01094                             re_len - size);
01095                         newdata.size = re_len;
01096                         ispartial = 1;
01097                 }
01098         }
01099 
01100         /*
01101          * Loop through the secondaries.  (Step 3.)
01102          *
01103          * Note that __db_s_first and __db_s_next will take care of
01104          * thread-locking and refcounting issues.
01105          */
01106         if ((ret = __db_s_first(dbp, &sdbp)) != 0)
01107                 goto err;
01108         for (; sdbp != NULL && ret == 0; ret = __db_s_next(&sdbp)) {
01109                 /*
01110                  * Don't process this secondary if the key is immutable and we
01111                  * know that the old record exists.  This optimization can't be
01112                  * used if we have not checked for the old record yet.
01113                  */
01114                 if (have_oldrec && !nodel &&
01115                     FLD_ISSET(sdbp->s_assoc_flags, DB_ASSOC_IMMUTABLE_KEY))
01116                         continue;
01117 
01118                 /*
01119                  * Call the callback for this secondary, to get the
01120                  * appropriate secondary key.
01121                  */
01122                 memset(&skey, 0, sizeof(DBT));
01123                 if ((ret = sdbp->s_callback(sdbp,
01124                     &pkey, ispartial ? &newdata : data, &skey)) != 0) {
01125                         if (ret == DB_DONOTINDEX)
01126                                 /*
01127                                  * The callback returned a null value--don't
01128                                  * put this key in the secondary.  Just
01129                                  * move on to the next one--we'll handle
01130                                  * any necessary deletes in step 5.
01131                                  */
01132                                 continue;
01133                         goto err;
01134                 }
01135 
01136                 /*
01137                  * Open a cursor in this secondary.
01138                  *
01139                  * Use the same locker ID as our primary cursor, so that
01140                  * we're guaranteed that the locks don't conflict (e.g. in CDB
01141                  * or if we're subdatabases that share and want to lock a
01142                  * metadata page).
01143                  */
01144                 if ((ret = __db_cursor_int(sdbp, dbc_arg->txn, sdbp->type,
01145                     PGNO_INVALID, 0, dbc_arg->locker, &sdbc)) != 0)
01146                         goto err;
01147 
01148                 /*
01149                  * If we're in CDB, updates will fail since the new cursor
01150                  * isn't a writer.  However, we hold the WRITE lock in the
01151                  * primary and will for as long as our new cursor lasts,
01152                  * and the primary and secondary share a lock file ID,
01153                  * so it's safe to consider this a WRITER.  The close
01154                  * routine won't try to put anything because we don't
01155                  * really have a lock.
01156                  */
01157                 if (CDB_LOCKING(dbenv)) {
01158                         DB_ASSERT(sdbc->mylock.off == LOCK_INVALID);
01159                         F_SET(sdbc, DBC_WRITER);
01160                 }
01161 
01162                 /*
01163                  * Swap the primary key to the byte order of this secondary, if
01164                  * necessary.  By doing this now, we can compare directly
01165                  * against the data already in the secondary without having to
01166                  * swap it after reading.
01167                  */
01168                 SWAP_IF_NEEDED(dbp, sdbp, &pkey);
01169 
01170                 /*
01171                  * There are three cases here--
01172                  * 1) The secondary supports sorted duplicates.
01173                  *      If we attempt to put a secondary/primary pair
01174                  *      that already exists, that's a duplicate duplicate,
01175                  *      and c_put will return DB_KEYEXIST (see __db_duperr).
01176                  *      This will leave us with exactly one copy of the
01177                  *      secondary/primary pair, and this is just right--we'll
01178                  *      avoid deleting it later, as the old and new secondaries
01179                  *      will match (since the old secondary is the dup dup
01180                  *      that's already there).
01181                  * 2) The secondary supports duplicates, but they're not
01182                  *      sorted.  We need to avoid putting a duplicate
01183                  *      duplicate, because the matching old and new secondaries
01184                  *      will prevent us from deleting anything and we'll
01185                  *      wind up with two secondary records that point to the
01186                  *      same primary key.  Do a c_get(DB_GET_BOTH);  only
01187                  *      do the put if the secondary doesn't exist.
01188                  * 3) The secondary doesn't support duplicates at all.
01189                  *      In this case, secondary keys must be unique;  if
01190                  *      another primary key already exists for this
01191                  *      secondary key, we have to either overwrite it or
01192                  *      not put this one, and in either case we've
01193                  *      corrupted the secondary index.  Do a c_get(DB_SET).
01194                  *      If the secondary/primary pair already exists, do
01195                  *      nothing;  if the secondary exists with a different
01196                  *      primary, return an error;  and if the secondary
01197                  *      does not exist, put it.
01198                  */
01199                 if (!F_ISSET(sdbp, DB_AM_DUP)) {
01200                         /* Case 3. */
01201                         memset(&oldpkey, 0, sizeof(DBT));
01202                         F_SET(&oldpkey, DB_DBT_MALLOC);
01203                         ret = __db_c_get(sdbc,
01204                             &skey, &oldpkey, rmw | DB_SET);
01205                         if (ret == 0) {
01206                                 cmp = __bam_defcmp(sdbp, &oldpkey, &pkey);
01207                                 __os_ufree(dbenv, oldpkey.data);
01208                                 if (cmp != 0) {
01209                                         __db_err(dbenv, "%s%s",
01210                             "Put results in a non-unique secondary key in an ",
01211                             "index not configured to support duplicates");
01212                                         ret = EINVAL;
01213                                 }
01214                         }
01215                         if (ret != DB_NOTFOUND && ret != DB_KEYEMPTY)
01216                                 goto skipput;
01217                 } else if (!F_ISSET(sdbp, DB_AM_DUPSORT)) {
01218                         /* Case 2. */
01219                         memset(&tempskey, 0, sizeof(DBT));
01220                         tempskey.data = skey.data;
01221                         tempskey.size = skey.size;
01222                         memset(&temppkey, 0, sizeof(DBT));
01223                         temppkey.data = pkey.data;
01224                         temppkey.size = pkey.size;
01225                         ret = __db_c_get(sdbc, &tempskey, &temppkey,
01226                             rmw | DB_GET_BOTH);
01227                         if (ret != DB_NOTFOUND && ret != DB_KEYEMPTY)
01228                                 goto skipput;
01229                 }
01230 
01231                 ret = __db_c_put(sdbc, &skey, &pkey, DB_UPDATE_SECONDARY);
01232 
01233                 /*
01234                  * We don't know yet whether this was a put-overwrite that
01235                  * in fact changed nothing.  If it was, we may get DB_KEYEXIST.
01236                  * This is not an error.
01237                  */
01238                 if (ret == DB_KEYEXIST)
01239                         ret = 0;
01240 
01241 skipput:        FREE_IF_NEEDED(sdbp, &skey)
01242 
01243                 /* Make sure the primary key is back in native byte-order. */
01244                 SWAP_IF_NEEDED(dbp, sdbp, &pkey);
01245 
01246                 if ((t_ret = __db_c_close(sdbc)) != 0 && ret == 0)
01247                         ret = t_ret;
01248 
01249                 if (ret != 0)
01250                         goto err;
01251         }
01252         if (ret != 0)
01253                 goto err;
01254 
01255         /* If still necessary, go get the old primary key/data.  (Step 4.) */
01256         if (!have_oldrec) {
01257                 /* See the comments in step 2.  This is real familiar. */
01258                 if ((ret = __db_c_idup(dbc_arg, &pdbc, 0)) != 0)
01259                         goto err;
01260                 DB_ASSERT(flags != DB_CURRENT);
01261                 pkey.data = key->data;
01262                 pkey.size = key->size;
01263                 ret = __db_c_get(pdbc, &pkey, &olddata, rmw | DB_SET);
01264                 if (ret == DB_KEYEMPTY || ret == DB_NOTFOUND) {
01265                         nodel = 1;
01266                         ret = 0;
01267                 }
01268                 if ((t_ret = __db_c_close(pdbc)) != 0 && ret == 0)
01269                         ret = t_ret;
01270                 if (ret != 0)
01271                         goto err;
01272                 have_oldrec = 1;
01273         }
01274 
01275         /*
01276          * If we don't follow this goto, we do in fact have an old record
01277          * we may need to go delete.  (Step 5).
01278          */
01279         if (nodel)
01280                 goto skip_s_update;
01281 
01282         if ((ret = __db_s_first(dbp, &sdbp)) != 0)
01283                 goto err;
01284         for (; sdbp != NULL && ret == 0; ret = __db_s_next(&sdbp)) {
01285                 /*
01286                  * Don't process this secondary if the key is immutable.  We
01287                  * know that the old record exists, so this optimization can
01288                  * always be used.
01289                  */
01290                 if (FLD_ISSET(sdbp->s_assoc_flags, DB_ASSOC_IMMUTABLE_KEY))
01291                         continue;
01292 
01293                 /*
01294                  * Call the callback for this secondary to get the
01295                  * old secondary key.
01296                  */
01297                 memset(&oldskey, 0, sizeof(DBT));
01298                 if ((ret = sdbp->s_callback(sdbp,
01299                     &pkey, &olddata, &oldskey)) != 0) {
01300                         if (ret == DB_DONOTINDEX)
01301                                 /*
01302                                  * The callback returned a null value--there's
01303                                  * nothing to delete.  Go on to the next
01304                                  * secondary.
01305                                  */
01306                                 continue;
01307                         goto err;
01308                 }
01309                 memset(&skey, 0, sizeof(DBT));
01310                 if ((ret = sdbp->s_callback(sdbp,
01311                     &pkey, ispartial ? &newdata : data, &skey)) != 0 &&
01312                     ret != DB_DONOTINDEX)
01313                         goto err;
01314 
01315                 /*
01316                  * If there is no new secondary key, or if the old secondary
01317                  * key is different from the new secondary key, then
01318                  * we need to delete the old one.
01319                  *
01320                  * Note that bt_compare is (and must be) set no matter
01321                  * what access method we're in.
01322                  */
01323                 sdbc = NULL;
01324                 if (ret == DB_DONOTINDEX ||
01325                     ((BTREE *)sdbp->bt_internal)->bt_compare(sdbp,
01326                     &oldskey, &skey) != 0) {
01327                         if ((ret = __db_cursor_int(
01328                             sdbp, dbc_arg->txn, sdbp->type,
01329                             PGNO_INVALID, 0, dbc_arg->locker, &sdbc)) != 0)
01330                                 goto err;
01331                         if (CDB_LOCKING(dbenv)) {
01332                                 DB_ASSERT(sdbc->mylock.off == LOCK_INVALID);
01333                                 F_SET(sdbc, DBC_WRITER);
01334                         }
01335 
01336                         /*
01337                          * Don't let c_get(DB_GET_BOTH) stomp on
01338                          * our data.  Use a temp DBT instead.
01339                          */
01340                         memset(&tempskey, 0, sizeof(DBT));
01341                         tempskey.data = oldskey.data;
01342                         tempskey.size = oldskey.size;
01343                         SWAP_IF_NEEDED(dbp, sdbp, &pkey);
01344                         memset(&temppkey, 0, sizeof(DBT));
01345                         temppkey.data = pkey.data;
01346                         temppkey.size = pkey.size;
01347                         if ((ret = __db_c_get(sdbc,
01348                             &tempskey, &temppkey, rmw | DB_GET_BOTH)) == 0)
01349                                 ret = __db_c_del(sdbc, DB_UPDATE_SECONDARY);
01350                         else if (ret == DB_NOTFOUND)
01351                                 ret = __db_secondary_corrupt(dbp);
01352                         SWAP_IF_NEEDED(dbp, sdbp, &pkey);
01353                 }
01354 
01355                 FREE_IF_NEEDED(sdbp, &skey);
01356                 FREE_IF_NEEDED(sdbp, &oldskey);
01357                 if (sdbc != NULL && (t_ret = __db_c_close(sdbc)) != 0 &&
01358                     ret == 0)
01359                         ret = t_ret;
01360                 if (ret != 0)
01361                         goto err;
01362         }
01363 
01364         /* Secondary index updates are now done.  On to the "real" stuff. */
01365 
01366 skip_s_update:
01367         /*
01368          * If we have an off-page duplicates cursor, and the operation applies
01369          * to it, perform the operation.  Duplicate the cursor and call the
01370          * underlying function.
01371          *
01372          * Off-page duplicate trees are locked in the primary tree, that is,
01373          * we acquire a write lock in the primary tree and no locks in the
01374          * off-page dup tree.  If the put operation is done in an off-page
01375          * duplicate tree, call the primary cursor's upgrade routine first.
01376          */
01377         if (dbc_arg->internal->opd != NULL &&
01378             (flags == DB_AFTER || flags == DB_BEFORE || flags == DB_CURRENT)) {
01379                 /*
01380                  * A special case for hash off-page duplicates.  Hash doesn't
01381                  * support (and is documented not to support) put operations
01382                  * relative to a cursor which references an already deleted
01383                  * item.  For consistency, apply the same criteria to off-page
01384                  * duplicates as well.
01385                  */
01386                 if (dbc_arg->dbtype == DB_HASH && F_ISSET(
01387                     ((BTREE_CURSOR *)(dbc_arg->internal->opd->internal)),
01388                     C_DELETED)) {
01389                         ret = DB_NOTFOUND;
01390                         goto err;
01391                 }
01392 
01393                 if ((ret = dbc_arg->c_am_writelock(dbc_arg)) != 0 ||
01394                     (ret = __db_c_dup(dbc_arg, &dbc_n, DB_POSITION)) != 0)
01395                         goto err;
01396                 opd = dbc_n->internal->opd;
01397                 if ((ret = opd->c_am_put(
01398                     opd, key, data, flags, NULL)) != 0)
01399                         goto err;
01400                 goto done;
01401         }
01402 
01403         /*
01404          * Perform an operation on the main cursor.  Duplicate the cursor,
01405          * and call the underlying function.
01406          *
01407          * XXX: MARGO
01408          *
01409         tmp_flags = flags == DB_AFTER ||
01410             flags == DB_BEFORE || flags == DB_CURRENT ? DB_POSITION : 0;
01411          */
01412         tmp_flags = DB_POSITION;
01413 
01414         /*
01415          * If this cursor is going to be closed immediately, we don't
01416          * need to take precautions to clean it up on error.
01417          */
01418         if (F_ISSET(dbc_arg, DBC_TRANSIENT))
01419                 dbc_n = dbc_arg;
01420         else if ((ret = __db_c_idup(dbc_arg, &dbc_n, tmp_flags)) != 0)
01421                 goto err;
01422 
01423         pgno = PGNO_INVALID;
01424         if ((ret = dbc_n->c_am_put(dbc_n, key, data, flags, &pgno)) != 0)
01425                 goto err;
01426 
01427         /*
01428          * We may be referencing a new off-page duplicates tree.  Acquire
01429          * a new cursor and call the underlying function.
01430          */
01431         if (pgno != PGNO_INVALID) {
01432                 oldopd = dbc_n->internal->opd;
01433                 if ((ret = __db_c_newopd(dbc_arg, pgno, oldopd, &opd)) != 0) {
01434                         dbc_n->internal->opd = opd;
01435                         goto err;
01436                 }
01437 
01438                 dbc_n->internal->opd = opd;
01439 
01440                 if ((ret = opd->c_am_put(
01441                     opd, key, data, flags, NULL)) != 0)
01442                         goto err;
01443         }
01444 
01445 done:
01446 err:    /* Cleanup and cursor resolution. */
01447         if ((t_ret = __db_c_cleanup(dbc_arg, dbc_n, ret)) != 0 && ret == 0)
01448                 ret = t_ret;
01449 
01450         /* If newdata or olddata were used, free their buffers. */
01451         if (newdata.data != NULL)
01452                 __os_free(dbenv, newdata.data);
01453         if (olddata.data != NULL)
01454                 __os_ufree(dbenv, olddata.data);
01455 
01456         CDB_LOCKING_DONE(dbp, dbc_arg);
01457 
01458         if (sdbp != NULL && (t_ret = __db_s_done(sdbp)) != 0 && ret == 0)
01459                 ret = t_ret;
01460 
01461         return (ret);
01462 }
01463 
01464 /*
01465  * __db_duperr()
01466  *      Error message: we don't currently support sorted duplicate duplicates.
01467  * PUBLIC: int __db_duperr __P((DB *, u_int32_t));
01468  */
01469 int
01470 __db_duperr(dbp, flags)
01471         DB *dbp;
01472         u_int32_t flags;
01473 {
01474 
01475         /*
01476          * If we run into this error while updating a secondary index,
01477          * don't yell--there's no clean way to pass DB_NODUPDATA in along
01478          * with DB_UPDATE_SECONDARY, but we may run into this problem
01479          * in a normal, non-error course of events.
01480          *
01481          * !!!
01482          * If and when we ever permit duplicate duplicates in sorted-dup
01483          * databases, we need to either change the secondary index code
01484          * to check for dup dups, or we need to maintain the implicit
01485          * "DB_NODUPDATA" behavior for databases with DB_AM_SECONDARY set.
01486          */
01487         if (flags != DB_NODUPDATA && !F_ISSET(dbp, DB_AM_SECONDARY))
01488                 __db_err(dbp->dbenv,
01489                     "Duplicate data items are not supported with sorted data");
01490         return (DB_KEYEXIST);
01491 }
01492 
01493 /*
01494  * __db_c_cleanup --
01495  *      Clean up duplicate cursors.
01496  */
01497 static int
01498 __db_c_cleanup(dbc, dbc_n, failed)
01499         DBC *dbc, *dbc_n;
01500         int failed;
01501 {
01502         DB *dbp;
01503         DBC *opd;
01504         DBC_INTERNAL *internal;
01505         DB_MPOOLFILE *mpf;
01506         int ret, t_ret;
01507 
01508         dbp = dbc->dbp;
01509         mpf = dbp->mpf;
01510         internal = dbc->internal;
01511         ret = 0;
01512 
01513         /* Discard any pages we're holding. */
01514         if (internal->page != NULL) {
01515                 if ((t_ret =
01516                     __memp_fput(mpf, internal->page, 0)) != 0 && ret == 0)
01517                         ret = t_ret;
01518                 internal->page = NULL;
01519         }
01520         opd = internal->opd;
01521         if (opd != NULL && opd->internal->page != NULL) {
01522                 if ((t_ret =
01523                     __memp_fput(mpf, opd->internal->page, 0)) != 0 && ret == 0)
01524                         ret = t_ret;
01525                 opd->internal->page = NULL;
01526         }
01527 
01528         /*
01529          * If dbc_n is NULL, there's no internal cursor swapping to be done
01530          * and no dbc_n to close--we probably did the entire operation on an
01531          * offpage duplicate cursor.  Just return.
01532          *
01533          * If dbc and dbc_n are the same, we're either inside a DB->{put/get}
01534          * operation, and as an optimization we performed the operation on
01535          * the main cursor rather than on a duplicated one, or we're in a
01536          * bulk get that can't have moved the cursor (DB_MULTIPLE with the
01537          * initial c_get operation on an off-page dup cursor).  Just
01538          * return--either we know we didn't move the cursor, or we're going
01539          * to close it before we return to application code, so we're sure
01540          * not to visibly violate the "cursor stays put on error" rule.
01541          */
01542         if (dbc_n == NULL || dbc == dbc_n)
01543                 return (ret);
01544 
01545         if (dbc_n->internal->page != NULL) {
01546                 if ((t_ret = __memp_fput(
01547                     mpf, dbc_n->internal->page, 0)) != 0 && ret == 0)
01548                         ret = t_ret;
01549                 dbc_n->internal->page = NULL;
01550         }
01551         opd = dbc_n->internal->opd;
01552         if (opd != NULL && opd->internal->page != NULL) {
01553                 if ((t_ret =
01554                     __memp_fput(mpf, opd->internal->page, 0)) != 0 && ret == 0)
01555                         ret = t_ret;
01556                 opd->internal->page = NULL;
01557         }
01558 
01559         /*
01560          * If we didn't fail before entering this routine or just now when
01561          * freeing pages, swap the interesting contents of the old and new
01562          * cursors.
01563          */
01564         if (!failed && ret == 0) {
01565                 dbc->internal = dbc_n->internal;
01566                 dbc_n->internal = internal;
01567         }
01568 
01569         /*
01570          * Close the cursor we don't care about anymore.  The close can fail,
01571          * but we only expect DB_LOCK_DEADLOCK failures.  This violates our
01572          * "the cursor is unchanged on error" semantics, but since all you can
01573          * do with a DB_LOCK_DEADLOCK failure is close the cursor, I believe
01574          * that's OK.
01575          *
01576          * XXX
01577          * There's no way to recover from failure to close the old cursor.
01578          * All we can do is move to the new position and return an error.
01579          *
01580          * XXX
01581          * We might want to consider adding a flag to the cursor, so that any
01582          * subsequent operations other than close just return an error?
01583          */
01584         if ((t_ret = __db_c_close(dbc_n)) != 0 && ret == 0)
01585                 ret = t_ret;
01586 
01587         /*
01588          * If this was an update that is supporting dirty reads
01589          * then we may have just swapped our read for a write lock
01590          * which is held by the surviving cursor.  We need
01591          * to explicitly downgrade this lock.  The closed cursor
01592          * may only have had a read lock.
01593          */
01594         if (F_ISSET(dbp, DB_AM_READ_UNCOMMITTED) &&
01595             dbc->internal->lock_mode == DB_LOCK_WRITE) {
01596                 if ((t_ret =
01597                     __TLPUT(dbc, dbc->internal->lock)) != 0 && ret == 0)
01598                         ret = t_ret;
01599                 if (t_ret == 0)
01600                         dbc->internal->lock_mode = DB_LOCK_WWRITE;
01601         }
01602 
01603         return (ret);
01604 }
01605 
01606 /*
01607  * __db_c_secondary_get_pp --
01608  *      This wrapper function for DBC->c_pget() is the DBC->c_get() function
01609  *      for a secondary index cursor.
01610  *
01611  * PUBLIC: int __db_c_secondary_get_pp __P((DBC *, DBT *, DBT *, u_int32_t));
01612  */
01613 int
01614 __db_c_secondary_get_pp(dbc, skey, data, flags)
01615         DBC *dbc;
01616         DBT *skey, *data;
01617         u_int32_t flags;
01618 {
01619 
01620         DB_ASSERT(F_ISSET(dbc->dbp, DB_AM_SECONDARY));
01621         return (__db_c_pget_pp(dbc, skey, NULL, data, flags));
01622 }
01623 
01624 /*
01625  * __db_c_pget --
01626  *      Get a primary key/data pair through a secondary index.
01627  *
01628  * PUBLIC: int __db_c_pget __P((DBC *, DBT *, DBT *, DBT *, u_int32_t));
01629  */
01630 int
01631 __db_c_pget(dbc, skey, pkey, data, flags)
01632         DBC *dbc;
01633         DBT *skey, *pkey, *data;
01634         u_int32_t flags;
01635 {
01636         DB *pdbp, *sdbp;
01637         DBC *dbc_n, *pdbc;
01638         DBT nullpkey;
01639         u_int32_t save_pkey_flags, tmp_flags, tmp_read_uncommitted, tmp_rmw;
01640         int pkeymalloc, ret, t_ret;
01641 
01642         sdbp = dbc->dbp;
01643         pdbp = sdbp->s_primary;
01644         dbc_n = NULL;
01645         pkeymalloc = t_ret = 0;
01646 
01647         /*
01648          * The challenging part of this function is getting the behavior
01649          * right for all the various permutations of DBT flags.  The
01650          * next several blocks handle the various cases we need to
01651          * deal with specially.
01652          */
01653 
01654         /*
01655          * We may be called with a NULL pkey argument, if we've been
01656          * wrapped by a 2-DBT get call.  If so, we need to use our
01657          * own DBT.
01658          */
01659         if (pkey == NULL) {
01660                 memset(&nullpkey, 0, sizeof(DBT));
01661                 pkey = &nullpkey;
01662         }
01663 
01664         /* Clear OR'd in additional bits so we can check for flag equality. */
01665         tmp_rmw = LF_ISSET(DB_RMW);
01666         LF_CLR(DB_RMW);
01667 
01668         tmp_read_uncommitted =
01669             LF_ISSET(DB_READ_UNCOMMITTED) &&
01670             !F_ISSET(dbc, DBC_READ_UNCOMMITTED);
01671         LF_CLR(DB_READ_UNCOMMITTED);
01672 
01673         /*
01674          * DB_GET_RECNO is a special case, because we're interested not in
01675          * the primary key/data pair, but rather in the primary's record
01676          * number.
01677          */
01678         if (flags == DB_GET_RECNO) {
01679                 if (tmp_rmw)
01680                         F_SET(dbc, DBC_RMW);
01681                 if (tmp_read_uncommitted)
01682                         F_SET(dbc, DBC_READ_UNCOMMITTED);
01683                 ret = __db_c_pget_recno(dbc, pkey, data, flags);
01684                 if (tmp_rmw)
01685                         F_CLR(dbc, DBC_RMW);
01686                 if (tmp_read_uncommitted)
01687                         F_CLR(dbc, DBC_READ_UNCOMMITTED);
01688                 return (ret);
01689         }
01690 
01691         /*
01692          * If the DBTs we've been passed don't have any of the
01693          * user-specified memory management flags set, we want to make sure
01694          * we return values using the DBTs dbc->rskey, dbc->rkey, and
01695          * dbc->rdata, respectively.
01696          *
01697          * There are two tricky aspects to this:  first, we need to pass
01698          * skey and pkey *in* to the initial c_get on the secondary key,
01699          * since either or both may be looked at by it (depending on the
01700          * get flag).  Second, we must not use a normal DB->get call
01701          * on the secondary, even though that's what we want to accomplish,
01702          * because the DB handle may be free-threaded.  Instead,
01703          * we open a cursor, then take steps to ensure that we actually use
01704          * the rkey/rdata from the *secondary* cursor.
01705          *
01706          * We accomplish all this by passing in the DBTs we started out
01707          * with to the c_get, but swapping the contents of rskey and rkey,
01708          * respectively, into rkey and rdata;  __db_ret will treat them like
01709          * the normal key/data pair in a c_get call, and will realloc them as
01710          * need be (this is "step 1").  Then, for "step 2", we swap back
01711          * rskey/rkey/rdata to normal, and do a get on the primary with the
01712          * secondary dbc appointed as the owner of the returned-data memory.
01713          *
01714          * Note that in step 2, we copy the flags field in case we need to
01715          * pass down a DB_DBT_PARTIAL or other flag that is compatible with
01716          * letting DB do the memory management.
01717          */
01718 
01719         /*
01720          * It is correct, though slightly sick, to attempt a partial get of a
01721          * primary key.  However, if we do so here, we'll never find the
01722          * primary record;  clear the DB_DBT_PARTIAL field of pkey just for the
01723          * duration of the next call.
01724          */
01725         save_pkey_flags = pkey->flags;
01726         F_CLR(pkey, DB_DBT_PARTIAL);
01727 
01728         /*
01729          * Now we can go ahead with the meat of this call.  First, get the
01730          * primary key from the secondary index.  (What exactly we get depends
01731          * on the flags, but the underlying cursor get will take care of the
01732          * dirty work.)  Duplicate the cursor, in case the later get on the
01733          * primary fails.
01734          */
01735         switch (flags) {
01736         case DB_CURRENT:
01737         case DB_GET_BOTHC:
01738         case DB_NEXT:
01739         case DB_NEXT_DUP:
01740         case DB_NEXT_NODUP:
01741         case DB_PREV:
01742         case DB_PREV_NODUP:
01743                 tmp_flags = DB_POSITION;
01744                 break;
01745         default:
01746                 tmp_flags = 0;
01747                 break;
01748         }
01749 
01750         if (tmp_read_uncommitted)
01751                 F_SET(dbc, DBC_READ_UNCOMMITTED);
01752 
01753         if ((ret = __db_c_dup(dbc, &dbc_n, tmp_flags)) != 0) {
01754                 if (tmp_read_uncommitted)
01755                         F_CLR(dbc, DBC_READ_UNCOMMITTED);
01756 
01757                 return (ret);
01758         }
01759 
01760         F_SET(dbc_n, DBC_TRANSIENT);
01761 
01762         if (tmp_rmw)
01763                 F_SET(dbc_n, DBC_RMW);
01764 
01765         /*
01766          * If we've been handed a primary key, it will be in native byte order,
01767          * so we need to swap it before reading from the secondary.
01768          */
01769         if (flags == DB_GET_BOTH || flags == DB_GET_BOTHC ||
01770             flags == DB_GET_BOTH_RANGE)
01771                 SWAP_IF_NEEDED(pdbp, sdbp, pkey);
01772 
01773         /* Step 1. */
01774         dbc_n->rdata = dbc->rkey;
01775         dbc_n->rkey = dbc->rskey;
01776         ret = __db_c_get(dbc_n, skey, pkey, flags);
01777         /* Restore pkey's flags in case we stomped the PARTIAL flag. */
01778         pkey->flags = save_pkey_flags;
01779 
01780         if (tmp_read_uncommitted)
01781                 F_CLR(dbc_n, DBC_READ_UNCOMMITTED);
01782         if (tmp_rmw)
01783                 F_CLR(dbc_n, DBC_RMW);
01784 
01785         /*
01786          * We need to swap the primary key to native byte order if we read it
01787          * successfully, or if we swapped it on entry above.  We can't return
01788          * with the application's data modified.
01789          */
01790         if (ret == 0 || flags == DB_GET_BOTH || flags == DB_GET_BOTHC ||
01791             flags == DB_GET_BOTH_RANGE)
01792                 SWAP_IF_NEEDED(pdbp, sdbp, pkey);
01793 
01794         if (ret != 0)
01795                 goto err;
01796 
01797         /*
01798          * Now we're ready for "step 2".  If either or both of pkey and data do
01799          * not have memory management flags set--that is, if DB is managing
01800          * their memory--we need to swap around the rkey/rdata structures so
01801          * that we don't wind up trying to use memory managed by the primary
01802          * database cursor, which we'll close before we return.
01803          *
01804          * !!!
01805          * If you're carefully following the bouncing ball, you'll note that in
01806          * the DB-managed case, the buffer hanging off of pkey is the same as
01807          * dbc->rkey->data.  This is just fine;  we may well realloc and stomp
01808          * on it when we return, if we're doing a DB_GET_BOTH and need to
01809          * return a different partial or key (depending on the comparison
01810          * function), but this is safe.
01811          *
01812          * !!!
01813          * We need to use __db_cursor_int here rather than simply calling
01814          * pdbp->cursor, because otherwise, if we're in CDB, we'll allocate a
01815          * new locker ID and leave ourselves open to deadlocks.  (Even though
01816          * we're only acquiring read locks, we'll still block if there are any
01817          * waiters.)
01818          */
01819         if ((ret = __db_cursor_int(pdbp,
01820             dbc->txn, pdbp->type, PGNO_INVALID, 0, dbc->locker, &pdbc)) != 0)
01821                 goto err;
01822 
01823         if (tmp_read_uncommitted)
01824                 F_SET(pdbc, DBC_READ_UNCOMMITTED);
01825         if (tmp_rmw)
01826                 F_SET(pdbc, DBC_RMW);
01827         if (F_ISSET(dbc, DBC_READ_COMMITTED))
01828                 F_SET(pdbc, DBC_READ_COMMITTED);
01829 
01830         /*
01831          * We're about to use pkey a second time.  If DB_DBT_MALLOC is set on
01832          * it, we'll leak the memory we allocated the first time.  Thus, set
01833          * DB_DBT_REALLOC instead so that we reuse that memory instead of
01834          * leaking it.
01835          *
01836          * !!!
01837          * This assumes that the user must always specify a compatible realloc
01838          * function if a malloc function is specified.  I think this is a
01839          * reasonable requirement.
01840          */
01841         if (F_ISSET(pkey, DB_DBT_MALLOC)) {
01842                 F_CLR(pkey, DB_DBT_MALLOC);
01843                 F_SET(pkey, DB_DBT_REALLOC);
01844                 pkeymalloc = 1;
01845         }
01846 
01847         /*
01848          * Do the actual get.  Set DBC_TRANSIENT since we don't care about
01849          * preserving the position on error, and it's faster.  SET_RET_MEM so
01850          * that the secondary DBC owns any returned-data memory.
01851          */
01852         F_SET(pdbc, DBC_TRANSIENT);
01853         SET_RET_MEM(pdbc, dbc);
01854         ret = __db_c_get(pdbc, pkey, data, DB_SET);
01855 
01856         /*
01857          * If the item wasn't found in the primary, this is a bug; our
01858          * secondary has somehow gotten corrupted, and contains elements that
01859          * don't correspond to anything in the primary.  Complain.
01860          */
01861         if (ret == DB_NOTFOUND)
01862                 ret = __db_secondary_corrupt(pdbp);
01863 
01864         /* Now close the primary cursor. */
01865         if ((t_ret = __db_c_close(pdbc)) != 0 && ret == 0)
01866                 ret = t_ret;
01867 
01868 err:    /* Cleanup and cursor resolution. */
01869         if ((t_ret = __db_c_cleanup(dbc, dbc_n, ret)) != 0 && ret == 0)
01870                 ret = t_ret;
01871         if (pkeymalloc) {
01872                 /*
01873                  * If pkey had a MALLOC flag, we need to restore it; otherwise,
01874                  * if the user frees the buffer but reuses the DBT without
01875                  * NULL'ing its data field or changing the flags, we may drop
01876                  * core.
01877                  */
01878                 F_CLR(pkey, DB_DBT_REALLOC);
01879                 F_SET(pkey, DB_DBT_MALLOC);
01880         }
01881 
01882         return (ret);
01883 }
01884 
01885 /*
01886  * __db_c_pget_recno --
01887  *      Perform a DB_GET_RECNO c_pget on a secondary index.  Returns
01888  * the secondary's record number in the pkey field and the primary's
01889  * in the data field.
01890  */
01891 static int
01892 __db_c_pget_recno(sdbc, pkey, data, flags)
01893         DBC *sdbc;
01894         DBT *pkey, *data;
01895         u_int32_t flags;
01896 {
01897         DB *pdbp, *sdbp;
01898         DB_ENV *dbenv;
01899         DBC *pdbc;
01900         DBT discardme, primary_key;
01901         db_recno_t oob;
01902         u_int32_t rmw;
01903         int ret, t_ret;
01904 
01905         sdbp = sdbc->dbp;
01906         pdbp = sdbp->s_primary;
01907         dbenv = sdbp->dbenv;
01908         pdbc = NULL;
01909         ret = t_ret = 0;
01910 
01911         rmw = LF_ISSET(DB_RMW);
01912 
01913         memset(&discardme, 0, sizeof(DBT));
01914         F_SET(&discardme, DB_DBT_USERMEM | DB_DBT_PARTIAL);
01915 
01916         oob = RECNO_OOB;
01917 
01918         /*
01919          * If the primary is an rbtree, we want its record number, whether
01920          * or not the secondary is one too.  Fetch the recno into "data".
01921          *
01922          * If it's not an rbtree, return RECNO_OOB in "data".
01923          */
01924         if (F_ISSET(pdbp, DB_AM_RECNUM)) {
01925                 /*
01926                  * Get the primary key, so we can find the record number
01927                  * in the primary. (We're uninterested in the secondary key.)
01928                  */
01929                 memset(&primary_key, 0, sizeof(DBT));
01930                 F_SET(&primary_key, DB_DBT_MALLOC);
01931                 if ((ret = __db_c_get(sdbc,
01932                     &discardme, &primary_key, rmw | DB_CURRENT)) != 0)
01933                         return (ret);
01934 
01935                 /*
01936                  * Open a cursor on the primary, set it to the right record,
01937                  * and fetch its recno into "data".
01938                  *
01939                  * (See __db_c_pget for comments on the use of __db_cursor_int.)
01940                  *
01941                  * SET_RET_MEM so that the secondary DBC owns any returned-data
01942                  * memory.
01943                  */
01944                 if ((ret = __db_cursor_int(pdbp, sdbc->txn,
01945                     pdbp->type, PGNO_INVALID, 0, sdbc->locker, &pdbc)) != 0)
01946                         goto perr;
01947                 SET_RET_MEM(pdbc, sdbc);
01948                 if ((ret = __db_c_get(pdbc,
01949                     &primary_key, &discardme, rmw | DB_SET)) != 0)
01950                         goto perr;
01951 
01952                 ret = __db_c_get(pdbc, &discardme, data, rmw | DB_GET_RECNO);
01953 
01954 perr:           __os_ufree(sdbp->dbenv, primary_key.data);
01955                 if (pdbc != NULL &&
01956                     (t_ret = __db_c_close(pdbc)) != 0 && ret == 0)
01957                         ret = t_ret;
01958                 if (ret != 0)
01959                         return (ret);
01960         } else if ((ret = __db_retcopy(dbenv, data, &oob,
01961                     sizeof(oob), &sdbc->rkey->data, &sdbc->rkey->ulen)) != 0)
01962                         return (ret);
01963 
01964         /*
01965          * If the secondary is an rbtree, we want its record number, whether
01966          * or not the primary is one too.  Fetch the recno into "pkey".
01967          *
01968          * If it's not an rbtree, return RECNO_OOB in "pkey".
01969          */
01970         if (F_ISSET(sdbp, DB_AM_RECNUM))
01971                 return (__db_c_get(sdbc, &discardme, pkey, flags));
01972         else
01973                 return (__db_retcopy(dbenv, pkey, &oob,
01974                     sizeof(oob), &sdbc->rdata->data, &sdbc->rdata->ulen));
01975 }
01976 
01977 /*
01978  * __db_wrlock_err -- do not have a write lock.
01979  */
01980 static int
01981 __db_wrlock_err(dbenv)
01982         DB_ENV *dbenv;
01983 {
01984         __db_err(dbenv, "Write attempted on read-only cursor");
01985         return (EPERM);
01986 }
01987 
01988 /*
01989  * __db_c_del_secondary --
01990  *      Perform a delete operation on a secondary index:  call through
01991  *      to the primary and delete the primary record that this record
01992  *      points to.
01993  *
01994  *      Note that deleting the primary record will call c_del on all
01995  *      the secondaries, including this one;  thus, it is not necessary
01996  *      to execute both this function and an actual delete.
01997  */
01998 static int
01999 __db_c_del_secondary(dbc)
02000         DBC *dbc;
02001 {
02002         DB *pdbp;
02003         DBC *pdbc;
02004         DBT skey, pkey;
02005         int ret, t_ret;
02006 
02007         memset(&skey, 0, sizeof(DBT));
02008         memset(&pkey, 0, sizeof(DBT));
02009         pdbp = dbc->dbp->s_primary;
02010 
02011         /*
02012          * Get the current item that we're pointing at.
02013          * We don't actually care about the secondary key, just
02014          * the primary.
02015          */
02016         F_SET(&skey, DB_DBT_PARTIAL | DB_DBT_USERMEM);
02017         if ((ret = __db_c_get(dbc, &skey, &pkey, DB_CURRENT)) != 0)
02018                 return (ret);
02019 
02020         SWAP_IF_NEEDED(pdbp, dbc->dbp, &pkey);
02021 
02022         /*
02023          * Create a cursor on the primary with our locker ID,
02024          * so that when it calls back, we don't conflict.
02025          *
02026          * We create a cursor explicitly because there's no
02027          * way to specify the same locker ID if we're using
02028          * locking but not transactions if we use the DB->del
02029          * interface.  This shouldn't be any less efficient
02030          * anyway.
02031          */
02032         if ((ret = __db_cursor_int(pdbp, dbc->txn,
02033             pdbp->type, PGNO_INVALID, 0, dbc->locker, &pdbc)) != 0)
02034                 return (ret);
02035 
02036         /*
02037          * See comment in __db_c_put--if we're in CDB,
02038          * we already hold the locks we need, and we need to flag
02039          * the cursor as a WRITER so we don't run into errors
02040          * when we try to delete.
02041          */
02042         if (CDB_LOCKING(pdbp->dbenv)) {
02043                 DB_ASSERT(pdbc->mylock.off == LOCK_INVALID);
02044                 F_SET(pdbc, DBC_WRITER);
02045         }
02046 
02047         /*
02048          * Set the new cursor to the correct primary key.  Then
02049          * delete it.  We don't really care about the datum;
02050          * just reuse our skey DBT.
02051          *
02052          * If the primary get returns DB_NOTFOUND, something is amiss--
02053          * every record in the secondary should correspond to some record
02054          * in the primary.
02055          */
02056         if ((ret = __db_c_get(pdbc, &pkey, &skey,
02057             (STD_LOCKING(dbc) ? DB_RMW : 0) | DB_SET)) == 0)
02058                 ret = __db_c_del(pdbc, 0);
02059         else if (ret == DB_NOTFOUND)
02060                 ret = __db_secondary_corrupt(pdbp);
02061 
02062         if ((t_ret = __db_c_close(pdbc)) != 0 && ret == 0)
02063                 ret = t_ret;
02064 
02065         return (ret);
02066 }
02067 
02068 /*
02069  * __db_c_del_primary --
02070  *      Perform a delete operation on a primary index.  Loop through
02071  *      all the secondary indices which correspond to this primary
02072  *      database, and delete any secondary keys that point at the current
02073  *      record.
02074  *
02075  * PUBLIC: int __db_c_del_primary __P((DBC *));
02076  */
02077 int
02078 __db_c_del_primary(dbc)
02079         DBC *dbc;
02080 {
02081         DB *dbp, *sdbp;
02082         DBC *sdbc;
02083         DBT data, pkey, skey, temppkey, tempskey;
02084         int ret, t_ret;
02085 
02086         dbp = dbc->dbp;
02087 
02088         /*
02089          * If we're called at all, we have at least one secondary.
02090          * (Unfortunately, we can't assert this without grabbing the mutex.)
02091          * Get the current record so that we can construct appropriate
02092          * secondary keys as needed.
02093          */
02094         memset(&pkey, 0, sizeof(DBT));
02095         memset(&data, 0, sizeof(DBT));
02096         if ((ret = __db_c_get(dbc, &pkey, &data, DB_CURRENT)) != 0)
02097                 return (ret);
02098 
02099         if ((ret = __db_s_first(dbp, &sdbp)) != 0)
02100                 goto err;
02101         for (; sdbp != NULL && ret == 0; ret = __db_s_next(&sdbp)) {
02102                 /*
02103                  * Get the secondary key for this secondary and the current
02104                  * item.
02105                  */
02106                 memset(&skey, 0, sizeof(DBT));
02107                 if ((ret = sdbp->s_callback(sdbp, &pkey, &data, &skey)) != 0) {
02108                         /*
02109                          * If the current item isn't in this index, we
02110                          * have no work to do.  Proceed.
02111                          */
02112                         if (ret == DB_DONOTINDEX)
02113                                 continue;
02114 
02115                         /* We had a substantive error.  Bail. */
02116                         FREE_IF_NEEDED(sdbp, &skey);
02117                         goto err;
02118                 }
02119 
02120                 /* Open a secondary cursor. */
02121                 if ((ret = __db_cursor_int(sdbp, dbc->txn, sdbp->type,
02122                     PGNO_INVALID, 0, dbc->locker, &sdbc)) != 0)
02123                         goto err;
02124                 /* See comment above and in __db_c_put. */
02125                 if (CDB_LOCKING(sdbp->dbenv)) {
02126                         DB_ASSERT(sdbc->mylock.off == LOCK_INVALID);
02127                         F_SET(sdbc, DBC_WRITER);
02128                 }
02129 
02130                 /*
02131                  * Set the secondary cursor to the appropriate item.
02132                  * Delete it.
02133                  *
02134                  * We want to use DB_RMW if locking is on;  it's only
02135                  * legal then, though.
02136                  *
02137                  * !!!
02138                  * Don't stomp on any callback-allocated buffer in skey
02139                  * when we do a c_get(DB_GET_BOTH); use a temp DBT instead.
02140                  * Similarly, don't allow pkey to be invalidated when the
02141                  * cursor is closed.
02142                  */
02143                 memset(&tempskey, 0, sizeof(DBT));
02144                 tempskey.data = skey.data;
02145                 tempskey.size = skey.size;
02146                 SWAP_IF_NEEDED(dbp, sdbp, &pkey);
02147                 memset(&temppkey, 0, sizeof(DBT));
02148                 temppkey.data = pkey.data;
02149                 temppkey.size = pkey.size;
02150                 if ((ret = __db_c_get(sdbc, &tempskey, &temppkey,
02151                     (STD_LOCKING(dbc) ? DB_RMW : 0) | DB_GET_BOTH)) == 0)
02152                         ret = __db_c_del(sdbc, DB_UPDATE_SECONDARY);
02153                 else if (ret == DB_NOTFOUND)
02154                         ret = __db_secondary_corrupt(dbp);
02155                 SWAP_IF_NEEDED(dbp, sdbp, &pkey);
02156 
02157                 FREE_IF_NEEDED(sdbp, &skey);
02158 
02159                 if ((t_ret = __db_c_close(sdbc)) != 0 && ret == 0)
02160                         ret = t_ret;
02161                 if (ret != 0)
02162                         goto err;
02163         }
02164 
02165 err:    if (sdbp != NULL && (t_ret = __db_s_done(sdbp)) != 0 && ret == 0)
02166                 ret = t_ret;
02167         return (ret);
02168 }
02169 
02170 /*
02171  * __db_s_first --
02172  *      Get the first secondary, if any are present, from the primary.
02173  *
02174  * PUBLIC: int __db_s_first __P((DB *, DB **));
02175  */
02176 int
02177 __db_s_first(pdbp, sdbpp)
02178         DB *pdbp, **sdbpp;
02179 {
02180         DB *sdbp;
02181 
02182         MUTEX_LOCK(pdbp->dbenv, pdbp->mutex);
02183         sdbp = LIST_FIRST(&pdbp->s_secondaries);
02184 
02185         /* See __db_s_next. */
02186         if (sdbp != NULL)
02187                 sdbp->s_refcnt++;
02188         MUTEX_UNLOCK(pdbp->dbenv, pdbp->mutex);
02189 
02190         *sdbpp = sdbp;
02191 
02192         return (0);
02193 }
02194 
02195 /*
02196  * __db_s_next --
02197  *      Get the next secondary in the list.
02198  *
02199  * PUBLIC: int __db_s_next __P((DB **));
02200  */
02201 int
02202 __db_s_next(sdbpp)
02203         DB **sdbpp;
02204 {
02205         DB *sdbp, *pdbp, *closeme;
02206         int ret;
02207 
02208         /*
02209          * Secondary indices are kept in a linked list, s_secondaries,
02210          * off each primary DB handle.  If a primary is free-threaded,
02211          * this list may only be traversed or modified while the primary's
02212          * thread mutex is held.
02213          *
02214          * The tricky part is that we don't want to hold the thread mutex
02215          * across the full set of secondary puts necessary for each primary
02216          * put, or we'll wind up essentially single-threading all the puts
02217          * to the handle;  the secondary puts will each take about as
02218          * long as the primary does, and may require I/O.  So we instead
02219          * hold the thread mutex only long enough to follow one link to the
02220          * next secondary, and then we release it before performing the
02221          * actual secondary put.
02222          *
02223          * The only danger here is that we might legitimately close a
02224          * secondary index in one thread while another thread is performing
02225          * a put and trying to update that same secondary index.  To
02226          * prevent this from happening, we refcount the secondary handles.
02227          * If close is called on a secondary index handle while we're putting
02228          * to it, it won't really be closed--the refcount will simply drop,
02229          * and we'll be responsible for closing it here.
02230          */
02231         sdbp = *sdbpp;
02232         pdbp = sdbp->s_primary;
02233         closeme = NULL;
02234 
02235         MUTEX_LOCK(pdbp->dbenv, pdbp->mutex);
02236         DB_ASSERT(sdbp->s_refcnt != 0);
02237         if (--sdbp->s_refcnt == 0) {
02238                 LIST_REMOVE(sdbp, s_links);
02239                 closeme = sdbp;
02240         }
02241         sdbp = LIST_NEXT(sdbp, s_links);
02242         if (sdbp != NULL)
02243                 sdbp->s_refcnt++;
02244         MUTEX_UNLOCK(pdbp->dbenv, pdbp->mutex);
02245 
02246         *sdbpp = sdbp;
02247 
02248         /*
02249          * closeme->close() is a wrapper;  call __db_close explicitly.
02250          */
02251         ret = closeme != NULL ? __db_close(closeme, NULL, 0) : 0;
02252         return (ret);
02253 }
02254 
02255 /*
02256  * __db_s_done --
02257  *      Properly decrement the refcount on a secondary database handle we're
02258  *      using, without calling __db_s_next.
02259  *
02260  * PUBLIC: int __db_s_done __P((DB *));
02261  */
02262 int
02263 __db_s_done(sdbp)
02264         DB *sdbp;
02265 {
02266         DB *pdbp;
02267         int doclose;
02268 
02269         pdbp = sdbp->s_primary;
02270         doclose = 0;
02271 
02272         MUTEX_LOCK(pdbp->dbenv, pdbp->mutex);
02273         DB_ASSERT(sdbp->s_refcnt != 0);
02274         if (--sdbp->s_refcnt == 0) {
02275                 LIST_REMOVE(sdbp, s_links);
02276                 doclose = 1;
02277         }
02278         MUTEX_UNLOCK(pdbp->dbenv, pdbp->mutex);
02279 
02280         return (doclose ? __db_close(sdbp, NULL, 0) : 0);
02281 }
02282 
02283 /*
02284  * __db_buildpartial --
02285  *      Build the record that will result after a partial put is applied to
02286  *      an existing record.
02287  *
02288  *      This should probably be merged with __bam_build, but that requires
02289  *      a little trickery if we plan to keep the overflow-record optimization
02290  *      in that function.
02291  */
02292 static int
02293 __db_buildpartial(dbp, oldrec, partial, newrec)
02294         DB *dbp;
02295         DBT *oldrec, *partial, *newrec;
02296 {
02297         int ret;
02298         u_int8_t *buf;
02299         u_int32_t len, nbytes;
02300 
02301         DB_ASSERT(F_ISSET(partial, DB_DBT_PARTIAL));
02302 
02303         memset(newrec, 0, sizeof(DBT));
02304 
02305         nbytes = __db_partsize(oldrec->size, partial);
02306         newrec->size = nbytes;
02307 
02308         if ((ret = __os_malloc(dbp->dbenv, nbytes, &buf)) != 0)
02309                 return (ret);
02310         newrec->data = buf;
02311 
02312         /* Nul or pad out the buffer, for any part that isn't specified. */
02313         memset(buf,
02314             F_ISSET(dbp, DB_AM_FIXEDLEN) ? ((BTREE *)dbp->bt_internal)->re_pad :
02315             0, nbytes);
02316 
02317         /* Copy in any leading data from the original record. */
02318         memcpy(buf, oldrec->data,
02319             partial->doff > oldrec->size ? oldrec->size : partial->doff);
02320 
02321         /* Copy the data from partial. */
02322         memcpy(buf + partial->doff, partial->data, partial->size);
02323 
02324         /* Copy any trailing data from the original record. */
02325         len = partial->doff + partial->dlen;
02326         if (oldrec->size > len)
02327                 memcpy(buf + partial->doff + partial->size,
02328                     (u_int8_t *)oldrec->data + len, oldrec->size - len);
02329 
02330         return (0);
02331 }
02332 
02333 /*
02334  * __db_partsize --
02335  *      Given the number of bytes in an existing record and a DBT that
02336  *      is about to be partial-put, calculate the size of the record
02337  *      after the put.
02338  *
02339  *      This code is called from __bam_partsize.
02340  *
02341  * PUBLIC: u_int32_t __db_partsize __P((u_int32_t, DBT *));
02342  */
02343 u_int32_t
02344 __db_partsize(nbytes, data)
02345         u_int32_t nbytes;
02346         DBT *data;
02347 {
02348 
02349         /*
02350          * There are really two cases here:
02351          *
02352          * Case 1: We are replacing some bytes that do not exist (i.e., they
02353          * are past the end of the record).  In this case the number of bytes
02354          * we are replacing is irrelevant and all we care about is how many
02355          * bytes we are going to add from offset.  So, the new record length
02356          * is going to be the size of the new bytes (size) plus wherever those
02357          * new bytes begin (doff).
02358          *
02359          * Case 2: All the bytes we are replacing exist.  Therefore, the new
02360          * size is the oldsize (nbytes) minus the bytes we are replacing (dlen)
02361          * plus the bytes we are adding (size).
02362          */
02363         if (nbytes < data->doff + data->dlen)           /* Case 1 */
02364                 return (data->doff + data->size);
02365 
02366         return (nbytes + data->size - data->dlen);      /* Case 2 */
02367 }

Generated on Sun Dec 25 12:14:19 2005 for Berkeley DB 4.4.16 by  doxygen 1.4.2