Main Page | Class Hierarchy | Data Structures | Directories | File List | Data Fields | Related Pages

qam.c

00001 /*-
00002  * See the file LICENSE for redistribution information.
00003  *
00004  * Copyright (c) 1999-2005
00005  *      Sleepycat Software.  All rights reserved.
00006  *
00007  * $Id: qam.c,v 12.12 2005/10/05 17:16:46 bostic Exp $
00008  */
00009 
00010 #include "db_config.h"
00011 
00012 #ifndef NO_SYSTEM_INCLUDES
00013 #include <sys/types.h>
00014 
00015 #include <string.h>
00016 #endif
00017 
00018 #include "db_int.h"
00019 #include "dbinc/db_page.h"
00020 #include "dbinc/db_shash.h"
00021 #include "dbinc/btree.h"
00022 #include "dbinc/lock.h"
00023 #include "dbinc/log.h"
00024 #include "dbinc/mp.h"
00025 #include "dbinc/qam.h"
00026 
00027 static int __qam_bulk __P((DBC *, DBT *, u_int32_t));
00028 static int __qam_c_close __P((DBC *, db_pgno_t, int *));
00029 static int __qam_c_del __P((DBC *));
00030 static int __qam_c_destroy __P((DBC *));
00031 static int __qam_c_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
00032 static int __qam_c_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
00033 static int __qam_consume __P((DBC *, QMETA *, db_recno_t));
00034 static int __qam_getno __P((DB *, const DBT *, db_recno_t *));
00035 
00036 /*
00037  * __qam_position --
00038  *      Position a queued access method cursor at record *recnop, locking
00039  *      and pinning its page if present.  *exactp = 1 iff the record is valid.
00040  * PUBLIC: int __qam_position
00041  * PUBLIC:       __P((DBC *, db_recno_t *, qam_position_mode, int *));
00042  */
00043 int
00044 __qam_position(dbc, recnop, mode, exactp)
00045         DBC *dbc;               /* open cursor */
00046         db_recno_t *recnop;     /* pointer to recno to find */
00047         qam_position_mode mode;/* QAM_READ: read lock; otherwise write lock */
00048         int *exactp;            /* out: 1 if a valid record was found, else 0 */
00049 {
00050         QUEUE_CURSOR *cp;
00051         DB *dbp;
00052         QAMDATA  *qp;
00053         db_pgno_t pg;
00054         int ret, t_ret;
00055 
00056         dbp = dbc->dbp;
00057         cp = (QUEUE_CURSOR *)dbc->internal;
00058 
00059         /* Find the page for this recno; lock it, then fetch it. */
00060         pg = QAM_RECNO_PAGE(dbp, *recnop);
00061 
00062         if ((ret = __db_lget(dbc, 0, pg, mode == QAM_READ ?
00063             DB_LOCK_READ : DB_LOCK_WRITE, 0, &cp->lock)) != 0)
00064                 return (ret);
00065         cp->page = NULL;
00066         *exactp = 0;            /* default for every early exit below */
00067         if ((ret = __qam_fget(dbp, &pg,
00068             mode == QAM_WRITE ? DB_MPOOL_CREATE : 0, &cp->page)) != 0) {
00069                 if (mode != QAM_WRITE &&
00070                     (ret == DB_PAGE_NOTFOUND || ret == ENOENT))
00071                         ret = 0;        /* missing page: no record, not an error */
00072 
00073                 /* We did not fetch it, we can release the lock. */
00074                 if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
00075                         ret = t_ret;
00076                 return (ret);
00077         }
00078         cp->pgno = pg;
00079         cp->indx = QAM_RECNO_INDEX(dbp, pg, *recnop);   /* used with QAM_GET_RECORD */
00080 
00081         if (PGNO(cp->page) == 0) {      /* page header not yet stamped */
00082                 if (F_ISSET(dbp, DB_AM_RDONLY)) {
00083                         *exactp = 0;
00084                         return (0);     /* page stays pinned, lock stays held */
00085                 }
00086                 PGNO(cp->page) = pg;
00087                 TYPE(cp->page) = P_QAMDATA;
00088         }
00089 
00090         qp = QAM_GET_RECORD(dbp, cp->page, cp->indx);
00091         *exactp = F_ISSET(qp, QAM_VALID) ? 1 : 0;
00092 
00093         return (ret);
00094 }
00095 
00096 /*
00097  * __qam_pitem --
00098  *      Put an item on a queue page.  Copy the data to the page and set the
00099  *      VALID and SET bits.  If logging and the record was previously set,
00100  *      log that data, otherwise just log the new data.  Returns 0 or an
00101  *      error code; partial puts are range-checked against the record length.
00102  *   pagep must be write locked
00103  *
00104  * PUBLIC: int __qam_pitem
00105  * PUBLIC:     __P((DBC *,  QPAGE *, u_int32_t, db_recno_t, DBT *));
00106  */
00107 int
00108 __qam_pitem(dbc, pagep, indx, recno, data)
00109         DBC *dbc;               /* cursor; supplies dbp and txn */
00110         QPAGE *pagep;           /* page receiving the record */
00111         u_int32_t indx;         /* record slot, passed to QAM_GET_RECORD */
00112         db_recno_t recno;       /* record number, only written to the log */
00113         DBT *data;              /* data to store, possibly DB_DBT_PARTIAL */
00114 {
00115         DB_ENV *dbenv;
00116         DB *dbp;
00117         DBT olddata, pdata, *datap;
00118         QAMDATA *qp;
00119         QUEUE *t;
00120         u_int8_t *dest, *p;
00121         int allocated, ret;
00122 
00123         dbp = dbc->dbp;
00124         dbenv = dbp->dbenv;
00125         t = (QUEUE *)dbp->q_internal;
00126         allocated = ret = 0;
00127 
00128         if (data->size > t->re_len)
00129                 return (__db_rec_toobig(dbenv, data->size, t->re_len));
00130         qp = QAM_GET_RECORD(dbp, pagep, indx);
00131 
00132         p = qp->data;
00133         datap = data;
00134         if (F_ISSET(data, DB_DBT_PARTIAL)) {
00135                 /*
00136                  * Range-check without computing doff + dlen: both come
00137                  * from the caller, so the sum could wrap around and let
00138                  * an out-of-range offset through to the copies below.
00139                  */
00140                 if (data->doff > t->re_len ||
00141                     data->dlen > t->re_len - data->doff) {
00142                         __db_err(dbenv,
00143                 "%s: data offset plus length larger than record size of %lu",
00144                             "Record length error", (u_long)t->re_len);
00145                         return (EINVAL);
00146                 }
00147                 if (data->size != data->dlen)
00148                         return (__db_rec_repl(dbenv, data->size, data->dlen));
00149                 if (data->size == t->re_len)
00150                         goto no_partial;
00151 
00152                 /*
00153                  * When logging we must build the complete record first;
00154                  * otherwise the change can be dropped directly on the
00155                  * page.  A complete record is also built when the existing
00156                  * record is not valid, which keeps this and the recovery
00157                  * code simpler.  Either way datap and p end up set so that
00158                  * copying datap into p does the right thing.
00159                  */
00160                 if (DBC_LOGGING(dbc) || !F_ISSET(qp, QAM_VALID)) {
00161                         datap = &pdata;
00162                         memset(datap, 0, sizeof(*datap));
00163 
00164                         if ((ret = __os_malloc(dbenv,
00165                             t->re_len, &datap->data)) != 0)
00166                                 return (ret);
00167                         allocated = 1;
00168                         datap->size = t->re_len;
00169 
00170                         /*
00171                          * Construct the record if it's valid, otherwise set it
00172                          * all to the pad character.
00173                          */
00174                         dest = datap->data;
00175                         if (F_ISSET(qp, QAM_VALID))
00176                                 memcpy(dest, p, t->re_len);
00177                         else
00178                                 memset(dest, (int)t->re_pad, t->re_len);
00179 
00180                         dest += data->doff;
00181                         memcpy(dest, data->data, data->size);
00182                 } else {
00183                         datap = data;
00184                         p += data->doff;        /* patch the page in place */
00185                 }
00186         }
00187 
00188 no_partial:
00189         if (DBC_LOGGING(dbc)) {
00190                 olddata.size = 0;       /* size 0 means "no old image to log" */
00191                 if (F_ISSET(qp, QAM_SET)) {
00192                         olddata.data = qp->data;
00193                         olddata.size = t->re_len;
00194                 }
00195                 if ((ret = __qam_add_log(dbp, dbc->txn, &LSN(pagep),
00196                     0, &LSN(pagep), pagep->pgno,
00197                     indx, recno, datap, qp->flags,
00198                     olddata.size == 0 ? NULL : &olddata)) != 0)
00199                         goto err;
00200         }
00201 
00202         F_SET(qp, QAM_VALID | QAM_SET);
00203         memcpy(p, datap->data, datap->size);
00204         if (!F_ISSET(data, DB_DBT_PARTIAL))     /* short full put: pad the tail */
00205                 memset(p + datap->size,
00206                      (int)t->re_pad, t->re_len - datap->size);
00207 
00208 err:    if (allocated)
00209                 __os_free(dbenv, datap->data);
00210 
00211         return (ret);
00212 }
00213 /*
00214  * __qam_c_put
00215  *      Cursor put for queued access method.  Only DB_CURRENT, DB_KEYFIRST
00216  *      and DB_KEYLAST are handled; BEFORE and AFTER cannot be specified.
00217  */
00218 static int
00219 __qam_c_put(dbc, key, data, flags, pgnop)
00220         DBC *dbc;
00221         DBT *key, *data;
00222         u_int32_t flags;
00223         db_pgno_t *pgnop;
00224 {
00225         DB *dbp;
00226         DB_LOCK lock;
00227         DB_MPOOLFILE *mpf;
00228         QMETA *meta;
00229         QUEUE_CURSOR *cp;
00230         db_pgno_t pg;
00231         db_recno_t new_cur, new_first;
00232         u_int32_t opcode;
00233         int exact, ret, t_ret;
00234 
00235         dbp = dbc->dbp;
00236         mpf = dbp->mpf;
00237         if (pgnop != NULL)
00238                 *pgnop = PGNO_INVALID;
00239 
00240         cp = (QUEUE_CURSOR *)dbc->internal;
00241 
00242         switch (flags) {
00243         case DB_KEYFIRST:       /* keyed put: cp->recno is taken from key */
00244         case DB_KEYLAST:
00245                 if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
00246                         return (ret);
00247                 /* FALLTHROUGH */
00248         case DB_CURRENT:        /* cp->recno is used as it stands */
00249                 break;
00250         default:
00251                 /* The interface shouldn't let anything else through. */
00252                 DB_ASSERT(0);
00253                 return (__db_ferr(dbp->dbenv, "DBC->put", 0));
00254         }
00255 
00256         /* Write lock the record. */
00257         if ((ret = __db_lget(dbc, LCK_COUPLE,
00258              cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &cp->lock)) != 0)
00259                 return (ret);
00260 
00261         lock = cp->lock;        /* save it: __qam_position reuses cp->lock */
00262 
00263         if ((ret = __qam_position(dbc, &cp->recno, QAM_WRITE, &exact)) != 0) {
00264                 /* We could not get the page, we can release the record lock. */
00265                 (void)__LPUT(dbc, lock);
00266                 return (ret);
00267         }
00268 
00269         /* Put the item on the page. */
00270         ret = __qam_pitem(dbc, (QPAGE *)cp->page, cp->indx, cp->recno, data);
00271 
00272         /* Doing record locking, release the page lock */
00273         if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
00274                 ret = t_ret;
00275         if ((t_ret = __qam_fput(
00276             dbp, cp->pgno, cp->page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
00277                 ret = t_ret;
00278         cp->page = NULL;
00279         cp->lock = lock;        /* the cursor holds the record lock again */
00280         cp->lock_mode = DB_LOCK_WRITE;
00281         if (ret != 0)
00282                 return (ret);
00283 
00284         /* We may need to reset the head or tail of the queue. */
00285         pg = ((QUEUE *)dbp->q_internal)->q_meta;
00286 
00287         /*
00288          * Get the meta page first, we don't want to write lock it while
00289          * trying to pin it.
00290          */
00291         if ((ret = __memp_fget(mpf, &pg, 0, &meta)) != 0)
00292                 return (ret);
00293         if ((ret = __db_lget(dbc, LCK_COUPLE,
00294              pg,  DB_LOCK_WRITE, 0, &cp->lock)) != 0) {
00295                 (void)__memp_fput(mpf, meta, 0);
00296                 return (ret);
00297         }
00298 
00299         opcode = 0;             /* no head/tail change decided yet */
00300         new_cur = new_first = 0;
00301 
00302         /*
00303          * If the put address is outside the queue, adjust the head and
00304          * tail of the queue.  If the order is inverted we move
00305          * the one which is closer.  The first case is when the
00306          * queue is empty, move first and current to where the new
00307          * insert is.
00308          */
00309 
00310         if (meta->first_recno == meta->cur_recno) {
00311                 new_first = cp->recno;
00312                 new_cur = cp->recno + 1;
00313                 if (new_cur == RECNO_OOB)       /* never land on RECNO_OOB */
00314                         new_cur++;
00315                 opcode |= QAM_SETFIRST;
00316                 opcode |= QAM_SETCUR;
00317         } else {
00318                 if (QAM_BEFORE_FIRST(meta, cp->recno) &&
00319                     (meta->first_recno <= meta->cur_recno ||
00320                     meta->first_recno - cp->recno <
00321                     cp->recno - meta->cur_recno)) {
00322                         new_first = cp->recno;
00323                         opcode |= QAM_SETFIRST;
00324                 }
00325 
00326                 if (meta->cur_recno == cp->recno ||
00327                     (QAM_AFTER_CURRENT(meta, cp->recno) &&
00328                     (meta->first_recno <= meta->cur_recno ||
00329                     cp->recno - meta->cur_recno <=
00330                     meta->first_recno - cp->recno))) {
00331                         new_cur = cp->recno + 1;
00332                         if (new_cur == RECNO_OOB)       /* never land on RECNO_OOB */
00333                                 new_cur++;
00334                         opcode |= QAM_SETCUR;
00335                 }
00336         }
00337 
00338         if (opcode != 0 && DBC_LOGGING(dbc)) {
00339                 ret = __qam_mvptr_log(dbp, dbc->txn, &meta->dbmeta.lsn,
00340                     0, opcode, meta->first_recno, new_first,
00341                     meta->cur_recno, new_cur, &meta->dbmeta.lsn, PGNO_BASE_MD);
00342                 if (ret != 0)
00343                         opcode = 0;     /* log failed: leave the meta page as is */
00344         }
00345 
00346         if (opcode & QAM_SETCUR)
00347                 meta->cur_recno = new_cur;
00348         if (opcode & QAM_SETFIRST)
00349                 meta->first_recno = new_first;
00350 
00351         if ((t_ret = __memp_fput(
00352             mpf, meta, opcode != 0 ? DB_MPOOL_DIRTY : 0)) != 0 && ret == 0)
00353                 ret = t_ret;
00354 
00355         /* Don't hold the meta page long term. */
00356         if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
00357                 ret = t_ret;
00358         return (ret);
00359 }
00360 
00361 /*
00362  * __qam_append --
00363  *      Perform a put(DB_APPEND) in queue, copying the new recno into key.
00364  *
00365  * PUBLIC: int __qam_append __P((DBC *, DBT *, DBT *));
00366  */
00367 int
00368 __qam_append(dbc, key, data)
00369         DBC *dbc;
00370         DBT *key, *data;
00371 {
00372         DB *dbp;
00373         DB_LOCK lock;
00374         DB_MPOOLFILE *mpf;
00375         QMETA *meta;
00376         QPAGE *page;
00377         QUEUE *qp;
00378         QUEUE_CURSOR *cp;
00379         db_pgno_t pg;
00380         db_recno_t recno;
00381         int ret, t_ret;
00382 
00383         dbp = dbc->dbp;
00384         mpf = dbp->mpf;
00385         cp = (QUEUE_CURSOR *)dbc->internal;
00386 
00387         pg = ((QUEUE *)dbp->q_internal)->q_meta;
00388         /*
00389          * Get the meta page first, we don't want to write lock it while
00390          * trying to pin it.
00391          */
00392         if ((ret = __memp_fget(mpf, &pg, 0, &meta)) != 0)
00393                 return (ret);
00394         /* Write lock the meta page. */
00395         if ((ret = __db_lget(dbc, 0, pg,  DB_LOCK_WRITE, 0, &lock)) != 0) {
00396                 (void)__memp_fput(mpf, meta, 0);
00397                 return (ret);
00398         }
00399 
00400         /* Get the next record number. */
00401         recno = meta->cur_recno;
00402         meta->cur_recno++;
00403         if (meta->cur_recno == RECNO_OOB)
00404                 meta->cur_recno++;
00405         if (meta->cur_recno == meta->first_recno) {
00406                 meta->cur_recno--;
00407                 if (meta->cur_recno == RECNO_OOB)
00408                         meta->cur_recno--;
00409                 ret = __LPUT(dbc, lock);
00410                 if (ret == 0)
00411                         ret = EFBIG;    /* cur_recno would run into first_recno */
00412                 goto err;
00413         }
00414 
00415         if (QAM_BEFORE_FIRST(meta, recno))
00416                 meta->first_recno = recno;
00417 
00418         /* Lock the record and release meta page lock. */
00419         ret = __db_lget(dbc, LCK_COUPLE_ALWAYS,
00420             recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock);
00421 
00422         /*
00423          * The application may modify the data based on the selected record
00424          * number.  We always want to call this even if we ultimately end
00425          * up aborting, because we are allocating a record number, regardless.
00426          */
00427         if (dbc->dbp->db_append_recno != NULL &&
00428             (t_ret = dbc->dbp->db_append_recno(dbc->dbp, data, recno)) != 0 &&
00429             ret == 0)
00430                 ret = t_ret;
00431 
00432         /* Capture errors from the lock couple or from db_append_recno. */
00433         if (ret != 0) {
00434                 (void)__LPUT(dbc, lock);
00435                 goto err;
00436         }
00437 
00438         cp->lock = lock;
00439         cp->lock_mode = DB_LOCK_WRITE;
00440 
00441         /* Fetch and write lock the data page. */
00442         pg = QAM_RECNO_PAGE(dbp, recno);
00443         if ((ret = __db_lget(dbc, 0, pg,  DB_LOCK_WRITE, 0, &lock)) != 0)
00444                 goto err;
00445         if ((ret = __qam_fget(dbp, &pg, DB_MPOOL_CREATE, &page)) != 0) {
00446                 /* We did not fetch it, we can release the lock. */
00447                 (void)__LPUT(dbc, lock);
00448                 goto err;
00449         }
00450 
00451         /* See if this is a new page. */
00452         if (page->pgno == 0) {
00453                 page->pgno = pg;
00454                 page->type = P_QAMDATA;
00455         }
00456 
00457         /* Put the item on the page and log it. */
00458         ret = __qam_pitem(dbc, page,
00459             QAM_RECNO_INDEX(dbp, pg, recno), recno, data);
00460 
00461         /* Doing record locking, release the page lock */
00462         if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
00463                 ret = t_ret;
00464         if ((t_ret =
00465             __qam_fput(dbp, pg, page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
00466                 ret = t_ret;
00467 
00468         /* Return the record number to the user. */
00469         if (ret == 0)
00470                 ret = __db_retcopy(dbp->dbenv, key,
00471                     &recno, sizeof(recno), &dbc->rkey->data, &dbc->rkey->ulen);
00472 
00473         /* Position the cursor on this record. */
00474         cp->recno = recno;
00475 
00476         /*
00477          * See if we are leaving the extent.  Errors here go through t_ret so
00478          * that a failure already recorded in ret is never overwritten.
00479          */
00480         qp = (QUEUE *) dbp->q_internal;
00481         if (qp->page_ext != 0 &&
00482             (recno % (qp->page_ext * qp->rec_page) == 0 ||
00483             recno == UINT32_MAX)) {
00484                 if ((t_ret = __db_lget(dbc,
00485                     0, qp->q_meta, DB_LOCK_WRITE, 0, &lock)) != 0) {
00486                         if (ret == 0)
00487                                 ret = t_ret;
00488                         goto err;
00489                 }
00490                 if (!QAM_AFTER_CURRENT(meta, recno) &&
00491                     (t_ret = __qam_fclose(dbp, pg)) != 0 && ret == 0)
00492                         ret = t_ret;
00493                 if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
00494                         ret = t_ret;
00495         }
00496 
00497 err:    /* Release the meta page. */
00498         if ((t_ret = __memp_fput(mpf, meta, DB_MPOOL_DIRTY)) != 0 && ret == 0)
00499                 ret = t_ret;
00500 
00501         return (ret);
00502 }
00503 
00504 /*
00505  * __qam_c_del --
00506  *      Qam cursor->am_del function: clear the VALID bit of the cursor's record.
00507  */
00508 static int
00509 __qam_c_del(dbc)
00510         DBC *dbc;
00511 {
00512         DB *dbp;
00513         DBT data;
00514         DB_LOCK lock, metalock;
00515         DB_MPOOLFILE *mpf;
00516         PAGE *pagep;
00517         QAMDATA *qp;
00518         QMETA *meta;
00519         QUEUE_CURSOR *cp;
00520         db_pgno_t pg;
00521         int exact, ret, t_ret;
00522 
00523         dbp = dbc->dbp;
00524         mpf = dbp->mpf;
00525         cp = (QUEUE_CURSOR *)dbc->internal;
00526         LOCK_INIT(lock);
00527 
00528         pg = ((QUEUE *)dbp->q_internal)->q_meta;
00529         /*
00530          * Get the meta page first, we don't want to lock it while
00531          * trying to pin it.
00532          */
00533         if ((ret = __memp_fget(mpf, &pg, 0, &meta)) != 0)
00534                 return (ret);
00535         /* Read lock the meta page. */
00536         if ((ret = __db_lget(dbc, 0, pg,  DB_LOCK_READ, 0, &metalock)) != 0) {
00537                 (void)__memp_fput(mpf, meta, 0);
00538                 return (ret);
00539         }
00540 
00541         if (QAM_NOT_VALID(meta, cp->recno))
00542                 ret = DB_NOTFOUND;      /* recno fails the meta-page check */
00543 
00544         /* Don't hold the meta page long term. */
00545         if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
00546                 ret = t_ret;
00547 
00548         if (ret != 0)
00549                 goto err;
00550 
00551         if ((ret = __db_lget(dbc, LCK_COUPLE,
00552             cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &cp->lock)) != 0)
00553                 goto err;
00554         cp->lock_mode = DB_LOCK_WRITE;
00555         lock = cp->lock;        /* save it: __qam_position reuses cp->lock */
00556 
00557         /* Find the record ; delete only deletes exact matches. */
00558         if ((ret = __qam_position(dbc, &cp->recno, QAM_WRITE, &exact)) != 0)
00559                 goto err;
00560 
00561         if (!exact) {
00562                 ret = DB_NOTFOUND;
00563                 goto err;
00564         }
00565 
00566         pagep = cp->page;
00567         qp = QAM_GET_RECORD(dbp, pagep, cp->indx);
00568 
00569         if (DBC_LOGGING(dbc)) {
00570                 if (((QUEUE *)dbp->q_internal)->page_ext == 0 ||
00571                     ((QUEUE *)dbp->q_internal)->re_len == 0) {
00572                         if ((ret = __qam_del_log(dbp,
00573                             dbc->txn, &LSN(pagep), 0, &LSN(pagep),
00574                             pagep->pgno, cp->indx, cp->recno)) != 0)
00575                                 goto err;
00576                 } else {        /* extents in use: log the record data as well */
00577                         data.size = ((QUEUE *)dbp->q_internal)->re_len;
00578                         data.data = qp->data;
00579                         if ((ret = __qam_delext_log(dbp,
00580                             dbc->txn, &LSN(pagep), 0, &LSN(pagep),
00581                             pagep->pgno, cp->indx, cp->recno, &data)) != 0)
00582                                 goto err;
00583                 }
00584         }
00585 
00586         F_CLR(qp, QAM_VALID);   /* the delete itself */
00587 
00588         /*
00589          * Peek at the first_recno before locking the meta page.
00590          * Other threads cannot move first_recno past
00591          * our position while we have the record locked.
00592          * If it's pointing at the deleted record then lock
00593          * the metapage and check again as lower numbered
00594          * record may have been inserted.
00595          */
00596         if (cp->recno == meta->first_recno) {
00597                 pg = ((QUEUE *)dbp->q_internal)->q_meta;
00598                 if ((ret =
00599                     __db_lget(dbc, 0, pg,  DB_LOCK_WRITE, 0, &metalock)) != 0)
00600                         goto err;
00601                 if (cp->recno == meta->first_recno)
00602                         ret = __qam_consume(dbc, meta, meta->first_recno);
00603                 if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
00604                         ret = t_ret;
00605         }
00606 
00607 err:    if ((t_ret = __memp_fput(mpf, meta, 0)) != 0 && ret == 0)
00608                 ret = t_ret;
00609         if (cp->page != NULL && (t_ret = __qam_fput(dbp, cp->pgno,
00610             cp->page, ret == 0 ? DB_MPOOL_DIRTY : 0)) != 0 && ret == 0)
00611                 ret = t_ret;
00612         cp->page = NULL;
00613 
00614         /* Doing record locking, release the page lock */
00615         if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
00616                 ret = t_ret;
00617         cp->lock = lock;        /* saved record lock, or the LOCK_INIT value */
00618 
00619         return (ret);
00620 }
00621 
00622 #ifdef  DEBUG_WOP
00623 #define QDEBUG
00624 #endif
00625 
00626 /*
00627  * __qam_c_get --
00628  *      Queue cursor->c_get function.
00629  */
00630 static int
00631 __qam_c_get(dbc, key, data, flags, pgnop)
00632         DBC *dbc;
00633         DBT *key, *data;
00634         u_int32_t flags;
00635         db_pgno_t *pgnop;
00636 {
00637         DB *dbp;
00638         DBC *dbcdup;
00639         DBT tmp;
00640         DB_ENV *dbenv;
00641         DB_LOCK lock, pglock, metalock;
00642         DB_MPOOLFILE *mpf;
00643         PAGE *pg;
00644         QAMDATA *qp;
00645         QMETA *meta;
00646         QUEUE *t;
00647         QUEUE_CURSOR *cp;
00648         db_lockmode_t lock_mode;
00649         db_pgno_t metapno;
00650         db_recno_t first;
00651         qam_position_mode mode;
00652         u_int32_t put_mode;
00653         int exact, inorder, is_first, locked, ret, t_ret, wait, with_delete;
00654         int retrying;
00655 
00656         dbp = dbc->dbp;
00657         dbenv = dbp->dbenv;
00658         mpf = dbp->mpf;
00659         cp = (QUEUE_CURSOR *)dbc->internal;
00660         LOCK_INIT(lock);
00661 
00662         PANIC_CHECK(dbenv);
00663 
00664         wait = 0;
00665         with_delete = 0;
00666         retrying = 0;
00667         lock_mode = DB_LOCK_READ;
00668         meta = NULL;
00669         inorder = F_ISSET(dbp, DB_AM_INORDER);
00670         put_mode = 0;
00671         t_ret = 0;
00672         *pgnop = 0;
00673         pg = NULL;
00674 
00675         mode = QAM_READ;
00676         if (F_ISSET(dbc, DBC_RMW)) {
00677                 lock_mode = DB_LOCK_WRITE;
00678                 mode = QAM_WRITE;
00679         }
00680 
00681         if (flags == DB_CONSUME_WAIT) {
00682                 wait = 1;
00683                 flags = DB_CONSUME;
00684         }
00685         if (flags == DB_CONSUME) {
00686                 with_delete = 1;
00687                 flags = DB_FIRST;
00688                 lock_mode = DB_LOCK_WRITE;
00689                 mode = QAM_CONSUME;
00690         }
00691 
00692         DEBUG_LREAD(dbc, dbc->txn, "qam_c_get",
00693             flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags);
00694 
00695         /* Make lint and friends happy. */
00696         locked = 0;
00697 
00698         is_first = 0;
00699 
00700         t = (QUEUE *)dbp->q_internal;
00701         metapno = t->q_meta;
00702 
00703         /*
00704          * Get the meta page first, we don't want to write lock it while
00705          * trying to pin it.  This is because someone my have it pinned
00706          * but not locked.
00707          */
00708         if ((ret = __memp_fget(mpf, &metapno, 0, &meta)) != 0)
00709                 return (ret);
00710         if ((ret = __db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
00711                 goto err;
00712         locked = 1;
00713 
00714         first = 0;
00715 
00716         /* Release any previous lock if not in a transaction. */
00717         if ((ret = __TLPUT(dbc, cp->lock)) != 0)
00718                 goto err;
00719 
00720 retry:  /* Update the record number. */
00721         switch (flags) {
00722         case DB_CURRENT:
00723                 break;
00724         case DB_NEXT_DUP:
00725                 ret = DB_NOTFOUND;
00726                 goto err;
00727                 /* NOTREACHED */
00728         case DB_NEXT:
00729         case DB_NEXT_NODUP:
00730 get_next:       if (cp->recno != RECNO_OOB) {
00731                         ++cp->recno;
00732                         /* Wrap around, skipping zero. */
00733                         if (cp->recno == RECNO_OOB)
00734                                 cp->recno++;
00735                         /*
00736                          * Check to see if we are out of data.
00737                          */
00738                         if (cp->recno == meta->cur_recno ||
00739                             QAM_AFTER_CURRENT(meta, cp->recno)) {
00740                                 pg = NULL;
00741                                 if (!wait) {
00742                                         ret = DB_NOTFOUND;
00743                                         goto err;
00744                                 }
00745                                 flags = DB_FIRST;
00746                                 /*
00747                                  * If first is not set, then we skipped
00748                                  * a locked record, go back and find it.
00749                                  * If we find a locked record again
00750                                  * wait for it.
00751                                  */
00752                                 if (first == 0) {
00753                                         retrying = 1;
00754                                         goto retry;
00755                                 }
00756 
00757                                 if (CDB_LOCKING(dbenv)) {
00758                                         /* Drop the metapage before we wait. */
00759                                         if ((ret =
00760                                             __memp_fput(mpf, meta, 0)) != 0)
00761                                                 goto err;
00762                                         meta = NULL;
00763                                         if ((ret = __lock_get(
00764                                             dbenv, dbc->locker,
00765                                             DB_LOCK_SWITCH, &dbc->lock_dbt,
00766                                             DB_LOCK_WAIT, &dbc->mylock)) != 0)
00767                                                 goto err;
00768 
00769                                         if ((ret = __memp_fget(mpf,
00770                                              &metapno, 0, &meta)) != 0)
00771                                                 goto err;
00772                                         if ((ret = __lock_get(
00773                                             dbenv, dbc->locker,
00774                                             DB_LOCK_UPGRADE, &dbc->lock_dbt,
00775                                             DB_LOCK_WRITE, &dbc->mylock)) != 0)
00776                                                 goto err;
00777                                         goto retry;
00778                                 }
00779                                 /*
00780                                  * Wait for someone to update the meta page.
00781                                  * This will probably mean there is something
00782                                  * in the queue.  We then go back up and
00783                                  * try again.
00784                                  */
00785                                 if (locked == 0) {
00786                                         if ((ret = __db_lget(dbc, 0, metapno,
00787                                              lock_mode, 0, &metalock)) != 0)
00788                                                 goto err;
00789                                         locked = 1;
00790                                         if (cp->recno != meta->cur_recno &&
00791                                             cp->recno != RECNO_OOB &&
00792                                             !QAM_AFTER_CURRENT(meta, cp->recno))
00793                                                 goto retry;
00794                                 }
00795                                 /* Drop the metapage before we wait. */
00796                                 if ((ret = __memp_fput(mpf, meta, 0)) != 0)
00797                                         goto err;
00798                                 meta = NULL;
00799                                 if ((ret = __db_lget(dbc,
00800                                      0, metapno, DB_LOCK_WAIT,
00801                                      DB_LOCK_SWITCH, &metalock)) != 0) {
00802                                         if (ret == DB_LOCK_DEADLOCK)
00803                                                 ret = DB_LOCK_NOTGRANTED;
00804                                         goto err;
00805                                 }
00806                                 if ((ret = __memp_fget(
00807                                      mpf, &metapno, 0, &meta)) != 0)
00808                                         goto err;
00809                                 if ((ret = __db_lget(dbc, 0,
00810                                      PGNO_INVALID, DB_LOCK_WRITE,
00811                                      DB_LOCK_UPGRADE, &metalock)) != 0) {
00812                                         if (ret == DB_LOCK_DEADLOCK)
00813                                                 ret = DB_LOCK_NOTGRANTED;
00814                                         goto err;
00815                                 }
00816                                 locked = 1;
00817                                 goto retry;
00818                         }
00819                         break;
00820                 }
00821                 /* FALLTHROUGH */
00822         case DB_FIRST:
00823                 flags = DB_NEXT;
00824                 is_first = 1;
00825 
00826                 /* get the first record number */
00827                 cp->recno = first = meta->first_recno;
00828 
00829                 break;
00830         case DB_PREV:
00831         case DB_PREV_NODUP:
00832                 if (cp->recno != RECNO_OOB) {
00833                         if (cp->recno == meta->first_recno ||
00834                            QAM_BEFORE_FIRST(meta, cp->recno)) {
00835                                 ret = DB_NOTFOUND;
00836                                 goto err;
00837                         }
00838                         --cp->recno;
00839                         /* Wrap around, skipping zero. */
00840                         if (cp->recno == RECNO_OOB)
00841                                 --cp->recno;
00842                         break;
00843                 }
00844                 /* FALLTHROUGH */
00845         case DB_LAST:
00846                 if (meta->first_recno == meta->cur_recno) {
00847                         ret = DB_NOTFOUND;
00848                         goto err;
00849                 }
00850                 cp->recno = meta->cur_recno - 1;
00851                 if (cp->recno == RECNO_OOB)
00852                         cp->recno--;
00853                 break;
00854         case DB_SET:
00855         case DB_SET_RANGE:
00856         case DB_GET_BOTH:
00857         case DB_GET_BOTH_RANGE:
00858                 if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
00859                         goto err;
00860                 if (QAM_NOT_VALID(meta, cp->recno)) {
00861                         ret = DB_NOTFOUND;
00862                         goto err;
00863                 }
00864                 break;
00865         default:
00866                 ret = __db_unknown_flag(dbenv, "__qam_c_get", flags);
00867                 goto err;
00868         }
00869 
00870         /* Don't hold the meta page long term. */
00871         if (locked) {
00872                 if ((ret = __LPUT(dbc, metalock)) != 0)
00873                         goto err;
00874                 locked = 0;
00875         }
00876 
00877         /* Lock the record. */
00878         if (((ret = __db_lget(dbc, 0, cp->recno, lock_mode,
00879             (with_delete && !retrying) ?
00880             DB_LOCK_NOWAIT | DB_LOCK_RECORD : DB_LOCK_RECORD,
00881             &lock)) == DB_LOCK_DEADLOCK || ret == DB_LOCK_NOTGRANTED) &&
00882             with_delete) {
00883 #ifdef QDEBUG
00884                 if (DBC_LOGGING(dbc))
00885                         (void)__log_printf(dbenv,
00886                             dbc->txn, "Queue S: %x %d %d %d",
00887                             dbc->locker, cp->recno, first, meta->first_recno);
00888 #endif
00889                 first = 0;
00890                 if ((ret =
00891                     __db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
00892                         goto err;
00893                 locked = 1;
00894                 goto retry;
00895         }
00896 
00897         if (ret != 0)
00898                 goto err;
00899 
00900         /*
00901          * In the DB_FIRST or DB_LAST cases we must wait and then start over
00902          * since the first/last may have moved while we slept.
00903          * We release our locks and try again.
00904          */
00905         if (((inorder || !with_delete) && is_first) || flags == DB_LAST) {
00906 get_first:
00907                 if ((ret =
00908                     __db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
00909                         goto err;
00910                 if (cp->recno !=
00911                     (is_first ? meta->first_recno : (meta->cur_recno - 1))) {
00912                         if ((ret = __LPUT(dbc, lock)) != 0)
00913                                 goto err;
00914                         if (is_first)
00915                                 flags = DB_FIRST;
00916                         locked = 1;
00917                         goto retry;
00918                 }
00919                 /* Don't hold the meta page long term. */
00920                 if ((ret = __LPUT(dbc, metalock)) != 0)
00921                         goto err;
00922         }
00923 
00924         /* Position the cursor on the record. */
00925         if ((ret = __qam_position(dbc, &cp->recno, mode, &exact)) != 0) {
00926                 /* We cannot get the page, release the record lock. */
00927                 (void)__LPUT(dbc, lock);
00928                 goto err;
00929         }
00930 
00931         pg = cp->page;
00932         pglock = cp->lock;
00933         cp->lock = lock;
00934         cp->lock_mode = lock_mode;
00935 
00936         if (!exact) {
00937 release_retry:  /* Release locks and retry, if possible. */
00938                 if (pg != NULL)
00939                         (void)__qam_fput(dbp, cp->pgno, pg, 0);
00940                 cp->page = pg = NULL;
00941                 if ((ret = __LPUT(dbc, pglock)) != 0)
00942                         goto err1;
00943 
00944                 switch (flags) {
00945                 case DB_GET_BOTH_RANGE:
00946                         flags = DB_SET_RANGE;
00947                         /* FALLTHROUGH */
00948                 case DB_NEXT:
00949                 case DB_NEXT_NODUP:
00950                 case DB_SET_RANGE:
00951                         if (!with_delete)
00952                                 is_first = 0;
00953                         /* Peek at the meta page unlocked. */
00954                         if (QAM_BEFORE_FIRST(meta, cp->recno))
00955                                 goto get_first;
00956                         /* FALLTHROUGH */
00957                 case DB_PREV:
00958                 case DB_PREV_NODUP:
00959                 case DB_LAST:
00960                         if (flags == DB_LAST)
00961                                 flags = DB_PREV;
00962                         retrying = 0;
00963                         if ((ret = __LPUT(dbc, cp->lock)) != 0)
00964                                 goto err1;
00965                         if (flags == DB_SET_RANGE)
00966                                 goto get_next;
00967                         else
00968                                 goto retry;
00969 
00970                 default:
00971                         /* this is for the SET and GET_BOTH cases */
00972                         ret = DB_KEYEMPTY;
00973                         goto err1;
00974                 }
00975         }
00976 
00977         qp = QAM_GET_RECORD(dbp, pg, cp->indx);
00978 
00979         /* Return the data item. */
00980         if (flags == DB_GET_BOTH || flags == DB_GET_BOTH_RANGE) {
00981                 /*
00982                  * Need to compare
00983                  */
00984                 tmp.data = qp->data;
00985                 tmp.size = t->re_len;
00986                 if ((ret = __bam_defcmp(dbp, data, &tmp)) != 0) {
00987                         if (flags == DB_GET_BOTH_RANGE)
00988                                 goto release_retry;
00989                         ret = DB_NOTFOUND;
00990                         goto err1;
00991                 }
00992         }
00993 
00994         /* Return the key if the user didn't give us one. */
00995         if (key != NULL) {
00996                 if (flags != DB_GET_BOTH && flags != DB_SET &&
00997                     (ret = __db_retcopy(dbp->dbenv,
00998                     key, &cp->recno, sizeof(cp->recno),
00999                     &dbc->rkey->data, &dbc->rkey->ulen)) != 0)
01000                         goto err1;
01001                 F_SET(key, DB_DBT_ISSET);
01002         }
01003 
01004         if (data != NULL) {
01005                 if (!F_ISSET(dbc, DBC_MULTIPLE|DBC_MULTIPLE_KEY) &&
01006                     (ret = __db_retcopy(dbp->dbenv, data, qp->data, t->re_len,
01007                     &dbc->rdata->data, &dbc->rdata->ulen)) != 0)
01008                         goto err1;
01009                 F_SET(data, DB_DBT_ISSET);
01010         }
01011 
01012         /* Finally, if we are doing DB_CONSUME mark the record. */
01013         if (with_delete) {
01014                 /*
01015                  * Assert that we're not a secondary index.  Doing a DB_CONSUME
01016                  * on a secondary makes very little sense, since one can't
01017                  * DB_APPEND there;  attempting one should be forbidden by
01018                  * the interface.
01019                  */
01020                 DB_ASSERT(!F_ISSET(dbp, DB_AM_SECONDARY));
01021 
01022                 /*
01023                  * Check and see if we *have* any secondary indices.
01024                  * If we do, we're a primary, so call __db_c_del_primary
01025                  * to delete the references to the item we're about to
01026                  * delete.
01027                  *
01028                  * Note that we work on a duplicated cursor, since the
01029                  * __db_ret work has already been done, so it's not safe
01030                  * to perform any additional ops on this cursor.
01031                  */
01032                 if (LIST_FIRST(&dbp->s_secondaries) != NULL) {
01033                         if ((ret = __db_c_idup(dbc,
01034                             &dbcdup, DB_POSITION)) != 0)
01035                                 goto err1;
01036 
01037                         if ((ret = __db_c_del_primary(dbcdup)) != 0) {
01038                                 /*
01039                                  * The __db_c_del_primary return is more
01040                                  * interesting.
01041                                  */
01042                                 (void)__db_c_close(dbcdup);
01043                                 goto err1;
01044                         }
01045 
01046                         if ((ret = __db_c_close(dbcdup)) != 0)
01047                                 goto err1;
01048                 }
01049 
01050                 if (DBC_LOGGING(dbc)) {
01051                         if (t->page_ext == 0 || t->re_len == 0) {
01052                                 if ((ret = __qam_del_log(dbp, dbc->txn,
01053                                     &LSN(pg), 0, &LSN(pg),
01054                                     pg->pgno, cp->indx, cp->recno)) != 0)
01055                                         goto err1;
01056                         } else {
01057                                 tmp.data = qp->data;
01058                                 tmp.size = t->re_len;
01059                                 if ((ret = __qam_delext_log(dbp,
01060                                    dbc->txn, &LSN(pg), 0, &LSN(pg),
01061                                    pg->pgno, cp->indx, cp->recno, &tmp)) != 0)
01062                                         goto err1;
01063                         }
01064                 }
01065 
01066                 F_CLR(qp, QAM_VALID);
01067                 put_mode = DB_MPOOL_DIRTY;
01068 
01069                 if ((ret = __LPUT(dbc, pglock)) != 0)
01070                         goto err1;
01071 
01072                 /*
01073                  * Now we need to update the metapage
01074                  * first pointer. If we have deleted
01075                  * the record that is pointed to by
01076                  * first_recno then we move it as far
01077                  * forward as we can without blocking.
01078                  * The metapage lock must be held for
01079                  * the whole scan otherwise someone could
01080                  * do a random insert behind where we are
01081                  * looking.
01082                  */
01083 
01084                 if (locked == 0 && (ret = __db_lget(
01085                     dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
01086                         goto err1;
01087                 locked = 1;
01088 
01089 #ifdef QDEBUG
01090                 if (DBC_LOGGING(dbc))
01091                         (void)__log_printf(dbenv,
01092                             dbc->txn, "Queue D: %x %d %d %d",
01093                             dbc->locker, cp->recno, first, meta->first_recno);
01094 #endif
01095                 /*
01096                  * See if we deleted the "first" record.  If
01097                  * first is zero then we skipped something,
01098                  * see if first_recno has been move passed
01099                  * that to the record that we deleted.
01100                  */
01101                 if (first == 0)
01102                         first = cp->recno;
01103                 if (first != meta->first_recno)
01104                         goto done;
01105 
01106                 if ((ret = __qam_consume(dbc, meta, first)) != 0)
01107                         goto err1;
01108         }
01109 
01110 done:
01111 err1:   if (cp->page != NULL) {
01112                 if ((t_ret = __qam_fput(
01113                     dbp, cp->pgno, cp->page, put_mode)) != 0 && ret == 0)
01114                         ret = t_ret;
01115 
01116                 /* Doing record locking, release the page lock */
01117                 if ((t_ret = __LPUT(dbc, pglock)) != 0 && ret == 0)
01118                         ret = t_ret;
01119                 cp->page = NULL;
01120         }
01121 
01122 err:    if (meta) {
01123                 /* Release the meta page. */
01124                 if ((t_ret = __memp_fput(mpf, meta, 0)) != 0 && ret == 0)
01125                         ret = t_ret;
01126 
01127                 /* Don't hold the meta page long term. */
01128                 if (locked)
01129                         if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
01130                                 ret = t_ret;
01131         }
01132         DB_ASSERT(!LOCK_ISSET(metalock));
01133 
01134         return ((ret == DB_LOCK_NOTGRANTED &&
01135              !F_ISSET(dbenv, DB_ENV_TIME_NOTGRANTED)) ?
01136              DB_LOCK_DEADLOCK : ret);
01137 }
01138 
01139 /*
01140  * __qam_consume -- try to reset the head of the queue.
01141  *
01142  */
01143 static int
01144 __qam_consume(dbc, meta, first)
01145         DBC *dbc;
01146         QMETA *meta;
01147         db_recno_t first;
01148 {
01149         DB *dbp;
01150         DB_LOCK lock, save_lock;
01151         DB_MPOOLFILE *mpf;
01152         QUEUE_CURSOR *cp;
01153         db_indx_t save_indx;
01154         db_pgno_t save_page;
01155         db_recno_t current, save_recno;
01156         u_int32_t put_mode, rec_extent;
01157         int exact, ret, t_ret, wrapped;
01158 
01159         dbp = dbc->dbp;
01160         mpf = dbp->mpf;
01161         cp = (QUEUE_CURSOR *)dbc->internal;
01162         put_mode = DB_MPOOL_DIRTY;
01163         ret = 0;
01164 
01165         save_page = cp->pgno;
01166         save_indx = cp->indx;
01167         save_recno = cp->recno;
01168         save_lock = cp->lock;
01169 
01170         /*
01171          * If we skipped some deleted records, we need to
01172          * reposition on the first one.  Get a lock
01173          * in case someone is trying to put it back.
01174          */
01175         if (first != cp->recno) {
01176                 ret = __db_lget(dbc, 0, first, DB_LOCK_READ,
01177                     DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock);
01178                 if (ret == DB_LOCK_DEADLOCK) {
01179                         ret = 0;
01180                         goto done;
01181                 }
01182                 if (ret != 0)
01183                         goto done;
01184                 if ((ret =
01185                     __qam_fput(dbp, cp->pgno, cp->page, put_mode)) != 0)
01186                         goto done;
01187                 cp->page = NULL;
01188                 put_mode = 0;
01189                 if ((ret = __qam_position(dbc,
01190                     &first, QAM_READ, &exact)) != 0 || exact != 0) {
01191                         (void)__LPUT(dbc, lock);
01192                         goto done;
01193                 }
01194                 if ((ret =__LPUT(dbc, lock)) != 0)
01195                         goto done;
01196                 if ((ret = __LPUT(dbc, cp->lock)) != 0)
01197                         goto done;
01198         }
01199 
01200         current = meta->cur_recno;
01201         wrapped = 0;
01202         if (first > current)
01203                 wrapped = 1;
01204         rec_extent = meta->page_ext * meta->rec_page;
01205 
01206         /* Loop until we find a record or hit current */
01207         for (;;) {
01208                 /*
01209                  * Check to see if we are moving off the extent
01210                  * and remove the extent.
01211                  * If we are moving off a page we need to
01212                  * get rid of the buffer.
01213                  * Wait for the lagging readers to move off the
01214                  * page.
01215                  */
01216                 if (cp->page != NULL && rec_extent != 0 &&
01217                     ((exact = (first % rec_extent == 0)) ||
01218                     (first % meta->rec_page == 0) ||
01219                     first == UINT32_MAX)) {
01220                         if (exact == 1 && (ret = __db_lget(dbc,
01221                             0, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0)
01222                                 break;
01223 #ifdef QDEBUG
01224                         if (DBC_LOGGING(dbc))
01225                                 (void)__log_printf(dbp->dbenv, dbc->txn,
01226                                     "Queue R: %x %d %d %d", dbc->locker,
01227                                     cp->pgno, first, meta->first_recno);
01228 #endif
01229                         put_mode |= DB_MPOOL_DISCARD;
01230                         if ((ret = __qam_fput(dbp,
01231                             cp->pgno, cp->page, put_mode)) != 0)
01232                                 break;
01233                         cp->page = NULL;
01234 
01235                         if (exact == 1) {
01236                                 ret = __qam_fremove(dbp, cp->pgno);
01237                                 if ((t_ret =
01238                                     __LPUT(dbc, cp->lock)) != 0 && ret == 0)
01239                                         ret = t_ret;
01240                         }
01241                         if (ret != 0)
01242                                 break;
01243                 } else if (cp->page != NULL && (ret =
01244                     __qam_fput(dbp, cp->pgno, cp->page, put_mode)) != 0)
01245                         break;
01246                 cp->page = NULL;
01247                 first++;
01248                 if (first == RECNO_OOB) {
01249                         wrapped = 0;
01250                         first++;
01251                 }
01252 
01253                 /*
01254                  * LOOP EXIT when we come move to the current
01255                  * pointer.
01256                  */
01257                 if (!wrapped && first >= current)
01258                         break;
01259 
01260                 ret = __db_lget(dbc, 0, first, DB_LOCK_READ,
01261                     DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock);
01262                 if (ret == DB_LOCK_DEADLOCK) {
01263                         ret = 0;
01264                         break;
01265                 }
01266                 if (ret != 0)
01267                         break;
01268 
01269                 if ((ret = __qam_position(dbc,
01270                     &first, QAM_READ, &exact)) != 0) {
01271                         (void)__LPUT(dbc, lock);
01272                         break;
01273                 }
01274                 put_mode = 0;
01275                 if ((ret =__LPUT(dbc, lock)) != 0 ||
01276                     (ret = __LPUT(dbc, cp->lock)) != 0 || exact) {
01277                         if ((t_ret = __qam_fput(dbp, cp->pgno,
01278                             cp->page, put_mode)) != 0 && ret == 0)
01279                                 ret = t_ret;
01280                         cp->page = NULL;
01281                         break;
01282                 }
01283         }
01284 
01285         cp->pgno = save_page;
01286         cp->indx = save_indx;
01287         cp->recno = save_recno;
01288         cp->lock = save_lock;
01289 
01290         /*
01291          * We have advanced as far as we can.
01292          * Advance first_recno to this point.
01293          */
01294         if (ret == 0 && meta->first_recno != first) {
01295 #ifdef QDEBUG
01296                 if (DBC_LOGGING(dbc))
01297                         (void)__log_printf(dbp->dbenv, dbc->txn,
01298                             "Queue M: %x %d %d %d", dbc->locker, cp->recno,
01299                             first, meta->first_recno);
01300 #endif
01301                 if (DBC_LOGGING(dbc))
01302                         if ((ret = __qam_incfirst_log(dbp,
01303                             dbc->txn, &meta->dbmeta.lsn, 0,
01304                             cp->recno, PGNO_BASE_MD)) != 0)
01305                                 goto done;
01306                 meta->first_recno = first;
01307                 (void)__memp_fset(mpf, meta, DB_MPOOL_DIRTY);
01308         }
01309 
01310 done:
01311         return (ret);
01312 }
01313 
01314 static int
01315 __qam_bulk(dbc, data, flags)
01316         DBC *dbc;
01317         DBT *data;
01318         u_int32_t flags;
01319 {
01320         DB *dbp;
01321         DB_LOCK metalock, rlock;
01322         DB_MPOOLFILE *mpf;
01323         PAGE *pg;
01324         QMETA *meta;
01325         QAMDATA *qp;
01326         QUEUE_CURSOR *cp;
01327         db_indx_t indx;
01328         db_lockmode_t lkmode;
01329         db_pgno_t metapno;
01330         qam_position_mode mode;
01331         u_int32_t  *endp, *offp;
01332         u_int32_t pagesize, re_len, recs;
01333         u_int8_t *dbuf, *dp, *np;
01334         int exact, ret, t_ret, valid;
01335         int is_key, need_pg, size, space;
01336 
01337         dbp = dbc->dbp;
01338         mpf = dbp->mpf;
01339         cp = (QUEUE_CURSOR *)dbc->internal;
01340 
01341         mode = QAM_READ;
01342         lkmode = DB_LOCK_READ;
01343         if (F_ISSET(dbc, DBC_RMW)) {
01344                 mode = QAM_WRITE;
01345                 lkmode = DB_LOCK_WRITE;
01346         }
01347 
01348         pagesize = dbp->pgsize;
01349         re_len = ((QUEUE *)dbp->q_internal)->re_len;
01350         recs = ((QUEUE *)dbp->q_internal)->rec_page;
01351         metapno = ((QUEUE *)dbp->q_internal)->q_meta;
01352 
01353         is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0;
01354         size = 0;
01355 
01356         if ((ret = __db_lget(dbc, 0, metapno, DB_LOCK_READ, 0, &metalock)) != 0)
01357                 return (ret);
01358         if ((ret = __memp_fget(mpf, &metapno, 0, &meta)) != 0) {
01359                 /* We did not fetch it, we can release the lock. */
01360                 (void)__LPUT(dbc, metalock);
01361                 return (ret);
01362         }
01363 
01364         dbuf = data->data;
01365         np = dp = dbuf;
01366 
01367         /* Keep track of space that is left.  There is an termination entry */
01368         space = (int)data->ulen;
01369         space -= (int)sizeof(*offp);
01370 
01371         /* Build the offset/size table from the end up. */
01372         endp = (u_int32_t *)((u_int8_t *)dbuf + data->ulen);
01373         endp--;
01374         offp = endp;
01375         /* Save the lock on the current position of the cursor. */
01376         rlock = cp->lock;
01377         LOCK_INIT(cp->lock);
01378 
01379 next_pg:
01380         /* Wrap around, skipping zero. */
01381         if (cp->recno == RECNO_OOB)
01382                 cp->recno++;
01383         if ((ret = __qam_position(dbc, &cp->recno, mode, &exact)) != 0)
01384                 goto done;
01385 
01386         pg = cp->page;
01387         indx = cp->indx;
01388         need_pg = 1;
01389 
01390         do {
01391                 /*
01392                  * If this page is a nonexistent page at the end of an
01393                  * extent, pg may be NULL.  A NULL page has no valid records,
01394                  * so just keep looping as though qp exists and isn't QAM_VALID;
01395                  * calling QAM_GET_RECORD is unsafe.
01396                  */
01397                 valid = 0;
01398 
01399                 if (pg != NULL) {
01400                         if ((ret = __db_lget(dbc, LCK_COUPLE,
01401                              cp->recno, lkmode, DB_LOCK_RECORD, &rlock)) != 0)
01402                                 goto done;
01403                         qp = QAM_GET_RECORD(dbp, pg, indx);
01404                         if (F_ISSET(qp, QAM_VALID)) {
01405                                 valid = 1;
01406                                 space -= (int)
01407                                      ((is_key ? 3 : 2) * sizeof(*offp));
01408                                 if (space < 0)
01409                                         goto get_space;
01410                                 if (need_pg) {
01411                                         dp = np;
01412                                         size = (int)pagesize - QPAGE_SZ(dbp);
01413                                         if (space < size) {
01414 get_space:
01415                                                 if (offp == endp) {
01416                                                         data->size = (u_int32_t)
01417                                                             DB_ALIGN((u_int32_t)
01418                                                             size + pagesize,
01419                                                             sizeof(u_int32_t));
01420                                                         ret = DB_BUFFER_SMALL;
01421                                                         break;
01422                                                 }
01423                                                 if (indx != 0)
01424                                                         indx--;
01425                                                 cp->recno--;
01426                                                 space = 0;
01427                                                 break;
01428                                         }
01429                                         memcpy(dp,
01430                                             (char *)pg + QPAGE_SZ(dbp),
01431                                             (unsigned)size);
01432                                         need_pg = 0;
01433                                         space -= size;
01434                                         np += size;
01435                                 }
01436                                 if (is_key)
01437                                         *offp-- = cp->recno;
01438                                 *offp-- = (u_int32_t)((((u_int8_t*)qp -
01439                                     (u_int8_t*)pg) - QPAGE_SZ(dbp)) +
01440                                     (dp - dbuf) + SSZA(QAMDATA, data));
01441                                 *offp-- = re_len;
01442                         }
01443                 }
01444                 if (!valid && is_key == 0) {
01445                         *offp-- = 0;
01446                         *offp-- = 0;
01447                 }
01448                 cp->recno++;
01449         } while (++indx < recs && cp->recno != RECNO_OOB &&
01450             cp->recno != meta->cur_recno &&
01451             !QAM_AFTER_CURRENT(meta, cp->recno));
01452 
01453         /* Drop the page lock. */
01454         if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
01455                 ret = t_ret;
01456 
01457         if (cp->page != NULL) {
01458                 if ((t_ret =
01459                     __qam_fput(dbp, cp->pgno, cp->page, 0)) != 0 && ret == 0)
01460                         ret = t_ret;
01461                 cp->page = NULL;
01462         }
01463 
01464         if (ret == 0 && space > 0 &&
01465             (indx >= recs || cp->recno == RECNO_OOB) &&
01466             cp->recno != meta->cur_recno &&
01467             !QAM_AFTER_CURRENT(meta, cp->recno))
01468                 goto next_pg;
01469 
01470         /*
01471          * Correct recno in two cases:
01472          * 1) If we just wrapped fetch must start at record 1 not a FIRST.
01473          * 2) We ran out of space exactly at the end of a page.
01474          */
01475         if (cp->recno == RECNO_OOB || (space == 0 && indx == recs))
01476                 cp->recno--;
01477 
01478         if (is_key == 1)
01479                 *offp = RECNO_OOB;
01480         else
01481                 *offp = (u_int32_t)-1;
01482 
01483 done:   /* Release the meta page. */
01484         if ((t_ret = __memp_fput(mpf, meta, 0)) != 0 && ret == 0)
01485                 ret = t_ret;
01486         if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
01487                 ret = t_ret;
01488 
01489         cp->lock = rlock;
01490 
01491         return (ret);
01492 }
01493 
01494 /*
01495  * __qam_c_close --
01496  *      Close down the cursor from a single use.
01497  */
01498 static int
01499 __qam_c_close(dbc, root_pgno, rmroot)
01500         DBC *dbc;
01501         db_pgno_t root_pgno;
01502         int *rmroot;
01503 {
01504         QUEUE_CURSOR *cp;
01505         int ret;
01506 
01507         COMPQUIET(root_pgno, 0);
01508         COMPQUIET(rmroot, NULL);
01509 
01510         cp = (QUEUE_CURSOR *)dbc->internal;
01511 
01512         /* Discard any locks not acquired inside of a transaction. */
01513         ret = __TLPUT(dbc, cp->lock);
01514 
01515         LOCK_INIT(cp->lock);
01516         cp->page = NULL;
01517         cp->pgno = PGNO_INVALID;
01518         cp->indx = 0;
01519         cp->lock_mode = DB_LOCK_NG;
01520         cp->recno = RECNO_OOB;
01521         cp->flags = 0;
01522 
01523         return (ret);
01524 }
01525 
01526 /*
01527  * __qam_c_dup --
01528  *      Duplicate a queue cursor, such that the new one holds appropriate
01529  *      locks for the position of the original.
01530  *
01531  * PUBLIC: int __qam_c_dup __P((DBC *, DBC *));
01532  */
01533 int
01534 __qam_c_dup(orig_dbc, new_dbc)
01535         DBC *orig_dbc, *new_dbc;
01536 {
01537         QUEUE_CURSOR *orig, *new;
01538         int ret;
01539 
01540         orig = (QUEUE_CURSOR *)orig_dbc->internal;
01541         new = (QUEUE_CURSOR *)new_dbc->internal;
01542 
01543         new->recno = orig->recno;
01544 
01545         /* Acquire the long term lock if we are not in a transaction. */
01546         if (orig_dbc->txn == NULL && LOCK_ISSET(orig->lock))
01547                 if ((ret = __db_lget(new_dbc, 0, new->recno,
01548                     new->lock_mode, DB_LOCK_RECORD, &new->lock)) != 0)
01549                         return (ret);
01550 
01551         return (0);
01552 }
01553 
01554 /*
01555  * __qam_c_init
01556  *
01557  * PUBLIC: int __qam_c_init __P((DBC *));
01558  */
01559 int
01560 __qam_c_init(dbc)
01561         DBC *dbc;
01562 {
01563         QUEUE_CURSOR *cp;
01564         DB *dbp;
01565         int ret;
01566 
01567         dbp = dbc->dbp;
01568 
01569         /* Allocate the internal structure. */
01570         cp = (QUEUE_CURSOR *)dbc->internal;
01571         if (cp == NULL) {
01572                 if ((ret =
01573                     __os_calloc(dbp->dbenv, 1, sizeof(QUEUE_CURSOR), &cp)) != 0)
01574                         return (ret);
01575                 dbc->internal = (DBC_INTERNAL *)cp;
01576         }
01577 
01578         /* Initialize methods. */
01579         dbc->c_close = __db_c_close_pp;
01580         dbc->c_count = __db_c_count_pp;
01581         dbc->c_del = __db_c_del_pp;
01582         dbc->c_dup = __db_c_dup_pp;
01583         dbc->c_get = __db_c_get_pp;
01584         dbc->c_pget = __db_c_pget_pp;
01585         dbc->c_put = __db_c_put_pp;
01586         dbc->c_am_bulk = __qam_bulk;
01587         dbc->c_am_close = __qam_c_close;
01588         dbc->c_am_del = __qam_c_del;
01589         dbc->c_am_destroy = __qam_c_destroy;
01590         dbc->c_am_get = __qam_c_get;
01591         dbc->c_am_put = __qam_c_put;
01592         dbc->c_am_writelock = NULL;
01593 
01594         return (0);
01595 }
01596 
01597 /*
01598  * __qam_c_destroy --
01599  *      Close a single cursor -- internal version.
01600  */
01601 static int
01602 __qam_c_destroy(dbc)
01603         DBC *dbc;
01604 {
01605         /* Discard the structures. */
01606         __os_free(dbc->dbp->dbenv, dbc->internal);
01607 
01608         return (0);
01609 }
01610 
01611 /*
01612  * __qam_getno --
01613  *      Check the user's record number.
01614  */
01615 static int
01616 __qam_getno(dbp, key, rep)
01617         DB *dbp;
01618         const DBT *key;
01619         db_recno_t *rep;
01620 {
01621         if ((*rep = *(db_recno_t *)key->data) == 0) {
01622                 __db_err(dbp->dbenv, "illegal record number of 0");
01623                 return (EINVAL);
01624         }
01625         return (0);
01626 }
01627 
01628 /*
01629  * __qam_truncate --
01630  *      Truncate a queue database
01631  *
01632  * PUBLIC: int __qam_truncate __P((DBC *, u_int32_t *));
01633  */
01634 int
01635 __qam_truncate(dbc, countp)
01636         DBC *dbc;
01637         u_int32_t *countp;
01638 {
01639         DB *dbp;
01640         DB_LOCK metalock;
01641         DB_MPOOLFILE *mpf;
01642         QMETA *meta;
01643         db_pgno_t metapno;
01644         u_int32_t count;
01645         int ret, t_ret;
01646 
01647         dbp = dbc->dbp;
01648 
01649         /* Walk the queue, counting rows. */
01650         for (count = 0;
01651             (ret = __qam_c_get(dbc, NULL, NULL, DB_CONSUME, &metapno)) == 0;)
01652                 count++;
01653         if (ret != DB_NOTFOUND)
01654                 return (ret);
01655 
01656         /* Update the meta page. */
01657         metapno = ((QUEUE *)dbp->q_internal)->q_meta;
01658         if ((ret =
01659             __db_lget(dbc, 0, metapno, DB_LOCK_WRITE, 0, &metalock)) != 0)
01660                 return (ret);
01661 
01662         mpf = dbp->mpf;
01663         if ((ret = __memp_fget(mpf, &metapno, 0, &meta)) != 0) {
01664                 /* We did not fetch it, we can release the lock. */
01665                 (void)__LPUT(dbc, metalock);
01666                 return (ret);
01667         }
01668         /* Remove the last extent file. */
01669         if (meta->cur_recno > 1 && ((QUEUE *)dbp->q_internal)->page_ext != 0) {
01670                 if ((ret = __qam_fremove(dbp,
01671                      QAM_RECNO_PAGE(dbp, meta->cur_recno - 1))) != 0)
01672                         return (ret);
01673         }
01674 
01675         if (DBC_LOGGING(dbc)) {
01676                 ret = __qam_mvptr_log(dbp, dbc->txn, &meta->dbmeta.lsn, 0,
01677                     QAM_SETCUR | QAM_SETFIRST | QAM_TRUNCATE, meta->first_recno,
01678                     1, meta->cur_recno, 1, &meta->dbmeta.lsn, PGNO_BASE_MD);
01679         }
01680         if (ret == 0)
01681                 meta->first_recno = meta->cur_recno = 1;
01682 
01683         if ((t_ret = __memp_fput(mpf,
01684             meta, ret == 0 ? DB_MPOOL_DIRTY: 0)) != 0 && ret == 0)
01685                 ret = t_ret;
01686         if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
01687                 ret = t_ret;
01688 
01689         if (countp != NULL)
01690                 *countp = count;
01691 
01692         return (ret);
01693 }
01694 
01695 /*
01696  * __qam_delete --
01697  *      Queue fast delete function.
01698  *
01699  * PUBLIC: int __qam_delete __P((DBC *,  DBT *));
01700  */
01701 int
01702 __qam_delete(dbc, key)
01703         DBC *dbc;
01704         DBT *key;
01705 {
01706         QUEUE_CURSOR *cp;
01707         int ret;
01708 
01709         cp = (QUEUE_CURSOR *)dbc->internal;
01710         if ((ret = __qam_getno(dbc->dbp, key, &cp->recno)) != 0)
01711                 goto err;
01712 
01713         ret = __qam_c_del(dbc);
01714 
01715 err:    return (ret);
01716 }

Generated on Sun Dec 25 12:14:44 2005 for Berkeley DB 4.4.16 by  doxygen 1.4.2