Main Page | Class Hierarchy | Data Structures | Directories | File List | Data Fields | Related Pages

db_am.c

00001 /*-
00002  * See the file LICENSE for redistribution information.
00003  *
00004  * Copyright (c) 1998-2005
00005  *      Sleepycat Software.  All rights reserved.
00006  *
00007  * $Id: db_am.c,v 12.12 2005/11/01 00:44:09 bostic Exp $
00008  */
00009 
00010 #include "db_config.h"
00011 
00012 #ifndef NO_SYSTEM_INCLUDES
00013 #include <sys/types.h>
00014 
00015 #include <string.h>
00016 #endif
00017 
00018 #include "db_int.h"
00019 #include "dbinc/db_page.h"
00020 #include "dbinc/db_shash.h"
00021 #include "dbinc/btree.h"
00022 #include "dbinc/hash.h"
00023 #include "dbinc/lock.h"
00024 #include "dbinc/log.h"
00025 #include "dbinc/mp.h"
00026 #include "dbinc/qam.h"
00027 
/* Prototypes for the file-local (static) helpers used in this file. */
00028 static int __db_append_primary __P((DBC *, DBT *, DBT *));
00029 static int __db_secondary_get __P((DB *, DB_TXN *, DBT *, DBT *, u_int32_t));
00030 
00031 /*
00032  * __db_cursor_int --
00033  *      Internal routine to create a cursor.
00034  *
00035  * PUBLIC: int __db_cursor_int
00036  * PUBLIC:     __P((DB *, DB_TXN *, DBTYPE, db_pgno_t, int, u_int32_t, DBC **));
00037  */
00038 int
00039 __db_cursor_int(dbp, txn, dbtype, root, is_opd, lockerid, dbcp)
00040         DB *dbp;
00041         DB_TXN *txn;
00042         DBTYPE dbtype;
00043         db_pgno_t root;
00044         int is_opd;
00045         u_int32_t lockerid;
00046         DBC **dbcp;
00047 {
00048         DBC *dbc;
00049         DBC_INTERNAL *cp;
00050         DB_ENV *dbenv;
00051         db_threadid_t tid;
00052         int allocated, ret;
00053         pid_t pid;
00054 
00055         dbenv = dbp->dbenv;
00056         allocated = 0;
00057 
00058         /*
00059          * If dbcp is non-NULL it is assumed to point to an area to initialize
00060          * as a cursor.
00061          *
00062          * Take one from the free list if it's available.  Take only the
00063          * right type.  With off page dups we may have different kinds
00064          * of cursors on the queue for a single database.
00065          */
00066         MUTEX_LOCK(dbenv, dbp->mutex);
00067         for (dbc = TAILQ_FIRST(&dbp->free_queue);
00068             dbc != NULL; dbc = TAILQ_NEXT(dbc, links))
00069                 if (dbtype == dbc->dbtype) {
00070                         TAILQ_REMOVE(&dbp->free_queue, dbc, links);
00071                         F_CLR(dbc, ~DBC_OWN_LID);
00072                         break;
00073                 }
00074         MUTEX_UNLOCK(dbenv, dbp->mutex);
00075 
00076         if (dbc == NULL) {
00077                 if ((ret = __os_calloc(dbenv, 1, sizeof(DBC), &dbc)) != 0)
00078                         return (ret);
00079                 allocated = 1;
00080                 dbc->flags = 0;
00081 
00082                 dbc->dbp = dbp;
00083 
00084                 /* Set up locking information. */
00085                 if (LOCKING_ON(dbenv)) {
00086                         /*
00087                          * If we are not threaded, we share a locker ID among
00088                          * all cursors opened in the environment handle,
00089                          * allocating one if this is the first cursor.
00090                          *
00091                          * This relies on the fact that non-threaded DB handles
00092                          * always have non-threaded environment handles, since
00093                          * we set DB_THREAD on DB handles created with threaded
00094                          * environment handles.
00095                          */
00096                         if (!DB_IS_THREADED(dbp)) {
00097                                 if (dbp->dbenv->env_lref == NULL &&
00098                                     (ret = __lock_id(dbenv, NULL,
00099                                     (DB_LOCKER **)&dbp->dbenv->env_lref)) != 0)
00100                                         goto err;
00101                                 dbc->lref = dbp->dbenv->env_lref;
00102                         } else {
00103                                 if ((ret = __lock_id(dbenv, NULL,
00104                                     (DB_LOCKER **)&dbc->lref)) != 0)
00105                                         goto err;
00106                                 F_SET(dbc, DBC_OWN_LID);
00107                         }
00108 
00109                         /*
00110                          * In CDB, secondary indices should share a lock file
00111                          * ID with the primary;  otherwise we're susceptible
00112                          * to deadlocks.  We also use __db_cursor_int rather
00113                          * than __db_cursor to create secondary update cursors
00114                          * in c_put and c_del; these won't acquire a new lock.
00115                          *
00116                          * !!!
00117                          * Since this is in the one-time cursor allocation
00118                          * code, we need to be sure to destroy, not just
00119                          * close, all cursors in the secondary when we
00120                          * associate.
00121                          */
00122                         if (CDB_LOCKING(dbenv) &&
00123                             F_ISSET(dbp, DB_AM_SECONDARY))
00124                                 memcpy(dbc->lock.fileid,
00125                                     dbp->s_primary->fileid, DB_FILE_ID_LEN);
00126                         else
00127                                 memcpy(dbc->lock.fileid,
00128                                     dbp->fileid, DB_FILE_ID_LEN);
00129 
00130                         if (CDB_LOCKING(dbenv)) {
00131                                 if (F_ISSET(dbenv, DB_ENV_CDB_ALLDB)) {
00132                                         /*
00133                                          * If we are doing a single lock per
00134                                          * environment, set up the global
00135                                          * lock object just like we do to
00136                                          * single thread creates.
00137                                          */
00138                                         DB_ASSERT(sizeof(db_pgno_t) ==
00139                                             sizeof(u_int32_t));
00140                                         dbc->lock_dbt.size = sizeof(u_int32_t);
00141                                         dbc->lock_dbt.data = &dbc->lock.pgno;
00142                                         dbc->lock.pgno = 0;
00143                                 } else {
00144                                         dbc->lock_dbt.size = DB_FILE_ID_LEN;
00145                                         dbc->lock_dbt.data = dbc->lock.fileid;
00146                                 }
00147                         } else {
00148                                 dbc->lock.type = DB_PAGE_LOCK;
00149                                 dbc->lock_dbt.size = sizeof(dbc->lock);
00150                                 dbc->lock_dbt.data = &dbc->lock;
00151                         }
00152                 }
00153                 /* Init the DBC internal structure. */
00154                 switch (dbtype) {
00155                 case DB_BTREE:
00156                 case DB_RECNO:
00157                         if ((ret = __bam_c_init(dbc, dbtype)) != 0)
00158                                 goto err;
00159                         break;
00160                 case DB_HASH:
00161                         if ((ret = __ham_c_init(dbc)) != 0)
00162                                 goto err;
00163                         break;
00164                 case DB_QUEUE:
00165                         if ((ret = __qam_c_init(dbc)) != 0)
00166                                 goto err;
00167                         break;
00168                 case DB_UNKNOWN:
00169                 default:
00170                         ret = __db_unknown_type(dbenv, "DB->cursor", dbtype);
00171                         goto err;
00172                 }
00173 
00174                 cp = dbc->internal;
00175         }
00176 
00177         /* Refresh the DBC structure. */
00178         dbc->dbtype = dbtype;
00179         RESET_RET_MEM(dbc);
00180 
00181         if ((dbc->txn = txn) != NULL)
00182                 dbc->locker = txn->txnid;
00183         else if (LOCKING_ON(dbenv)) {
00184                 /*
00185                  * There are certain cases in which we want to create a
00186                  * new cursor with a particular locker ID that is known
00187                  * to be the same as (and thus not conflict with) an
00188                  * open cursor.
00189                  *
00190                  * The most obvious case is cursor duplication;  when we
00191                  * call DBC->c_dup or __db_c_idup, we want to use the original
00192                  * cursor's locker ID.
00193                  *
00194                  * Another case is when updating secondary indices.  Standard
00195                  * CDB locking would mean that we might block ourself:  we need
00196                  * to open an update cursor in the secondary while an update
00197                  * cursor in the primary is open, and when the secondary and
00198                  * primary are subdatabases or we're using env-wide locking,
00199                  * this is disastrous.
00200                  *
00201                  * In these cases, our caller will pass a nonzero locker
00202                  * ID into this function.  Use this locker ID instead of
00203                  * the default as the locker ID for our new cursor.
00204                  */
00205                 if (lockerid != DB_LOCK_INVALIDID)
00206                         dbc->locker = lockerid;
00207                 else {
00208                         /*
00209                          * If we are threaded then we need to set the
00210                          * proper thread id into the locker.
00211                          */
00212                         if (DB_IS_THREADED(dbp)) {
00213                                 dbenv->thread_id(dbenv, &pid, &tid);
00214                                 __lock_set_thread_id(
00215                                     (DB_LOCKER *)dbc->lref, pid, tid);
00216                         }
00217                         dbc->locker = ((DB_LOCKER *)dbc->lref)->id;
00218                 }
00219         }
00220 
00221         /*
00222          * These fields change when we are used as a secondary index, so
00223          * if the DB is a secondary, make sure they're set properly just
00224          * in case we opened some cursors before we were associated.
00225          *
00226          * __db_c_get is used by all access methods, so this should be safe.
00227          */
00228         if (F_ISSET(dbp, DB_AM_SECONDARY))
00229                 dbc->c_get = __db_c_secondary_get_pp;
00230 
00231         if (is_opd)
00232                 F_SET(dbc, DBC_OPD);
00233         if (F_ISSET(dbp, DB_AM_RECOVER))
00234                 F_SET(dbc, DBC_RECOVER);
00235         if (F_ISSET(dbp, DB_AM_COMPENSATE))
00236                 F_SET(dbc, DBC_COMPENSATE);
00237 
00238         /* Refresh the DBC internal structure. */
00239         cp = dbc->internal;
00240         cp->opd = NULL;
00241 
00242         cp->indx = 0;
00243         cp->page = NULL;
00244         cp->pgno = PGNO_INVALID;
00245         cp->root = root;
00246 
00247         switch (dbtype) {
00248         case DB_BTREE:
00249         case DB_RECNO:
00250                 if ((ret = __bam_c_refresh(dbc)) != 0)
00251                         goto err;
00252                 break;
00253         case DB_HASH:
00254         case DB_QUEUE:
00255                 break;
00256         case DB_UNKNOWN:
00257         default:
00258                 ret = __db_unknown_type(dbenv, "DB->cursor", dbp->type);
00259                 goto err;
00260         }
00261 
00262         /*
00263          * The transaction keeps track of how many cursors were opened within
00264          * it to catch application errors where the cursor isn't closed when
00265          * the transaction is resolved.
00266          */
00267         if (txn != NULL)
00268                 ++txn->cursors;
00269 
00270         MUTEX_LOCK(dbenv, dbp->mutex);
00271         TAILQ_INSERT_TAIL(&dbp->active_queue, dbc, links);
00272         F_SET(dbc, DBC_ACTIVE);
00273         MUTEX_UNLOCK(dbenv, dbp->mutex);
00274 
00275         *dbcp = dbc;
00276         return (0);
00277 
00278 err:    if (allocated)
00279                 __os_free(dbenv, dbc);
00280         return (ret);
00281 }
00282 
00283 /*
00284  * __db_put --
00285  *      Store a key/data pair.
00286  *
00287  * PUBLIC: int __db_put __P((DB *, DB_TXN *, DBT *, DBT *, u_int32_t));
00288  */
00289 int
00290 __db_put(dbp, txn, key, data, flags)
00291         DB *dbp;
00292         DB_TXN *txn;
00293         DBT *key, *data;
00294         u_int32_t flags;
00295 {
00296         DBC *dbc;
00297         DBT tdata;
00298         DB_ENV *dbenv;
00299         int ret, t_ret;
00300 
00301         dbenv = dbp->dbenv;
00302 
00303         if ((ret = __db_cursor(dbp, txn, &dbc, DB_WRITELOCK)) != 0)
00304                 return (ret);
00305 
00306         DEBUG_LWRITE(dbc, txn, "DB->put", key, data, flags);
00307 
00308         SET_RET_MEM(dbc, dbp);
00309 
00310         /*
00311          * See the comment in __db_get().
00312          *
00313          * Note that the c_get in the DB_NOOVERWRITE case is safe to
00314          * do with this flag set;  if it errors in any way other than
00315          * DB_NOTFOUND, we're going to close the cursor without doing
00316          * anything else, and if it returns DB_NOTFOUND then it's safe
00317          * to do a c_put(DB_KEYLAST) even if an access method moved the
00318          * cursor, since that's not position-dependent.
00319          */
00320         F_SET(dbc, DBC_TRANSIENT);
00321 
00322         switch (flags) {
00323         case DB_APPEND:
00324                 /*
00325                  * If there is an append callback, the value stored in
00326                  * data->data may be replaced and then freed.  To avoid
00327                  * passing a freed pointer back to the user, just operate
00328                  * on a copy of the data DBT.
00329                  */
00330                 tdata = *data;
00331 
00332                 /*
00333                  * Append isn't a normal put operation;  call the appropriate
00334                  * access method's append function.
00335                  */
00336                 switch (dbp->type) {
00337                 case DB_QUEUE:
00338                         if ((ret = __qam_append(dbc, key, &tdata)) != 0)
00339                                 goto err;
00340                         break;
00341                 case DB_RECNO:
00342                         if ((ret = __ram_append(dbc, key, &tdata)) != 0)
00343                                 goto err;
00344                         break;
00345                 case DB_BTREE:
00346                 case DB_HASH:
00347                 case DB_UNKNOWN:
00348                 default:
00349                         /* The interface should prevent this. */
00350                         DB_ASSERT(
00351                             dbp->type == DB_QUEUE || dbp->type == DB_RECNO);
00352 
00353                         ret = __db_ferr(dbenv, "DB->put", 0);
00354                         goto err;
00355                 }
00356 
00357                 /*
00358                  * Secondary indices:  since we've returned zero from
00359                  * an append function, we've just put a record, and done
00360                  * so outside __db_c_put.  We know we're not a secondary--
00361                  * the interface prevents puts on them--but we may be a
00362                  * primary.  If so, update our secondary indices
00363                  * appropriately.
00364                  */
00365                 DB_ASSERT(!F_ISSET(dbp, DB_AM_SECONDARY));
00366 
00367                 if (LIST_FIRST(&dbp->s_secondaries) != NULL)
00368                         ret = __db_append_primary(dbc, key, &tdata);
00369 
00370                 /*
00371                  * The append callback, if one exists, may have allocated
00372                  * a new tdata.data buffer.  If so, free it.
00373                  */
00374                 FREE_IF_NEEDED(dbp, &tdata);
00375 
00376                 /* No need for a cursor put;  we're done. */
00377                 goto done;
00378         case DB_NOOVERWRITE:
00379                 flags = 0;
00380                 /*
00381                  * Set DB_DBT_USERMEM, this might be a threaded application and
00382                  * the flags checking will catch us.  We don't want the actual
00383                  * data, so request a partial of length 0.
00384                  */
00385                 memset(&tdata, 0, sizeof(tdata));
00386                 F_SET(&tdata, DB_DBT_USERMEM | DB_DBT_PARTIAL);
00387 
00388                 /*
00389                  * If we're doing page-level locking, set the read-modify-write
00390                  * flag, we're going to overwrite immediately.
00391                  */
00392                 if ((ret = __db_c_get(dbc, key, &tdata,
00393                     DB_SET | (STD_LOCKING(dbc) ? DB_RMW : 0))) == 0)
00394                         ret = DB_KEYEXIST;
00395                 else if (ret == DB_NOTFOUND || ret == DB_KEYEMPTY)
00396                         ret = 0;
00397                 break;
00398         default:
00399                 /* Fall through to normal cursor put. */
00400                 break;
00401         }
00402 
00403         if (ret == 0)
00404                 ret = __db_c_put(dbc,
00405                     key, data, flags == 0 ? DB_KEYLAST : flags);
00406 
00407 err:
00408 done:   /* Close the cursor. */
00409         if ((t_ret = __db_c_close(dbc)) != 0 && ret == 0)
00410                 ret = t_ret;
00411 
00412         return (ret);
00413 }
00414 
00415 /*
00416  * __db_del --
00417  *      Delete the items referenced by a key.
00418  *
00419  * PUBLIC: int __db_del __P((DB *, DB_TXN *, DBT *, u_int32_t));
00420  */
00421 int
00422 __db_del(dbp, txn, key, flags)
00423         DB *dbp;
00424         DB_TXN *txn;
00425         DBT *key;
00426         u_int32_t flags;
00427 {
00428         DBC *dbc;
00429         DBT data, lkey;
00430         u_int32_t f_init, f_next;
00431         int ret, t_ret;
00432 
00433         /* Allocate a cursor. */
00434         if ((ret = __db_cursor(dbp, txn, &dbc, DB_WRITELOCK)) != 0)
00435                 goto err;
00436 
00437         DEBUG_LWRITE(dbc, txn, "DB->del", key, NULL, flags);
00438         COMPQUIET(flags, 0);
00439 
00440         /*
00441          * Walk a cursor through the key/data pairs, deleting as we go.  Set
00442          * the DB_DBT_USERMEM flag, as this might be a threaded application
00443          * and the flags checking will catch us.  We don't actually want the
00444          * keys or data, so request a partial of length 0.
00445          */
00446         memset(&lkey, 0, sizeof(lkey));
00447         F_SET(&lkey, DB_DBT_USERMEM | DB_DBT_PARTIAL);
00448         memset(&data, 0, sizeof(data));
00449         F_SET(&data, DB_DBT_USERMEM | DB_DBT_PARTIAL);
00450 
00451         /*
00452          * If locking (and we haven't already acquired CDB locks), set the
00453          * read-modify-write flag.
00454          */
00455         f_init = DB_SET;
00456         f_next = DB_NEXT_DUP;
00457         if (STD_LOCKING(dbc)) {
00458                 f_init |= DB_RMW;
00459                 f_next |= DB_RMW;
00460         }
00461 
00462         /*
00463          * Optimize the simple cases.  For all AMs if we don't have secondaries
00464          * and are not a secondary and there are no dups then we can avoid a
00465          * bunch of overhead.  For queue we don't need to fetch the record since
00466          * we delete by direct calculation from the record number.
00467          *
00468          * Hash permits an optimization in DB->del: since on-page duplicates are
00469          * stored in a single HKEYDATA structure, it's possible to delete an
00470          * entire set of them at once, and as the HKEYDATA has to be rebuilt
00471          * and re-put each time it changes, this is much faster than deleting
00472          * the duplicates one by one.  Thus, if not pointing at an off-page
00473          * duplicate set, and we're not using secondary indices (in which case
00474          * we'd have to examine the items one by one anyway), let hash do this
00475          * "quick delete".
00476          *
00477          * !!!
00478          * Note that this is the only application-executed delete call in
00479          * Berkeley DB that does not go through the __db_c_del function.
00480          * If anything other than the delete itself (like a secondary index
00481          * update) has to happen there in a particular situation, the
00482          * conditions here should be modified not to use these optimizations.
00483          * The ordinary AM-independent alternative will work just fine;
00484          * it'll just be slower.
00485          */
00486         if (!F_ISSET(dbp, DB_AM_SECONDARY) &&
00487             LIST_FIRST(&dbp->s_secondaries) == NULL) {
00488 
00489 #ifdef HAVE_QUEUE
00490                 if (dbp->type == DB_QUEUE) {
00491                         ret = __qam_delete(dbc, key);
00492                         goto done;
00493                 }
00494 #endif
00495 
00496                 /* Fetch the first record. */
00497                 if ((ret = __db_c_get(dbc, key, &data, f_init)) != 0)
00498                         goto err;
00499 
00500 #ifdef HAVE_HASH
00501                 if (dbp->type == DB_HASH && dbc->internal->opd == NULL) {
00502                         ret = __ham_quick_delete(dbc);
00503                         goto done;
00504                 }
00505 #endif
00506 
00507                 if ((dbp->type == DB_BTREE || dbp->type == DB_RECNO) &&
00508                     !F_ISSET(dbp, DB_AM_DUP)) {
00509                         ret = dbc->c_am_del(dbc);
00510                         goto done;
00511                 }
00512         } else if ((ret = __db_c_get(dbc, key, &data, f_init)) != 0)
00513                 goto err;
00514 
00515         /* Walk through the set of key/data pairs, deleting as we go. */
00516         for (;;) {
00517                 if ((ret = __db_c_del(dbc, 0)) != 0)
00518                         break;
00519                 if ((ret = __db_c_get(dbc, &lkey, &data, f_next)) != 0) {
00520                         if (ret == DB_NOTFOUND)
00521                                 ret = 0;
00522                         break;
00523                 }
00524         }
00525 
00526 done:
00527 err:    /* Discard the cursor. */
00528         if ((t_ret = __db_c_close(dbc)) != 0 && ret == 0)
00529                 ret = t_ret;
00530 
00531         return (ret);
00532 }
00533 
00534 /*
00535  * __db_sync --
00536  *      Flush the database cache.
00537  *
00538  * PUBLIC: int __db_sync __P((DB *));
00539  */
00540 int
00541 __db_sync(dbp)
00542         DB *dbp;
00543 {
00544         int ret, t_ret;
00545 
00546         ret = 0;
00547 
00548         /* If the database was read-only, we're done. */
00549         if (F_ISSET(dbp, DB_AM_RDONLY))
00550                 return (0);
00551 
00552         /* If it's a Recno tree, write the backing source text file. */
00553         if (dbp->type == DB_RECNO)
00554                 ret = __ram_writeback(dbp);
00555 
00556         /* If the database was never backed by a database file, we're done. */
00557         if (F_ISSET(dbp, DB_AM_INMEM))
00558                 return (ret);
00559 
00560         if (dbp->type == DB_QUEUE)
00561                 ret = __qam_sync(dbp);
00562         else
00563                 /* Flush any dirty pages from the cache to the backing file. */
00564                 if ((t_ret = __memp_fsync(dbp->mpf)) != 0 && ret == 0)
00565                         ret = t_ret;
00566 
00567         return (ret);
00568 }
00569 
00570 /*
00571  * __db_associate --
00572  *      Associate another database as a secondary index to this one.
00573  *
00574  * PUBLIC: int __db_associate __P((DB *, DB_TXN *, DB *,
00575  * PUBLIC:     int (*)(DB *, const DBT *, const DBT *, DBT *), u_int32_t));
00576  */
00577 int
00578 __db_associate(dbp, txn, sdbp, callback, flags)
00579         DB *dbp, *sdbp;
00580         DB_TXN *txn;
00581         int (*callback) __P((DB *, const DBT *, const DBT *, DBT *));
00582         u_int32_t flags;
00583 {
00584         DB_ENV *dbenv;
00585         DBC *pdbc, *sdbc;
00586         DBT skey, key, data;
00587         int build, ret, t_ret;
00588 
00589         dbenv = dbp->dbenv;
00590         pdbc = sdbc = NULL;
00591         ret = 0;
00592 
00593         /*
00594          * Check to see if the secondary is empty -- and thus if we should
00595          * build it -- before we link it in and risk making it show up in other
00596          * threads.  Do this first so that the databases remain unassociated on
00597          * error.
00598          */
00599         build = 0;
00600         if (LF_ISSET(DB_CREATE)) {
00601                 if ((ret = __db_cursor(sdbp, txn, &sdbc, 0)) != 0)
00602                         goto err;
00603 
00604                 /*
00605                  * We don't care about key or data;  we're just doing
00606                  * an existence check.
00607                  */
00608                 memset(&key, 0, sizeof(DBT));
00609                 memset(&data, 0, sizeof(DBT));
00610                 F_SET(&key, DB_DBT_PARTIAL | DB_DBT_USERMEM);
00611                 F_SET(&data, DB_DBT_PARTIAL | DB_DBT_USERMEM);
00612                 if ((ret = __db_c_get(sdbc, &key, &data,
00613                     (STD_LOCKING(sdbc) ? DB_RMW : 0) |
00614                     DB_FIRST)) == DB_NOTFOUND) {
00615                         build = 1;
00616                         ret = 0;
00617                 }
00618 
00619                 if ((t_ret = __db_c_close(sdbc)) != 0 && ret == 0)
00620                         ret = t_ret;
00621 
00622                 /* Reset for later error check. */
00623                 sdbc = NULL;
00624 
00625                 if (ret != 0)
00626                         goto err;
00627         }
00628 
00629         /*
00630          * Set up the database handle as a secondary.
00631          */
00632         sdbp->s_callback = callback;
00633         sdbp->s_primary = dbp;
00634 
00635         sdbp->stored_get = sdbp->get;
00636         sdbp->get = __db_secondary_get;
00637 
00638         sdbp->stored_close = sdbp->close;
00639         sdbp->close = __db_secondary_close_pp;
00640 
00641         F_SET(sdbp, DB_AM_SECONDARY);
00642 
00643         if (LF_ISSET(DB_IMMUTABLE_KEY))
00644                 FLD_SET(sdbp->s_assoc_flags, DB_ASSOC_IMMUTABLE_KEY);
00645 
00646         /*
00647          * Add the secondary to the list on the primary.  Do it here
00648          * so that we see any updates that occur while we're walking
00649          * the primary.
00650          */
00651         MUTEX_LOCK(dbenv, dbp->mutex);
00652 
00653         /* See __db_s_next for an explanation of secondary refcounting. */
00654         DB_ASSERT(sdbp->s_refcnt == 0);
00655         sdbp->s_refcnt = 1;
00656         LIST_INSERT_HEAD(&dbp->s_secondaries, sdbp, s_links);
00657         MUTEX_UNLOCK(dbenv, dbp->mutex);
00658 
00659         if (build) {
00660                 /*
00661                  * We loop through the primary, putting each item we
00662                  * find into the new secondary.
00663                  *
00664                  * If we're using CDB, opening these two cursors puts us
00665                  * in a bit of a locking tangle:  CDB locks are done on the
00666                  * primary, so that we stay deadlock-free, but that means
00667                  * that updating the secondary while we have a read cursor
00668                  * open on the primary will self-block.  To get around this,
00669                  * we force the primary cursor to use the same locker ID
00670                  * as the secondary, so they won't conflict.  This should
00671                  * be harmless even if we're not using CDB.
00672                  */
00673                 if ((ret = __db_cursor(sdbp, txn, &sdbc,
00674                     CDB_LOCKING(sdbp->dbenv) ? DB_WRITECURSOR : 0)) != 0)
00675                         goto err;
00676                 if ((ret = __db_cursor_int(dbp,
00677                     txn, dbp->type, PGNO_INVALID, 0, sdbc->locker, &pdbc)) != 0)
00678                         goto err;
00679 
00680                 /* Lock out other threads, now that we have a locker ID. */
00681                 dbp->associate_lid = sdbc->locker;
00682 
00683                 memset(&key, 0, sizeof(DBT));
00684                 memset(&data, 0, sizeof(DBT));
00685                 while ((ret = __db_c_get(pdbc, &key, &data, DB_NEXT)) == 0) {
00686                         memset(&skey, 0, sizeof(DBT));
00687                         if ((ret = callback(sdbp, &key, &data, &skey)) != 0) {
00688                                 if (ret == DB_DONOTINDEX)
00689                                         continue;
00690                                 goto err;
00691                         }
00692                         SWAP_IF_NEEDED(dbp, sdbp, &key);
00693                         if ((ret = __db_c_put(sdbc,
00694                             &skey, &key, DB_UPDATE_SECONDARY)) != 0) {
00695                                 FREE_IF_NEEDED(sdbp, &skey);
00696                                 goto err;
00697                         }
00698                         SWAP_IF_NEEDED(dbp, sdbp, &key);
00699 
00700                         FREE_IF_NEEDED(sdbp, &skey);
00701                 }
00702                 if (ret == DB_NOTFOUND)
00703                         ret = 0;
00704         }
00705 
00706 err:    if (sdbc != NULL && (t_ret = __db_c_close(sdbc)) != 0 && ret == 0)
00707                 ret = t_ret;
00708 
00709         if (pdbc != NULL && (t_ret = __db_c_close(pdbc)) != 0 && ret == 0)
00710                 ret = t_ret;
00711 
00712         dbp->associate_lid = DB_LOCK_INVALIDID;
00713 
00714         return (ret);
00715 }
00716 
00717 /*
00718  * __db_secondary_get --
00719  *      This wrapper function for DB->pget() is the DB->get() function
00720  *      on a database which has been made into a secondary index.
00721  */
00722 static int
00723 __db_secondary_get(sdbp, txn, skey, data, flags)
00724         DB *sdbp;
00725         DB_TXN *txn;
00726         DBT *skey, *data;
00727         u_int32_t flags;
00728 {
00729 
00730         DB_ASSERT(F_ISSET(sdbp, DB_AM_SECONDARY));
00731         return (__db_pget_pp(sdbp, txn, skey, NULL, data, flags));
00732 }
00733 
00734 /*
00735  * __db_secondary_close --
00736  *      Wrapper function for DB->close() which we use on secondaries to
00737  *      manage refcounting and make sure we don't close them underneath
00738  *      a primary that is updating.
       *
       *      sdbp    Secondary handle the caller wants closed; its primary
       *              is taken from sdbp->s_primary.
       *      flags   Handed to __db_close() only when the close really
       *              happens.
       *
       *      The primary's mutex is held while s_refcnt is decremented and,
       *      if it reaches zero, while sdbp is unlinked via s_links; the
       *      mutex is released before __db_close() is called.
       *
       *      Returns 0 when the handle is still referenced after the
       *      decrement, otherwise the return value of __db_close().
00739  *
00740  * PUBLIC: int __db_secondary_close __P((DB *, u_int32_t));
00741  */
00742 int
00743 __db_secondary_close(sdbp, flags)
00744         DB *sdbp;
00745         u_int32_t flags;
00746 {
00747         DB *primary;
00748         int doclose;
00749 
00750         doclose = 0;
00751         primary = sdbp->s_primary;
00752 
00753         MUTEX_LOCK(primary->dbenv, primary->mutex);
00754         /*
00755          * Check the refcount--if it was at 1 when we were called, no
00756          * thread is currently updating this secondary through the primary,
00757          * so it's safe to close it for real.
00758          *
00759          * If it's not safe to do the close now, we do nothing;  the
00760          * database will actually be closed when the refcount is decremented,
00761          * which can happen in either __db_s_next or __db_s_done.
00762          */
00763         DB_ASSERT(sdbp->s_refcnt != 0);
00764         if (--sdbp->s_refcnt == 0) {
00765                 LIST_REMOVE(sdbp, s_links);
00766                 /* We don't want to call close while the mutex is held. */
00767                 doclose = 1;
00768         }
00769         MUTEX_UNLOCK(primary->dbenv, primary->mutex);
00770 
00771         /*
00772          * sdbp->close is this function;  call the real one explicitly if
00773          * need be.
00774          */
00775         return (doclose ? __db_close(sdbp, NULL, flags) : 0);
00776 }
00777 
00778 /*
00779  * __db_append_primary --
00780  *      Perform the secondary index updates necessary to put(DB_APPEND)
00781  *      a record to a primary database.
       *
       *      dbc     Cursor on the primary (dbc->dbp); its txn and locker are
       *              reused for every secondary cursor opened here.
       *      key     Primary key of the appended record.
       *      data    Data of the appended record.
       *
       *      If either DBT has DB_DBT_PARTIAL set, the complete key/data pair
       *      is re-read through a duplicate of dbc and used instead.
       *
       *      For each secondary, s_callback() builds the secondary key; a
       *      DB_DONOTINDEX result skips that secondary.  In a secondary
       *      without DB_AM_DUP, an existing entry for the same secondary key
       *      whose data compares unequal to the primary key fails the call
       *      with EINVAL.
       *
       *      Cleanup labels: err1 releases skey and closes the secondary
       *      cursor of the current iteration; err closes the duplicated
       *      primary cursor (if any) and calls __db_s_done() on the secondary
       *      still held (if any).
       *
       *      Returns 0 on success, otherwise the first error encountered.
00782  */
00783 static int
00784 __db_append_primary(dbc, key, data)
00785         DBC *dbc;
00786         DBT *key, *data;
00787 {
00788         DB *dbp, *sdbp;
00789         DBC *sdbc, *pdbc;
00790         DBT oldpkey, pkey, pdata, skey;
00791         int cmp, ret, t_ret;
00792 
00793         dbp = dbc->dbp;
00794         sdbp = NULL;
00795         ret = 0;
00796 
00797         /*
00798          * Worrying about partial appends seems a little like worrying
00799          * about Linear A character encodings.  But we support those
00800          * too if your application understands them.
00801          */
00802         pdbc = NULL;
00803         if (F_ISSET(data, DB_DBT_PARTIAL) || F_ISSET(key, DB_DBT_PARTIAL)) {
00804                 /*
00805                  * The dbc we were passed is all set to pass things
00806                  * back to the user;  we can't safely do a call on it.
00807                  * Dup the cursor, grab the real data item (we don't
00808                  * care what the key is--we've been passed it directly),
00809                  * and use that instead of the data DBT we were passed.
00810                  *
00811                  * Note that we can get away with this simple get because
00812                  * an appended item is by definition new, and the
00813                  * correctly-constructed full data item from this partial
00814                  * put is on the page waiting for us.
00815                  */
00816                 if ((ret = __db_c_idup(dbc, &pdbc, DB_POSITION)) != 0)
00817                         return (ret);
00818                 memset(&pkey, 0, sizeof(DBT));
00819                 memset(&pdata, 0, sizeof(DBT));
00820 
00821                 if ((ret = __db_c_get(pdbc, &pkey, &pdata, DB_CURRENT)) != 0)
00822                         goto err;
00823 
                      /*
                       * From here on work from the pair just read; pdbc stays
                       * open until the err label.
                       */
00824                 key = &pkey;
00825                 data = &pdata;
00826         }
00827 
00828         /*
00829          * Loop through the secondary indices, putting a new item in
00830          * each that points to the appended item.
00831          *
00832          * This is much like the loop in "step 3" in __db_c_put, so
00833          * I'm not commenting heavily here;  it was unclean to excerpt
00834          * just that section into a common function, but the basic
00835          * overview is the same here.
00836          */
00837         if ((ret = __db_s_first(dbp, &sdbp)) != 0)
00838                 goto err;
00839         for (; sdbp != NULL && ret == 0; ret = __db_s_next(&sdbp)) {
00840                 memset(&skey, 0, sizeof(DBT));
00841                 if ((ret = sdbp->s_callback(sdbp, key, data, &skey)) != 0) {
                              /*
                               * "continue" runs the loop's __db_s_next() step,
                               * which overwrites ret, so DB_DONOTINDEX is never
                               * returned to the caller.
                               */
00842                         if (ret == DB_DONOTINDEX)
00843                                 continue;
00844                         goto err;
00845                 }
00846 
                      /*
                       * Open a cursor on the secondary under the primary
                       * cursor's transaction and locker.
                       */
00847                 if ((ret = __db_cursor_int(sdbp, dbc->txn, sdbp->type,
00848                     PGNO_INVALID, 0, dbc->locker, &sdbc)) != 0) {
00849                         FREE_IF_NEEDED(sdbp, &skey);
00850                         goto err;
00851                 }
                      /* Under CDB locking, flag the secondary cursor as a writer. */
00852                 if (CDB_LOCKING(sdbp->dbenv)) {
00853                         DB_ASSERT(sdbc->mylock.off == LOCK_INVALID);
00854                         F_SET(sdbc, DBC_WRITER);
00855                 }
00856 
00857                 /*
00858                  * Since we know we have a new primary key, it can't be a
00859                  * duplicate duplicate in the secondary.  It can be a
00860                  * duplicate in a secondary that doesn't support duplicates,
00861                  * however, so we need to be careful to avoid an overwrite
00862                  * (which would corrupt our index).
00863                  */
00864                 if (!F_ISSET(sdbp, DB_AM_DUP)) {
                              /*
                               * Look up any existing entry for skey.  DB_RMW is
                               * requested only when STD_LOCKING(dbc) is true.
                               * DB_NOTFOUND and DB_KEYEMPTY fall through to the
                               * put below, which overwrites ret.
                               */
00865                         memset(&oldpkey, 0, sizeof(DBT));
00866                         F_SET(&oldpkey, DB_DBT_MALLOC);
00867                         ret = __db_c_get(sdbc, &skey, &oldpkey,
00868                             DB_SET | (STD_LOCKING(dbc) ? DB_RMW : 0));
00869                         if (ret == 0) {
00870                                 cmp = __bam_defcmp(sdbp, &oldpkey, key);
00871                                 /*
00872                                  * XXX
00873                                  * This needs to use the right free function
00874                                  * as soon as this is possible.
00875                                  */
00876                                 __os_ufree(sdbp->dbenv,
00877                                     oldpkey.data);
00878                                 if (cmp != 0) {
00879                                         __db_err(sdbp->dbenv, "%s%s",
00880                             "Append results in a non-unique secondary key in",
00881                             " an index not configured to support duplicates");
00882                                         ret = EINVAL;
00883                                         goto err1;
00884                                 }
00885                         } else if (ret != DB_NOTFOUND && ret != DB_KEYEMPTY)
00886                                 goto err1;
00887                 }
00888 
                      /* Store skey -> primary key in the secondary. */
00889                 ret = __db_c_put(sdbc, &skey, key, DB_UPDATE_SECONDARY);
00890 
00891 err1:           FREE_IF_NEEDED(sdbp, &skey);
00892 
00893                 if ((t_ret = __db_c_close(sdbc)) != 0 && ret == 0)
00894                         ret = t_ret;
00895                 if (ret != 0)
00896                         goto err;
00897         }
00898 
00899 err:    if (pdbc != NULL && (t_ret = __db_c_close(pdbc)) != 0 && ret == 0)
00900                 ret = t_ret;
00901         if (sdbp != NULL && (t_ret = __db_s_done(sdbp)) != 0 && ret == 0)
00902                 ret = t_ret;
00903         return (ret);
00904 }

Generated on Sun Dec 25 12:14:18 2005 for Berkeley DB 4.4.16 by  doxygen 1.4.2