Main Page | Class Hierarchy | Data Structures | Directories | File List | Data Fields | Related Pages

lock.c

00001 /*-
00002  * See the file LICENSE for redistribution information.
00003  *
00004  * Copyright (c) 1996-2005
00005  *      Sleepycat Software.  All rights reserved.
00006  *
00007  * $Id: lock.c,v 12.19 2005/10/15 15:16:57 bostic Exp $
00008  */
00009 
00010 #include "db_config.h"
00011 
00012 #ifndef NO_SYSTEM_INCLUDES
00013 #include <sys/types.h>
00014 
00015 #include <string.h>
00016 #endif
00017 
00018 #include "db_int.h"
00019 #include "dbinc/db_shash.h"
00020 #include "dbinc/lock.h"
00021 #include "dbinc/log.h"
00022 
00023 static int  __lock_freelock __P((DB_LOCKTAB *,
00024                 struct __db_lock *, u_int32_t, u_int32_t));
00025 static int  __lock_getobj
00026                 __P((DB_LOCKTAB *, const DBT *, u_int32_t, int, DB_LOCKOBJ **));
00027 static int  __lock_inherit_locks __P ((DB_LOCKTAB *, u_int32_t, u_int32_t));
00028 static int  __lock_is_parent __P((DB_LOCKTAB *, u_int32_t, DB_LOCKER *));
00029 static int  __lock_put_internal __P((DB_LOCKTAB *,
00030                 struct __db_lock *, u_int32_t,  u_int32_t));
00031 static int  __lock_put_nolock __P((DB_ENV *, DB_LOCK *, int *, u_int32_t));
00032 static int __lock_remove_waiter __P((DB_LOCKTAB *,
00033                 DB_LOCKOBJ *, struct __db_lock *, db_status_t));
00034 static int __lock_trade __P((DB_ENV *, DB_LOCK *, u_int32_t));
00035 
00036 static const char __db_lock_invalid[] = "%s: Lock is no longer valid";
00037 static const char __db_locker_invalid[] = "Locker is not valid";
00038 
00039 /*
00040  * __lock_vec_pp --
00041  *      DB_ENV->lock_vec pre/post processing.
00042  *
00043  * PUBLIC: int __lock_vec_pp __P((DB_ENV *,
00044  * PUBLIC:     u_int32_t, u_int32_t, DB_LOCKREQ *, int, DB_LOCKREQ **));
00045  */
00046 int
00047 __lock_vec_pp(dbenv, locker, flags, list, nlist, elistp)
00048         DB_ENV *dbenv;
00049         u_int32_t locker, flags;
00050         int nlist;
00051         DB_LOCKREQ *list, **elistp;
00052 {
00053         DB_THREAD_INFO *ip;
00054         int ret;
00055 
00056         PANIC_CHECK(dbenv);
00057         ENV_REQUIRES_CONFIG(dbenv,
00058             dbenv->lk_handle, "DB_ENV->lock_vec", DB_INIT_LOCK);
00059 
00060         /* Validate arguments. */
00061         if ((ret = __db_fchk(dbenv,
00062              "DB_ENV->lock_vec", flags, DB_LOCK_NOWAIT)) != 0)
00063                 return (ret);
00064 
00065         ENV_ENTER(dbenv, ip);
00066         REPLICATION_WRAP(dbenv,
00067             (__lock_vec(dbenv, locker, flags, list, nlist, elistp)), ret);
00068         ENV_LEAVE(dbenv, ip);
00069         return (ret);
00070 }
00071 
00072 /*
00073  * __lock_vec --
00074  *      DB_ENV->lock_vec.
00075  *
00076  *      Vector lock routine.  This function takes a set of operations
00077  *      and performs them all at once.  In addition, lock_vec provides
00078  *      functionality for lock inheritance, releasing all locks for a
00079  *      given locker (used during transaction commit/abort), releasing
00080  *      all locks on a given object, and generating debugging information.
00081  *
00082  * PUBLIC: int __lock_vec __P((DB_ENV *,
00083  * PUBLIC:     u_int32_t, u_int32_t, DB_LOCKREQ *, int, DB_LOCKREQ **));
00084  */
00085 int
00086 __lock_vec(dbenv, locker, flags, list, nlist, elistp)
00087         DB_ENV *dbenv;
00088         u_int32_t locker, flags;
00089         int nlist;
00090         DB_LOCKREQ *list, **elistp;
00091 {
00092         struct __db_lock *lp, *next_lock;
00093         DB_LOCK lock;
00094         DB_LOCKER *sh_locker;
00095         DB_LOCKOBJ *sh_obj;
00096         DB_LOCKREGION *region;
00097         DB_LOCKTAB *lt;
00098         DBT *objlist, *np;
00099         u_int32_t lndx, ndx;
00100         int did_abort, i, ret, run_dd, upgrade, writes;
00101 
00102         /* Check if locks have been globally turned off. */
00103         if (F_ISSET(dbenv, DB_ENV_NOLOCKING))
00104                 return (0);
00105 
00106         lt = dbenv->lk_handle;
00107         region = lt->reginfo.primary;
00108 
00109         run_dd = 0;
00110         LOCK_SYSTEM_LOCK(dbenv);
        /*
         * NOTE(review): the operations are applied in order, and processing
         * stops at the first one that fails; the failing request is reported
         * back through elistp at the bottom of this function.
         */
00111         for (i = 0, ret = 0; i < nlist && ret == 0; i++)
00112                 switch (list[i].op) {
00113                 case DB_LOCK_GET_TIMEOUT:
00114                         LF_SET(DB_LOCK_SET_TIMEOUT);
00115                         /* FALLTHROUGH */
00116                 case DB_LOCK_GET:
00117                         if (IS_RECOVERING(dbenv)) {
00118                                 LOCK_INIT(list[i].lock);
00119                                 break;
00120                         }
00121                         ret = __lock_get_internal(lt,
00122                             locker, flags, list[i].obj,
00123                             list[i].mode, list[i].timeout, &list[i].lock);
00124                         break;
00125                 case DB_LOCK_INHERIT:
00126                         ret = __lock_inherit_locks(lt, locker, flags);
00127                         break;
00128                 case DB_LOCK_PUT:
00129                         ret = __lock_put_nolock(dbenv,
00130                             &list[i].lock, &run_dd, flags);
00131                         break;
00132                 case DB_LOCK_PUT_ALL:
00133                 case DB_LOCK_PUT_READ:
00134                 case DB_LOCK_UPGRADE_WRITE:
00135                         /*
00136                          * Get the locker and mark it as deleted.  This
00137                          * allows us to traverse the locker links without
00138                          * worrying that someone else is deleting locks out
00139                          * from under us.  Since the locker may hold no
00140                          * locks (i.e., you could call abort before you've
00141                          * done any work), it's perfectly reasonable for there
00142                          * to be no locker; this is not an error.
00143                          */
00144                         LOCKER_LOCK(lt, region, locker, ndx);
00145                         if ((ret = __lock_getlocker(lt,
00146                             locker, ndx, 0, &sh_locker)) != 0 ||
00147                             sh_locker == NULL ||
00148                             F_ISSET(sh_locker, DB_LOCKER_DELETED))
00149                                 /*
00150                                  * If ret is set, then we'll generate an
00151                                  * error.  If it's not set, we have nothing
00152                                  * to do.
00153                                  */
00154                                 break;
00155                         upgrade = 0;
00156                         writes = 1;
00157                         if (list[i].op == DB_LOCK_PUT_READ)
00158                                 writes = 0;
00159                         else if (list[i].op == DB_LOCK_UPGRADE_WRITE) {
00160                                 if (F_ISSET(sh_locker, DB_LOCKER_DIRTY))
00161                                         upgrade = 1;
00162                                 writes = 0;
00163                         }
00164                         objlist = list[i].obj;
00165                         if (objlist != NULL) {
00166                                 /*
00167                                  * We know these should be ilocks,
00168                                  * but they could be something else,
00169                                  * so allocate room for the size too.
00170                                  */
00171                                 objlist->size =
00172                                      sh_locker->nwrites * sizeof(DBT);
00173                                 if ((ret = __os_malloc(dbenv,
00174                                      objlist->size, &objlist->data)) != 0)
00175                                         goto up_done;
00176                                 memset(objlist->data, 0, objlist->size);
00177                                 np = (DBT *) objlist->data;
00178                         } else
00179                                 np = NULL;
00180 
00181                         F_SET(sh_locker, DB_LOCKER_DELETED);
00182 
00183                         /* Now traverse the locks, releasing each one. */
                        /*
                         * next_lock is captured up front because the current
                         * lock may be unlinked and freed inside the loop body.
                         */
00184                         for (lp = SH_LIST_FIRST(&sh_locker->heldby, __db_lock);
00185                             lp != NULL; lp = next_lock) {
00186                                 sh_obj = (DB_LOCKOBJ *)
00187                                     ((u_int8_t *)lp + lp->obj);
00188                                 next_lock = SH_LIST_NEXT(lp,
00189                                     locker_links, __db_lock);
00190                                 if (writes == 1 ||
00191                                     lp->mode == DB_LOCK_READ ||
00192                                     lp->mode == DB_LOCK_READ_UNCOMMITTED) {
00193                                         SH_LIST_REMOVE(lp,
00194                                             locker_links, __db_lock);
00195                                         sh_obj = (DB_LOCKOBJ *)
00196                                             ((u_int8_t *)lp + lp->obj);
00197                                         SHOBJECT_LOCK(lt, region, sh_obj, lndx);
00198                                         /*
00199                                          * We are not letting lock_put_internal
00200                                          * unlink the lock, so we'll have to
00201                                          * update counts here.
00202                                          */
00203                                         sh_locker->nlocks--;
00204                                         if (IS_WRITELOCK(lp->mode))
00205                                                 sh_locker->nwrites--;
00206                                         ret = __lock_put_internal(lt, lp,
00207                                             lndx, DB_LOCK_FREE | DB_LOCK_DOALL);
00208                                         if (ret != 0)
00209                                                 break;
00210                                         continue;
00211                                 }
00212                                 if (objlist != NULL) {
00213                                         DB_ASSERT((char *)np <
00214                                              (char *)objlist->data +
00215                                              objlist->size);
00216                                         np->data = SH_DBT_PTR(&sh_obj->lockobj);
00217                                         np->size = sh_obj->lockobj.size;
00218                                         np++;
00219                                 }
00220                         }
00221                         if (ret != 0)
00222                                 goto up_done;
00223 
00224                         if (objlist != NULL)
00225                                 if ((ret = __lock_fix_list(dbenv,
00226                                      objlist, sh_locker->nwrites)) != 0)
00227                                         goto up_done;
00228                         switch (list[i].op) {
00229                         case DB_LOCK_UPGRADE_WRITE:
00230                                 if (upgrade != 1)
00231                                         goto up_done;
00232                                 for (lp = SH_LIST_FIRST(
00233                                     &sh_locker->heldby, __db_lock);
00234                                     lp != NULL;
00235                                     lp = SH_LIST_NEXT(lp,
00236                                             locker_links, __db_lock)) {
00237                                         if (lp->mode != DB_LOCK_WWRITE)
00238                                                 continue;
00239                                         lock.off = R_OFFSET(&lt->reginfo, lp);
00240                                         lock.gen = lp->gen;
00241                                         F_SET(sh_locker, DB_LOCKER_INABORT);
00242                                         if ((ret = __lock_get_internal(lt,
00243                                             locker, flags | DB_LOCK_UPGRADE,
00244                                             NULL, DB_LOCK_WRITE, 0, &lock)) !=0)
00245                                                 break;
00246                                 }
00247                         up_done:
00248                                 /* FALLTHROUGH */
00249                         case DB_LOCK_PUT_READ:
00250                         case DB_LOCK_PUT_ALL:
00251                                 F_CLR(sh_locker, DB_LOCKER_DELETED);
00252                                 break;
00253                         default:
00254                                 break;
00255                         }
00256                         break;
00257                 case DB_LOCK_PUT_OBJ:
00258                         /* Remove all the locks associated with an object. */
00259                         OBJECT_LOCK(lt, region, list[i].obj, ndx);
00260                         if ((ret = __lock_getobj(lt, list[i].obj,
00261                             ndx, 0, &sh_obj)) != 0 || sh_obj == NULL) {
00262                                 if (ret == 0)
00263                                         ret = EINVAL;
00264                                 break;
00265                         }
00266 
00267                         /*
00268                          * Go through both waiters and holders.  Don't bother
00269                          * to run promotion, because everyone is getting
00270                          * released.  The processes waiting will still get
00271                          * awakened as their waiters are released.
00272                          */
00273                         for (lp = SH_TAILQ_FIRST(&sh_obj->waiters, __db_lock);
00274                             ret == 0 && lp != NULL;
00275                             lp = SH_TAILQ_FIRST(&sh_obj->waiters, __db_lock))
00276                                 ret = __lock_put_internal(lt, lp, ndx,
00277                                     DB_LOCK_UNLINK |
00278                                     DB_LOCK_NOPROMOTE | DB_LOCK_DOALL);
00279 
00280                         /*
00281                          * On the last time around, the object will get
00282                          * reclaimed by __lock_put_internal, structure the
00283                          * loop carefully so we do not get bitten.
00284                          */
00285                         for (lp = SH_TAILQ_FIRST(&sh_obj->holders, __db_lock);
00286                             ret == 0 && lp != NULL;
00287                             lp = next_lock) {
00288                                 next_lock = SH_TAILQ_NEXT(lp, links, __db_lock);
00289                                 ret = __lock_put_internal(lt, lp, ndx,
00290                                     DB_LOCK_UNLINK |
00291                                     DB_LOCK_NOPROMOTE | DB_LOCK_DOALL);
00292                         }
00293                         break;
00294 
00295                 case DB_LOCK_TIMEOUT:
00296                         ret = __lock_set_timeout_internal(dbenv,
00297                             locker, 0, DB_SET_TXN_NOW);
00298                         break;
00299 
00300                 case DB_LOCK_TRADE:
00301                         /*
00302                          * INTERNAL USE ONLY.
00303                          * Change the holder of the lock described in
00304                          * list[i].lock to the locker-id specified by
00305                          * the locker parameter.
00306                          */
00307                         /*
00308                          * You had better know what you're doing here.
00309                          * We are trading locker-id's on a lock to
00310                          * facilitate file locking on open DB handles.
00311                          * We do not do any conflict checking on this,
00312                          * so heaven help you if you use this flag under
00313                          * any other circumstances.
00314                          */
00315                         ret = __lock_trade(dbenv, &list[i].lock, locker);
00316                         break;
00317 #if defined(DEBUG) && defined(HAVE_STATISTICS)
00318                 case DB_LOCK_DUMP:
00319                         /* Find the locker. */
00320                         LOCKER_LOCK(lt, region, locker, ndx);
00321                         if ((ret = __lock_getlocker(lt,
00322                             locker, ndx, 0, &sh_locker)) != 0 ||
00323                             sh_locker == NULL ||
00324                             F_ISSET(sh_locker, DB_LOCKER_DELETED))
00325                                 break;
00326 
00327                         for (lp = SH_LIST_FIRST(&sh_locker->heldby, __db_lock);
00328                             lp != NULL;
00329                             lp = SH_LIST_NEXT(lp, locker_links, __db_lock)) {
00330                                 __lock_printlock(lt, NULL, lp, 1);
00331                         }
00332                         break;
00333 #endif
00334                 default:
00335                         __db_err(dbenv,
00336                             "Invalid lock operation: %d", list[i].op);
00337                         ret = EINVAL;
00338                         break;
00339                 }
00340 
        /* Decide whether the deadlock detector needs to run before leaving. */
00341         if (ret == 0 && region->detect != DB_LOCK_NORUN &&
00342              (region->need_dd || LOCK_TIME_ISVALID(&region->next_timeout)))
00343                 run_dd = 1;
00344         LOCK_SYSTEM_UNLOCK(dbenv);
00345 
00346         if (run_dd)
00347                 (void)__lock_detect(dbenv, region->detect, &did_abort);
00348 
        /* On failure, hand the caller the request that failed. */
00349         if (ret != 0 && elistp != NULL)
00350                 *elistp = &list[i - 1];
00351 
00352         return (ret);
00353 }
00354 
00355 /*
00356  * __lock_get_pp --
00357  *      DB_ENV->lock_get pre/post processing.
00358  *
00359  * PUBLIC: int __lock_get_pp __P((DB_ENV *,
00360  * PUBLIC:     u_int32_t, u_int32_t, const DBT *, db_lockmode_t, DB_LOCK *));
00361  */
00362 int
00363 __lock_get_pp(dbenv, locker, flags, obj, lock_mode, lock)
00364         DB_ENV *dbenv;
00365         u_int32_t locker, flags;
00366         const DBT *obj;
00367         db_lockmode_t lock_mode;
00368         DB_LOCK *lock;
00369 {
00370         DB_THREAD_INFO *ip;
00371         int ret;
00372 
00373         PANIC_CHECK(dbenv);
00374         ENV_REQUIRES_CONFIG(dbenv,
00375             dbenv->lk_handle, "DB_ENV->lock_get", DB_INIT_LOCK);
00376 
00377         /* Validate arguments. */
00378         if ((ret = __db_fchk(dbenv, "DB_ENV->lock_get", flags,
00379             DB_LOCK_NOWAIT | DB_LOCK_UPGRADE | DB_LOCK_SWITCH)) != 0)
00380                 return (ret);
00381 
00382         ENV_ENTER(dbenv, ip);
00383         REPLICATION_WRAP(dbenv,
00384             (__lock_get(dbenv, locker, flags, obj, lock_mode, lock)), ret);
00385         ENV_LEAVE(dbenv, ip);
00386         return (ret);
00387 }
00388 
00389 /*
00390  * __lock_get --
00391  *      DB_ENV->lock_get.
00392  *
00393  * PUBLIC: int __lock_get __P((DB_ENV *,
00394  * PUBLIC:     u_int32_t, u_int32_t, const DBT *, db_lockmode_t, DB_LOCK *));
00395  */
00396 int
00397 __lock_get(dbenv, locker, flags, obj, lock_mode, lock)
00398         DB_ENV *dbenv;
00399         u_int32_t locker, flags;
00400         const DBT *obj;
00401         db_lockmode_t lock_mode;
00402         DB_LOCK *lock;
00403 {
00404         DB_LOCKTAB *lt;
00405         int ret;
00406 
00407         lt = dbenv->lk_handle;
00408 
00409         if (IS_RECOVERING(dbenv)) {
00410                 LOCK_INIT(*lock);
00411                 return (0);
00412         }
00413 
00414         LOCK_SYSTEM_LOCK(dbenv);
00415         ret = __lock_get_internal(lt, locker, flags, obj, lock_mode, 0, lock);
00416         LOCK_SYSTEM_UNLOCK(dbenv);
00417         return (ret);
00418 }
00419 
00420 /*
00421  * __lock_get_internal --
00422  *      All the work for lock_get (and for the GET option of lock_vec) is done
00423  *      inside of lock_get_internal.
00424  *
00425  * PUBLIC: int  __lock_get_internal __P((DB_LOCKTAB *, u_int32_t, u_int32_t,
00426  * PUBLIC:     const DBT *, db_lockmode_t, db_timeout_t, DB_LOCK *));
00427  */
00428 int
00429 __lock_get_internal(lt, locker, flags, obj, lock_mode, timeout, lock)
00430         DB_LOCKTAB *lt;
00431         u_int32_t locker, flags;
00432         const DBT *obj;
00433         db_lockmode_t lock_mode;
00434         db_timeout_t timeout;
00435         DB_LOCK *lock;
00436 {
00437         struct __db_lock *newl, *lp;
00438         DB_ENV *dbenv;
00439         DB_LOCKER *sh_locker;
00440         DB_LOCKOBJ *sh_obj;
00441         DB_LOCKREGION *region;
00442         DB_THREAD_INFO *ip;
00443         u_int32_t holder, locker_ndx, obj_ndx;
00444         int did_abort, ihold, grant_dirty, no_dd, ret, t_ret;
00445 
00446         /*
00447          * We decide what action to take based on what locks are already held
00448          * and what locks are in the wait queue.
00449          */
00450         enum {
00451                 GRANT,          /* Grant the lock. */
00452                 UPGRADE,        /* Upgrade the lock. */
00453                 HEAD,           /* Wait at head of wait queue. */
00454                 SECOND,         /* Wait as the second waiter. */
00455                 TAIL            /* Wait at tail of the wait queue. */
00456         } action;
00457 
00458         dbenv = lt->dbenv;
00459         region = lt->reginfo.primary;
00460 
00461         /* Check if locks have been globally turned off. */
00462         if (F_ISSET(dbenv, DB_ENV_NOLOCKING))
00463                 return (0);
00464 
00465         no_dd = ret = 0;
00466         newl = NULL;
00467 
00468         /* Check that the lock mode is valid.  */
00469         if (lock_mode >= (db_lockmode_t)region->stat.st_nmodes) {
00470                 __db_err(dbenv, "DB_ENV->lock_get: invalid lock mode %lu",
00471                     (u_long)lock_mode);
00472                 return (EINVAL);
00473         }
00474         if (LF_ISSET(DB_LOCK_UPGRADE))
00475                 region->stat.st_nupgrade++;
00476         else if (!LF_ISSET(DB_LOCK_SWITCH))
00477                 region->stat.st_nrequests++;
00478 
00479         if (obj == NULL) {
00480                 DB_ASSERT(LOCK_ISSET(*lock));
00481                 lp = R_ADDR(&lt->reginfo, lock->off);
00482                 sh_obj = (DB_LOCKOBJ *)((u_int8_t *)lp + lp->obj);
00483         } else {
00484                 /* Allocate a shared memory new object. */
00485                 OBJECT_LOCK(lt, region, obj, lock->ndx);
00486                 if ((ret = __lock_getobj(lt, obj, lock->ndx, 1, &sh_obj)) != 0)
00487                         goto err;
00488         }
00489 
00490         /*
00491          * If we are not going to reuse this lock, invalidate it
00492          * so that if we fail it will not look like a valid lock.
00493          */
00494         if (!LF_ISSET(DB_LOCK_UPGRADE | DB_LOCK_SWITCH))
00495                 LOCK_INIT(*lock);
00496 
00497         /* Get the locker, we may need it to find our parent. */
00498         LOCKER_LOCK(lt, region, locker, locker_ndx);
00499         if ((ret = __lock_getlocker(lt, locker,
00500             locker_ndx, locker > DB_LOCK_MAXID ? 1 : 0, &sh_locker)) != 0) {
00501                 /*
00502                  * XXX
00503                  * We cannot tell if we created the object or not, so we don't
00504                  * kow if we should free it or not.
00505                  */
00506                 goto err;
00507         }
00508 
00509         if (sh_locker == NULL) {
00510                 __db_err(dbenv, "Locker does not exist");
00511                 ret = EINVAL;
00512                 goto err;
00513         }
00514 
00515         /*
00516          * Figure out if we can grant this lock or if it should wait.
00517          * By default, we can grant the new lock if it does not conflict with
00518          * anyone on the holders list OR anyone on the waiters list.
00519          * The reason that we don't grant if there's a conflict is that
00520          * this can lead to starvation (a writer waiting on a popularly
00521          * read item will never be granted).  The downside of this is that
00522          * a waiting reader can prevent an upgrade from reader to writer,
00523          * which is not uncommon.
00524          *
00525          * There are two exceptions to the no-conflict rule.  First, if
00526          * a lock is held by the requesting locker AND the new lock does
00527          * not conflict with any other holders, then we grant the lock.
00528          * The most common place this happens is when the holder has a
00529          * WRITE lock and a READ lock request comes in for the same locker.
00530          * If we do not grant the read lock, then we guarantee deadlock.
00531          * Second, dirty readers are granted if at all possible while
00532          * avoiding starvation, see below.
00533          *
00534          * In case of conflict, we put the new lock on the end of the waiters
00535          * list, unless we are upgrading or this is a dirty reader in which
00536          * case the locker goes at or near the front of the list.
00537          */
00538         ihold = 0;
00539         grant_dirty = 0;
00540         holder = 0;
00541 
00542         /*
00543          * SWITCH is a special case, used by the queue access method
00544          * when we want to get an entry which is past the end of the queue.
00545          * We have a DB_READ_LOCK and need to switch it to DB_LOCK_WAIT and
00546          * join the waiters queue.  This must be done as a single operation
00547          * so that another locker cannot get in and fail to wake us up.
00548          */
00549         if (LF_ISSET(DB_LOCK_SWITCH))
00550                 lp = NULL;
00551         else
00552                 lp = SH_TAILQ_FIRST(&sh_obj->holders, __db_lock);
00553         for (; lp != NULL; lp = SH_TAILQ_NEXT(lp, links, __db_lock)) {
00554                 DB_ASSERT(lp->status != DB_LSTAT_FREE);
00555                 if (locker == lp->holder) {
00556                         if (lp->mode == lock_mode &&
00557                             lp->status == DB_LSTAT_HELD) {
00558                                 if (LF_ISSET(DB_LOCK_UPGRADE))
00559                                         goto upgrade;
00560 
00561                                 /*
00562                                  * Lock is held, so we can increment the
00563                                  * reference count and return this lock
00564                                  * to the caller.  We do not count reference
00565                                  * increments towards the locks held by
00566                                  * the locker.
00567                                  */
00568                                 lp->refcount++;
00569                                 lock->off = R_OFFSET(&lt->reginfo, lp);
00570                                 lock->gen = lp->gen;
00571                                 lock->mode = lp->mode;
00572                                 goto done;
00573                         } else {
00574                                 ihold = 1;
00575                         }
00576                 } else if (__lock_is_parent(lt, lp->holder, sh_locker))
00577                         ihold = 1;
00578                 else if (CONFLICTS(lt, region, lp->mode, lock_mode))
00579                         break;
00580                 else if (lp->mode == DB_LOCK_READ ||
00581                      lp->mode == DB_LOCK_WWRITE) {
00582                         grant_dirty = 1;
00583                         holder = lp->holder;
00584                 }
00585         }
00586 
00587         /*
00588          * If there are conflicting holders we will have to wait.  An upgrade
00589          * or dirty reader goes to the head of the queue, everyone else to the
00590          * back.
00591          */
00592         if (lp != NULL) {
00593                 if (LF_ISSET(DB_LOCK_UPGRADE) ||
00594                     lock_mode == DB_LOCK_READ_UNCOMMITTED)
00595                         action = HEAD;
00596                 else
00597                         action = TAIL;
00598         } else {
00599                 if (LF_ISSET(DB_LOCK_SWITCH))
00600                         action = TAIL;
00601                 else if (LF_ISSET(DB_LOCK_UPGRADE))
00602                         action = UPGRADE;
00603                 else  if (ihold)
00604                         action = GRANT;
00605                 else {
00606                         /*
00607                          * Look for conflicting waiters.
00608                          */
00609                         for (lp = SH_TAILQ_FIRST(&sh_obj->waiters, __db_lock);
00610                             lp != NULL;
00611                             lp = SH_TAILQ_NEXT(lp, links, __db_lock)) {
00612                                 if (CONFLICTS(lt, region, lp->mode,
00613                                      lock_mode) && locker != lp->holder)
00614                                         break;
00615                         }
00616                         /*
00617                          * If there are no conflicting holders or waiters,
00618                          * then we grant. Normally when we wait, we
00619                          * wait at the end (TAIL).  However, the goal of
00620                          * DIRTY_READ locks to allow forward progress in the
00621                          * face of updating transactions, so we try to allow
00622                          * all DIRTY_READ requests to proceed as rapidly
00623                          * as possible, so long as we can prevent starvation.
00624                          *
00625                          * When determining how to queue a DIRTY_READ
00626                          * request:
00627                          *
00628                          *      1. If there is a waiting upgrading writer,
00629                          *         then we enqueue the dirty reader BEHIND it
00630                          *         (second in the queue).
00631                          *      2. Else, if the current holders are either
00632                          *         READ or WWRITE, we grant
00633                          *      3. Else queue SECOND i.e., behind the first
00634                          *         waiter.
00635                          *
00636                          * The end result is that dirty_readers get to run
00637                          * so long as other lockers are blocked.  Once
00638                          * there is a locker which is only waiting on
00639                          * dirty readers then they queue up behind that
00640                          * locker so that it gets to run.  In general
00641                          * this locker will be a WRITE which will shortly
00642                          * get downgraded to a WWRITE, permitting the
00643                          * DIRTY locks to be granted.
00644                          */
00645                         if (lp == NULL)
00646                                 action = GRANT;
00647                         else if (grant_dirty &&
00648                             lock_mode == DB_LOCK_READ_UNCOMMITTED) {
00649                                 /*
00650                                  * An upgrade will be at the head of the
00651                                  * queue.
00652                                  */
00653                                 lp = SH_TAILQ_FIRST(
00654                                      &sh_obj->waiters, __db_lock);
00655                                 if (lp->mode == DB_LOCK_WRITE &&
00656                                      lp->holder == holder)
00657                                         action = SECOND;
00658                                 else
00659                                         action = GRANT;
00660                         } else if (lock_mode == DB_LOCK_READ_UNCOMMITTED)
00661                                 action = SECOND;
00662                         else
00663                                 action = TAIL;
00664                 }
00665         }
00666 
00667         switch (action) {
00668         case HEAD:
00669         case TAIL:
00670         case SECOND:
00671         case GRANT:
00672                 /* Allocate a new lock. */
00673                 if ((newl =
00674                     SH_TAILQ_FIRST(&region->free_locks, __db_lock)) == NULL)
00675                         return (__lock_nomem(dbenv, "locks"));
00676                 SH_TAILQ_REMOVE(&region->free_locks, newl, links, __db_lock);
00677 
00678                 /* Update new lock statistics. */
00679                 if (++region->stat.st_nlocks > region->stat.st_maxnlocks)
00680                         region->stat.st_maxnlocks = region->stat.st_nlocks;
00681 
00682                 /*
00683                  * Allocate a mutex if we do not have a mutex backing the lock.
00684                  *
00685                  * Use the lock mutex to block the thread; lock the mutex
00686                  * when it is allocated so that we will block when we try
00687                  * to lock it again.  We will wake up when another thread
00688                  * grants the lock and releases the mutex.  We leave it
00689                  * locked for the next use of this lock object.
00690                  */
00691                 if (newl->mtx_lock == MUTEX_INVALID) {
00692                         if ((ret = __mutex_alloc(dbenv, MTX_LOGICAL_LOCK,
00693                             DB_MUTEX_LOGICAL_LOCK | DB_MUTEX_SELF_BLOCK,
00694                             &newl->mtx_lock)) != 0)
00695                                 goto err;
00696                         MUTEX_LOCK(dbenv, newl->mtx_lock);
00697                 }
00698 
00699                 newl->holder = locker;
00700                 newl->refcount = 1;
00701                 newl->mode = lock_mode;
00702                 newl->obj = (roff_t)SH_PTR_TO_OFF(newl, sh_obj);
00703                 /*
00704                  * Now, insert the lock onto its locker's list.
00705                  * If the locker does not currently hold any locks,
00706                  * there's no reason to run a deadlock
00707                  * detector, save that information.
00708                  */
00709                 no_dd = sh_locker->master_locker == INVALID_ROFF &&
00710                     SH_LIST_FIRST(
00711                     &sh_locker->child_locker, __db_locker) == NULL &&
00712                     SH_LIST_FIRST(&sh_locker->heldby, __db_lock) == NULL;
00713 
00714                 SH_LIST_INSERT_HEAD(
00715                     &sh_locker->heldby, newl, locker_links, __db_lock);
00716                 break;
00717 
00718         case UPGRADE:
00719 upgrade:        lp = R_ADDR(&lt->reginfo, lock->off);
00720                 if (IS_WRITELOCK(lock_mode) && !IS_WRITELOCK(lp->mode))
00721                         sh_locker->nwrites++;
00722                 lp->mode = lock_mode;
00723                 goto done;
00724         }
00725 
00726         switch (action) {
00727         case UPGRADE:
00728                 DB_ASSERT(0);
00729                 break;
00730         case GRANT:
00731                 newl->status = DB_LSTAT_HELD;
00732                 SH_TAILQ_INSERT_TAIL(&sh_obj->holders, newl, links);
00733                 break;
00734         case HEAD:
00735         case TAIL:
00736         case SECOND:
00737                 if (LF_ISSET(DB_LOCK_NOWAIT)) {
00738                         ret = DB_LOCK_NOTGRANTED;
00739                         region->stat.st_lock_nowait++;
00740                         goto err;
00741                 }
00742                 if ((lp = SH_TAILQ_FIRST(&sh_obj->waiters, __db_lock)) == NULL)
00743                         SH_TAILQ_INSERT_HEAD(&region->dd_objs,
00744                                     sh_obj, dd_links, __db_lockobj);
00745                 switch (action) {
00746                 case HEAD:
00747                         SH_TAILQ_INSERT_HEAD(
00748                              &sh_obj->waiters, newl, links, __db_lock);
00749                         break;
00750                 case SECOND:
00751                         SH_TAILQ_INSERT_AFTER(
00752                              &sh_obj->waiters, lp, newl, links, __db_lock);
00753                         break;
00754                 case TAIL:
00755                         SH_TAILQ_INSERT_TAIL(&sh_obj->waiters, newl, links);
00756                         break;
00757                 default:
00758                         DB_ASSERT(0);
00759                 }
00760 
00761                 /* If we are switching drop the lock we had. */
00762                 if (LF_ISSET(DB_LOCK_SWITCH) &&
00763                     (ret = __lock_put_nolock(dbenv,
00764                     lock, &ihold, DB_LOCK_NOWAITERS)) != 0) {
00765                         (void)__lock_remove_waiter(
00766                             lt, sh_obj, newl, DB_LSTAT_FREE);
00767                         goto err;
00768                 }
00769 
00770                 /*
00771                  * First check to see if this txn has expired.
00772                  * If not then see if the lock timeout is past
00773                  * the expiration of the txn, if it is, use
00774                  * the txn expiration time.  lk_expire is passed
00775                  * to avoid an extra call to get the time.
00776                  */
00777                 if (__lock_expired(dbenv,
00778                     &sh_locker->lk_expire, &sh_locker->tx_expire)) {
00779                         newl->status = DB_LSTAT_EXPIRED;
00780                         sh_locker->lk_expire = sh_locker->tx_expire;
00781 
00782                         /* We are done. */
00783                         goto expired;
00784                 }
00785 
00786                 /*
00787                  * If a timeout was specified in this call then it
00788                  * takes priority.  If a lock timeout has been specified
00789                  * for this transaction then use that, otherwise use
00790                  * the global timeout value.
00791                  */
00792                 if (!LF_ISSET(DB_LOCK_SET_TIMEOUT)) {
00793                         if (F_ISSET(sh_locker, DB_LOCKER_TIMEOUT))
00794                                 timeout = sh_locker->lk_timeout;
00795                         else
00796                                 timeout = region->lk_timeout;
00797                 }
00798                 if (timeout != 0)
00799                         __lock_expires(dbenv, &sh_locker->lk_expire, timeout);
00800                 else
00801                         LOCK_SET_TIME_INVALID(&sh_locker->lk_expire);
00802 
00803                 if (LOCK_TIME_ISVALID(&sh_locker->tx_expire) &&
00804                         (timeout == 0 || __lock_expired(dbenv,
00805                             &sh_locker->lk_expire, &sh_locker->tx_expire)))
00806                                 sh_locker->lk_expire = sh_locker->tx_expire;
00807                 if (LOCK_TIME_ISVALID(&sh_locker->lk_expire) &&
00808                     (!LOCK_TIME_ISVALID(&region->next_timeout) ||
00809                     LOCK_TIME_GREATER(
00810                     &region->next_timeout, &sh_locker->lk_expire)))
00811                         region->next_timeout = sh_locker->lk_expire;
00812 
00813                 newl->status = DB_LSTAT_WAITING;
00814                 region->stat.st_lock_wait++;
00815                 /* We are about to block, deadlock detector must run. */
00816                 region->need_dd = 1;
00817 
00818                 LOCK_SYSTEM_UNLOCK(dbenv);
00819 
00820                 /*
00821                  * Before waiting, see if the deadlock detector should run.
00822                  */
00823                 if (region->detect != DB_LOCK_NORUN && !no_dd)
00824                         (void)__lock_detect(dbenv, region->detect, &did_abort);
00825 
00826                 ip = NULL;
00827                 if (dbenv->thr_hashtab != NULL &&
00828                      (ret = __env_set_state(dbenv, &ip, THREAD_BLOCKED)) != 0)
00829                         goto err;
00830                 MUTEX_LOCK(dbenv, newl->mtx_lock);
00831                 if (ip != NULL)
00832                         ip->dbth_state = THREAD_ACTIVE;
00833 
00834                 LOCK_SYSTEM_LOCK(dbenv);
00835 
00836                 /* Turn off lock timeout. */
00837                 if (newl->status != DB_LSTAT_EXPIRED)
00838                         LOCK_SET_TIME_INVALID(&sh_locker->lk_expire);
00839 
00840                 switch (newl->status) {
00841                 case DB_LSTAT_ABORTED:
00842                         ret = DB_LOCK_DEADLOCK;
00843                         goto err;
00844                 case DB_LSTAT_EXPIRED:
00845 expired:                SHOBJECT_LOCK(lt, region, sh_obj, obj_ndx);
00846                         if ((ret = __lock_put_internal(lt, newl,
00847                             obj_ndx, DB_LOCK_UNLINK | DB_LOCK_FREE)) != 0)
00848                                 break;
00849                         if (LOCK_TIME_EQUAL(
00850                             &sh_locker->lk_expire, &sh_locker->tx_expire))
00851                                 region->stat.st_ntxntimeouts++;
00852                         else
00853                                 region->stat.st_nlocktimeouts++;
00854                         return (DB_LOCK_NOTGRANTED);
00855                 case DB_LSTAT_PENDING:
00856                         if (LF_ISSET(DB_LOCK_UPGRADE)) {
00857                                 /*
00858                                  * The lock just granted got put on the holders
00859                                  * list.  Since we're upgrading some other lock,
00860                                  * we've got to remove it here.
00861                                  */
00862                                 SH_TAILQ_REMOVE(
00863                                     &sh_obj->holders, newl, links, __db_lock);
00864                                 /*
00865                                  * Ensure the object is not believed to be on
00866                                  * the object's lists, if we're traversing by
00867                                  * locker.
00868                                  */
00869                                 newl->links.stqe_prev = -1;
00870                                 goto upgrade;
00871                         } else
00872                                 newl->status = DB_LSTAT_HELD;
00873                         break;
00874                 case DB_LSTAT_FREE:
00875                 case DB_LSTAT_HELD:
00876                 case DB_LSTAT_WAITING:
00877                 default:
00878                         __db_err(dbenv,
00879                             "Unexpected lock status: %d", (int)newl->status);
00880                         ret = __db_panic(dbenv, EINVAL);
00881                         goto err;
00882                 }
00883         }
00884 
00885         lock->off = R_OFFSET(&lt->reginfo, newl);
00886         lock->gen = newl->gen;
00887         lock->mode = newl->mode;
00888         sh_locker->nlocks++;
00889         if (IS_WRITELOCK(newl->mode)) {
00890                 sh_locker->nwrites++;
00891                 if (newl->mode == DB_LOCK_WWRITE)
00892                         F_SET(sh_locker, DB_LOCKER_DIRTY);
00893         }
00894 
00895         return (0);
00896 
00897 done:
00898         ret = 0;
00899 err:
00900         if (newl != NULL &&
00901              (t_ret = __lock_freelock(lt, newl, locker,
00902              DB_LOCK_FREE | DB_LOCK_UNLINK)) != 0 && ret == 0)
00903                 ret = t_ret;
00904         return (ret);
00905 }
00906 
00907 /*
00908  * __lock_put_pp --
00909  *      DB_ENV->lock_put pre/post processing.
00910  *
00911  * PUBLIC: int  __lock_put_pp __P((DB_ENV *, DB_LOCK *));
00912  */
00913 int
00914 __lock_put_pp(dbenv, lock)
00915         DB_ENV *dbenv;
00916         DB_LOCK *lock;
00917 {
00918         DB_THREAD_INFO *ip;
00919         int ret;
00920 
00921         PANIC_CHECK(dbenv);
00922         ENV_REQUIRES_CONFIG(dbenv,
00923             dbenv->lk_handle, "DB_LOCK->lock_put", DB_INIT_LOCK);
00924 
00925         ENV_ENTER(dbenv, ip);
00926         REPLICATION_WRAP(dbenv, (__lock_put(dbenv, lock)), ret);
00927         ENV_LEAVE(dbenv, ip);
00928         return (ret);
00929 }
00930 
00931 /*
00932  * __lock_put --
00933  *
00934  * PUBLIC: int  __lock_put __P((DB_ENV *, DB_LOCK *));
00935  *  Internal lock_put interface.
00936  */
00937 int
00938 __lock_put(dbenv, lock)
00939         DB_ENV *dbenv;
00940         DB_LOCK *lock;
00941 {
00942         DB_LOCKTAB *lt;
00943         int ret, run_dd;
00944 
00945         if (IS_RECOVERING(dbenv))
00946                 return (0);
00947 
00948         lt = dbenv->lk_handle;
00949 
00950         LOCK_SYSTEM_LOCK(dbenv);
00951         ret = __lock_put_nolock(dbenv, lock, &run_dd, 0);
00952         LOCK_SYSTEM_UNLOCK(dbenv);
00953 
00954         /*
00955          * Only run the lock detector if put told us to AND we are running
00956          * in auto-detect mode.  If we are not running in auto-detect, then
00957          * a call to lock_detect here will 0 the need_dd bit, but will not
00958          * actually abort anything.
00959          */
00960         if (ret == 0 && run_dd)
00961                 (void)__lock_detect(dbenv,
00962                     ((DB_LOCKREGION *)lt->reginfo.primary)->detect, NULL);
00963         return (ret);
00964 }
00965 
00966 static int
00967 __lock_put_nolock(dbenv, lock, runp, flags)
00968         DB_ENV *dbenv;
00969         DB_LOCK *lock;
00970         int *runp;
00971         u_int32_t flags;
00972 {
00973         struct __db_lock *lockp;
00974         DB_LOCKREGION *region;
00975         DB_LOCKTAB *lt;
00976         int ret;
00977 
00978         /* Check if locks have been globally turned off. */
00979         if (F_ISSET(dbenv, DB_ENV_NOLOCKING))
00980                 return (0);
00981 
00982         lt = dbenv->lk_handle;
00983         region = lt->reginfo.primary;
00984 
00985         lockp = R_ADDR(&lt->reginfo, lock->off);
00986         if (lock->gen != lockp->gen) {
00987                 __db_err(dbenv, __db_lock_invalid, "DB_LOCK->lock_put");
00988                 LOCK_INIT(*lock);
00989                 return (EINVAL);
00990         }
00991 
00992         ret = __lock_put_internal(lt,
00993             lockp, lock->ndx, flags | DB_LOCK_UNLINK | DB_LOCK_FREE);
00994         LOCK_INIT(*lock);
00995 
00996         *runp = 0;
00997         if (ret == 0 && region->detect != DB_LOCK_NORUN &&
00998              (region->need_dd || LOCK_TIME_ISVALID(&region->next_timeout)))
00999                 *runp = 1;
01000 
01001         return (ret);
01002 }
01003 
/*
 * __lock_downgrade --
 *
 * Used to downgrade locks.  Currently this is used in three places:
 * 1) by the Concurrent Data Store product to downgrade write locks back
 * to iwrite locks; 2) to downgrade write-handle locks to read-handle
 * locks at the end of an open/create; and 3) to downgrade write locks
 * to was_write to support dirty reads.
 *
 * PUBLIC: int __lock_downgrade __P((DB_ENV *,
 * PUBLIC:     DB_LOCK *, db_lockmode_t, u_int32_t));
 */
int
__lock_downgrade(dbenv, lock, new_mode, flags)
        DB_ENV *dbenv;
        DB_LOCK *lock;
        db_lockmode_t new_mode;
        u_int32_t flags;
{
        struct __db_lock *lockp;
        DB_LOCKER *sh_locker;
        DB_LOCKOBJ *obj;
        DB_LOCKREGION *region;
        DB_LOCKTAB *lt;
        u_int32_t indx;
        int ret;

        PANIC_CHECK(dbenv);
        ret = 0;

        /* Check if locks have been globally turned off. */
        if (F_ISSET(dbenv, DB_ENV_NOLOCKING))
                return (0);

        lt = dbenv->lk_handle;
        region = lt->reginfo.primary;

        /* The caller may already hold the region lock (DB_LOCK_NOREGION). */
        if (!LF_ISSET(DB_LOCK_NOREGION))
                LOCK_SYSTEM_LOCK(dbenv);

        region->stat.st_ndowngrade++;

        /*
         * Resolve the lock cookie to the shared lock structure and verify
         * the handle is still valid (the generation numbers must match).
         */
        lockp = R_ADDR(&lt->reginfo, lock->off);
        if (lock->gen != lockp->gen) {
                __db_err(dbenv, __db_lock_invalid, "lock_downgrade");
                ret = EINVAL;
                goto out;
        }

        /* Find the locker that owns this lock. */
        LOCKER_LOCK(lt, region, lockp->holder, indx);

        if ((ret = __lock_getlocker(lt, lockp->holder,
            indx, 0, &sh_locker)) != 0 || sh_locker == NULL) {
                if (ret == 0)
                        ret = EINVAL;
                __db_err(dbenv, __db_locker_invalid);
                goto out;
        }
        /* Leaving a write mode reduces the locker's write-lock count. */
        if (IS_WRITELOCK(lockp->mode) && !IS_WRITELOCK(new_mode))
                sh_locker->nwrites--;

        /* Update both the shared lock and the caller's cookie. */
        lockp->mode = new_mode;
        lock->mode = new_mode;

        /* Get the object associated with this lock. */
        obj = (DB_LOCKOBJ *)((u_int8_t *)lockp + lockp->obj);
        /*
         * The downgrade may make waiters compatible with the remaining
         * holders; promote any that can now run.
         */
        ret = __lock_promote(lt, obj, NULL, LF_ISSET(DB_LOCK_NOWAITERS));

out:    if (!LF_ISSET(DB_LOCK_NOREGION))
                LOCK_SYSTEM_UNLOCK(dbenv);

        return (ret);
}
01077 
/*
 * __lock_put_internal --
 *      Release a lock: drop a reference, and on the final reference
 * remove the lock from its object's holder/waiter lists, promote any
 * waiters that can now run, reclaim the lock object if it is no longer
 * referenced and, per flags, unlink and free the lock itself.  Callers
 * in this file hold the region lock.
 */
static int
__lock_put_internal(lt, lockp, obj_ndx, flags)
        DB_LOCKTAB *lt;
        struct __db_lock *lockp;
        u_int32_t obj_ndx, flags;
{
        DB_LOCKOBJ *sh_obj;
        DB_LOCKREGION *region;
        int ret, state_changed;

        region = lt->reginfo.primary;
        ret = state_changed = 0;

        if (!OBJ_LINKS_VALID(lockp)) {
                /*
                 * Someone removed this lock while we were doing a release
                 * by locker id.  We are trying to free this lock, but it's
                 * already been done; all we need to do is return it to the
                 * free list.
                 */
                (void)__lock_freelock(lt, lockp, 0, DB_LOCK_FREE);
                return (0);
        }

        /* DB_LOCK_DOALL releases every reference at once. */
        if (LF_ISSET(DB_LOCK_DOALL))
                region->stat.st_nreleases += lockp->refcount;
        else
                region->stat.st_nreleases++;

        /* Not the last reference: just decrement and keep the lock. */
        if (!LF_ISSET(DB_LOCK_DOALL) && lockp->refcount > 1) {
                lockp->refcount--;
                return (0);
        }

        /* Increment generation number. */
        lockp->gen++;

        /* Get the object associated with this lock. */
        sh_obj = (DB_LOCKOBJ *)((u_int8_t *)lockp + lockp->obj);

        /*
         * Remove this lock from its holders/waitlist.  Set its status
         * to ABORTED.  It may get freed below, but if not then the
         * waiter has been aborted (it will panic if the lock is
         * free).
         */
        if (lockp->status != DB_LSTAT_HELD &&
            lockp->status != DB_LSTAT_PENDING) {
                if ((ret = __lock_remove_waiter(
                    lt, sh_obj, lockp, DB_LSTAT_ABORTED)) != 0)
                        return (ret);
        } else {
                SH_TAILQ_REMOVE(&sh_obj->holders, lockp, links, __db_lock);
                /* Mark the links invalid so OBJ_LINKS_VALID() fails. */
                lockp->links.stqe_prev = -1;
        }

        /* Unless suppressed, let compatible waiters run. */
        if (LF_ISSET(DB_LOCK_NOPROMOTE))
                state_changed = 0;
        else
                if ((ret = __lock_promote(lt, sh_obj, &state_changed,
                    LF_ISSET(DB_LOCK_NOWAITERS))) != 0)
                        return (ret);

        /* Check if object should be reclaimed. */
        if (SH_TAILQ_FIRST(&sh_obj->holders, __db_lock) == NULL &&
            SH_TAILQ_FIRST(&sh_obj->waiters, __db_lock) == NULL) {
                HASHREMOVE_EL(lt->obj_tab,
                    obj_ndx, __db_lockobj, links, sh_obj);
                /* Free out-of-line object data; small data is inline. */
                if (sh_obj->lockobj.size > sizeof(sh_obj->objdata))
                        __db_shalloc_free(&lt->reginfo,
                            SH_DBT_PTR(&sh_obj->lockobj));
                SH_TAILQ_INSERT_HEAD(
                    &region->free_objs, sh_obj, links, __db_lockobj);
                region->stat.st_nobjects--;
                state_changed = 1;
        }

        /* Free lock. */
        if (LF_ISSET(DB_LOCK_UNLINK | DB_LOCK_FREE))
                ret = __lock_freelock(lt, lockp, lockp->holder, flags);

        /*
         * If we did not promote anyone; we need to run the deadlock
         * detector again.
         */
        if (state_changed == 0)
                region->need_dd = 1;

        return (ret);
}
01168 
/*
 * __lock_freelock --
 *      Free a lock.  Unlink it from its locker if necessary.
 *
 * flags is some combination of DB_LOCK_UNLINK (remove the lock from
 * its owning locker's list and adjust that locker's counts) and
 * DB_LOCK_FREE (return the lock to the region free list).  locker is
 * the owning locker id; it is consulted only on the UNLINK path.
 */
static int
__lock_freelock(lt, lockp, locker, flags)
        DB_LOCKTAB *lt;
        struct __db_lock *lockp;
        u_int32_t locker, flags;
{
        DB_ENV *dbenv;
        DB_LOCKER *sh_locker;
        DB_LOCKREGION *region;
        u_int32_t indx;
        int ret;

        dbenv = lt->dbenv;
        region = lt->reginfo.primary;

        if (LF_ISSET(DB_LOCK_UNLINK)) {
                /* Look up the owning locker so its counts can be fixed. */
                LOCKER_LOCK(lt, region, locker, indx);
                if ((ret = __lock_getlocker(lt,
                    locker, indx, 0, &sh_locker)) != 0 || sh_locker == NULL) {
                        __db_err(dbenv, __db_locker_invalid);
                        return (ret == 0 ? EINVAL : ret);
                }

                SH_LIST_REMOVE(lockp, locker_links, __db_lock);
                /* Only held locks are counted in nlocks/nwrites. */
                if (lockp->status == DB_LSTAT_HELD) {
                        sh_locker->nlocks--;
                        if (IS_WRITELOCK(lockp->mode))
                                sh_locker->nwrites--;
                }
        }

        if (LF_ISSET(DB_LOCK_FREE)) {
                /*
                 * If the lock is not held we cannot be sure of its mutex
                 * state so we just destroy it and let it be re-created
                 * when needed.
                 */
                if (lockp->mtx_lock != MUTEX_INVALID &&
                     lockp->status != DB_LSTAT_HELD &&
                     lockp->status != DB_LSTAT_EXPIRED &&
                     (ret = __mutex_free(dbenv, &lockp->mtx_lock)) != 0)
                        return (ret);
                lockp->status = DB_LSTAT_FREE;
                SH_TAILQ_INSERT_HEAD(
                    &region->free_locks, lockp, links, __db_lock);
                region->stat.st_nlocks--;
        }

        return (0);
}
01224 
/*
 * __lock_getobj --
 *      Get an object in the object hash table.  The create parameter
 * indicates if the object should be created if it doesn't exist in
 * the table.
 *
 * This must be called with the object bucket locked.
 *
 * On success *retp is the object, or NULL when it was not found and
 * create was zero.  Returns nonzero only on allocation failure.
 */
static int
__lock_getobj(lt, obj, ndx, create, retp)
        DB_LOCKTAB *lt;
        const DBT *obj;
        u_int32_t ndx;
        int create;
        DB_LOCKOBJ **retp;
{
        DB_ENV *dbenv;
        DB_LOCKOBJ *sh_obj;
        DB_LOCKREGION *region;
        int ret;
        void *p;

        dbenv = lt->dbenv;
        region = lt->reginfo.primary;

        /* Look up the object in the hash table. */
        HASHLOOKUP(lt->obj_tab,
            ndx, __db_lockobj, links, obj, sh_obj, __lock_cmp);

        /*
         * If we found the object, then we can just return it.  If
         * we didn't find the object, then we need to create it.
         */
        if (sh_obj == NULL && create) {
                /* Create new object and then insert it into hash table. */
                if ((sh_obj =
                    SH_TAILQ_FIRST(&region->free_objs, __db_lockobj)) == NULL) {
                        ret = __lock_nomem(lt->dbenv, "object entries");
                        goto err;
                }

                /*
                 * If we can fit this object in the structure, do so instead
                 * of shalloc-ing space for it.
                 */
                if (obj->size <= sizeof(sh_obj->objdata))
                        p = sh_obj->objdata;
                else if ((ret =
                    __db_shalloc(&lt->reginfo, obj->size, 0, &p)) != 0) {
                        __db_err(dbenv, "No space for lock object storage");
                        /* sh_obj is still on the free list; nothing to undo. */
                        goto err;
                }

                memcpy(p, obj->data, obj->size);

                /* Commit: take the entry off the free list. */
                SH_TAILQ_REMOVE(
                    &region->free_objs, sh_obj, links, __db_lockobj);
                if (++region->stat.st_nobjects > region->stat.st_maxnobjects)
                        region->stat.st_maxnobjects = region->stat.st_nobjects;

                SH_TAILQ_INIT(&sh_obj->waiters);
                SH_TAILQ_INIT(&sh_obj->holders);
                sh_obj->lockobj.size = obj->size;
                /* Store the data as an offset relative to the DBT header. */
                sh_obj->lockobj.off =
                    (roff_t)SH_PTR_TO_OFF(&sh_obj->lockobj, p);

                HASHINSERT(lt->obj_tab, ndx, __db_lockobj, links, sh_obj);
        }

        *retp = sh_obj;
        return (0);

err:    return (ret);
}
01299 
01300 /*
01301  * __lock_is_parent --
01302  *      Given a locker and a transaction, return 1 if the locker is
01303  * an ancestor of the designated transaction.  This is used to determine
01304  * if we should grant locks that appear to conflict, but don't because
01305  * the lock is already held by an ancestor.
01306  */
01307 static int
01308 __lock_is_parent(lt, locker, sh_locker)
01309         DB_LOCKTAB *lt;
01310         u_int32_t locker;
01311         DB_LOCKER *sh_locker;
01312 {
01313         DB_LOCKER *parent;
01314 
01315         parent = sh_locker;
01316         while (parent->parent_locker != INVALID_ROFF) {
01317                 parent = R_ADDR(&lt->reginfo, parent->parent_locker);
01318                 if (parent->id == locker)
01319                         return (1);
01320         }
01321 
01322         return (0);
01323 }
01324 
01325 /*
01326  * __lock_locker_is_parent --
01327  *      Determine if "locker" is an ancestor of "child".
01328  * *retp == 1 if so, 0 otherwise.
01329  *
01330  * PUBLIC: int __lock_locker_is_parent
01331  * PUBLIC:     __P((DB_ENV *, u_int32_t, u_int32_t, int *));
01332  */
01333 int
01334 __lock_locker_is_parent(dbenv, locker, child, retp)
01335         DB_ENV *dbenv;
01336         u_int32_t locker, child;
01337         int *retp;
01338 {
01339         DB_LOCKER *sh_locker;
01340         DB_LOCKREGION *region;
01341         DB_LOCKTAB *lt;
01342         u_int32_t locker_ndx;
01343         int ret;
01344 
01345         lt = dbenv->lk_handle;
01346         region = lt->reginfo.primary;
01347 
01348         LOCKER_LOCK(lt, region, child, locker_ndx);
01349         if ((ret =
01350             __lock_getlocker(lt, child, locker_ndx, 0, &sh_locker)) != 0) {
01351                 __db_err(dbenv, __db_locker_invalid);
01352                 return (ret);
01353         }
01354 
01355         /*
01356          * The locker may not exist for this transaction, if not then it has
01357          * no parents.
01358          */
01359         if (sh_locker == NULL)
01360                 *retp = 0;
01361         else
01362                 *retp = __lock_is_parent(lt, locker, sh_locker);
01363         return (0);
01364 }
01365 
/*
 * __lock_inherit_locks --
 *      Called on child commit to merge child's locks with parent's.
 *
 * Locks the child already shares with the parent are folded into the
 * parent's lock (reference counts added); the rest are re-owned by the
 * parent.  The child's lock/write counts are transferred at the end.
 */
static int
__lock_inherit_locks(lt, locker, flags)
        DB_LOCKTAB *lt;
        u_int32_t locker;
        u_int32_t flags;
{
        DB_ENV *dbenv;
        DB_LOCKER *sh_locker, *sh_parent;
        DB_LOCKOBJ *obj;
        DB_LOCKREGION *region;
        int ret;
        struct __db_lock *hlp, *lp;
        u_int32_t ndx;

        region = lt->reginfo.primary;
        dbenv = lt->dbenv;

        /*
         * Get the committing locker and mark it as deleted.
         * This allows us to traverse the locker links without
         * worrying that someone else is deleting locks out
         * from under us.  However, if the locker doesn't
         * exist, that just means that the child holds no
         * locks, so inheritance is easy!
         */
        LOCKER_LOCK(lt, region, locker, ndx);
        if ((ret = __lock_getlocker(lt,
            locker, ndx, 0, &sh_locker)) != 0 ||
            sh_locker == NULL ||
            F_ISSET(sh_locker, DB_LOCKER_DELETED)) {
                if (ret == 0 && sh_locker != NULL)
                        ret = EINVAL;
                __db_err(dbenv, __db_locker_invalid);
                return (ret);
        }

        /* Make sure we are a child transaction. */
        if (sh_locker->parent_locker == INVALID_ROFF) {
                __db_err(dbenv, "Not a child transaction");
                return (EINVAL);
        }
        sh_parent = R_ADDR(&lt->reginfo, sh_locker->parent_locker);
        F_SET(sh_locker, DB_LOCKER_DELETED);

        /*
         * Now, lock the parent locker; move locks from
         * the committing list to the parent's list.
         *
         * NOTE(review): this recomputes the hash slot from the child's
         * id ("locker"), not sh_parent->id, and ndx is unused below --
         * confirm the intent of this LOCKER_LOCK call.
         */
        LOCKER_LOCK(lt, region, locker, ndx);
        if (F_ISSET(sh_parent, DB_LOCKER_DELETED)) {
                /* ret is necessarily 0 here; keep the error-style guard. */
                if (ret == 0) {
                        __db_err(dbenv,
                            "Parent locker is not valid");
                        ret = EINVAL;
                }
                return (ret);
        }

        /*
         * In order to make it possible for a parent to have
         * many, many children who lock the same objects, and
         * not require an inordinate number of locks, we try
         * to merge the child's locks with its parent's.
         */
        for (lp = SH_LIST_FIRST(&sh_locker->heldby, __db_lock);
            lp != NULL;
            lp = SH_LIST_FIRST(&sh_locker->heldby, __db_lock)) {
                SH_LIST_REMOVE(lp, locker_links, __db_lock);

                /* See if the parent already has a lock. */
                obj = (DB_LOCKOBJ *)((u_int8_t *)lp + lp->obj);
                for (hlp = SH_TAILQ_FIRST(&obj->holders, __db_lock);
                    hlp != NULL;
                    hlp = SH_TAILQ_NEXT(hlp, links, __db_lock))
                        if (hlp->holder == sh_parent->id &&
                            lp->mode == hlp->mode)
                                break;

                if (hlp != NULL) {
                        /* Parent already holds lock. */
                        hlp->refcount += lp->refcount;

                        /* Remove lock from object list and free it. */
                        DB_ASSERT(lp->status == DB_LSTAT_HELD);
                        SH_TAILQ_REMOVE(&obj->holders, lp, links, __db_lock);
                        (void)__lock_freelock(lt, lp, locker, DB_LOCK_FREE);
                } else {
                        /* Just move lock to parent chains. */
                        SH_LIST_INSERT_HEAD(&sh_parent->heldby,
                            lp, locker_links, __db_lock);
                        lp->holder = sh_parent->id;
                }

                /*
                 * We may need to promote regardless of whether we simply
                 * moved the lock to the parent or changed the parent's
                 * reference count, because there might be a sibling waiting,
                 * who will now be allowed to make forward progress.
                 */
                if ((ret = __lock_promote(
                    lt, obj, NULL, LF_ISSET(DB_LOCK_NOWAITERS))) != 0)
                        return (ret);
        }

        /* Transfer child counts to parent. */
        sh_parent->nlocks += sh_locker->nlocks;
        sh_parent->nwrites += sh_locker->nwrites;

        return (ret);
}
01480 
/*
 * __lock_promote --
 *
 * Look through the waiters and holders lists and decide which (if any)
 * locks can be promoted.   Promote any that are eligible.
 *
 * lt             -- lock table containing obj.
 * obj            -- locked object whose wait queue is scanned.
 * state_changedp -- if non-NULL, set non-zero when the lock-manager state
 *                   changed (a waiter was promoted, or there were no
 *                   waiters at all), zero otherwise.
 * flags          -- DB_LOCK_NOWAITERS causes waiters in DB_LOCK_WAIT mode
 *                   to be skipped.
 *
 * Returns 0 on success; panics the environment if a waiter's locker
 * cannot be found (region corruption).
 *
 * NOTE(review): presumably must be called with the object's hash bucket
 * locked, like the other object-list routines here -- confirm.
 *
 * PUBLIC: int __lock_promote
 * PUBLIC:    __P((DB_LOCKTAB *, DB_LOCKOBJ *, int *, u_int32_t));
 */
int
__lock_promote(lt, obj, state_changedp, flags)
        DB_LOCKTAB *lt;
        DB_LOCKOBJ *obj;
        int *state_changedp;
        u_int32_t flags;
{
        struct __db_lock *lp_w, *lp_h, *next_waiter;
        DB_LOCKER *sh_locker;
        DB_LOCKREGION *region;
        u_int32_t locker_ndx;
        int had_waiters, state_changed;

        region = lt->reginfo.primary;
        had_waiters = 0;

        /*
         * We need to do lock promotion.  We also need to determine if we're
         * going to need to run the deadlock detector again.  If we release
         * locks, and there are waiters, but no one gets promoted, then we
         * haven't fundamentally changed the lockmgr state, so we may still
         * have a deadlock and we have to run again.  However, if there were
         * no waiters, or we actually promoted someone, then we are OK and we
         * don't have to run it immediately.
         *
         * During promotion, we look for state changes so we can return this
         * information to the caller.
         */

        /*
         * An empty wait queue counts as "state changed": there is nothing
         * left to promote, hence no deadlock to detect.
         */
        for (lp_w = SH_TAILQ_FIRST(&obj->waiters, __db_lock),
            state_changed = lp_w == NULL;
            lp_w != NULL;
            lp_w = next_waiter) {
                had_waiters = 1;
                /* Save the successor now; lp_w may be moved off this list. */
                next_waiter = SH_TAILQ_NEXT(lp_w, links, __db_lock);

                /* Waiter may have aborted or expired. */
                if (lp_w->status != DB_LSTAT_WAITING)
                        continue;
                /* Are we switching locks? */
                if (LF_ISSET(DB_LOCK_NOWAITERS) && lp_w->mode == DB_LOCK_WAIT)
                        continue;

                /*
                 * Scan current holders for one that conflicts with this
                 * waiter.  A conflicting lock held by this waiter's own
                 * parent transaction does not block promotion; any other
                 * conflicting holder does.
                 */
                for (lp_h = SH_TAILQ_FIRST(&obj->holders, __db_lock);
                    lp_h != NULL;
                    lp_h = SH_TAILQ_NEXT(lp_h, links, __db_lock)) {
                        if (lp_h->holder != lp_w->holder &&
                            CONFLICTS(lt, region, lp_h->mode, lp_w->mode)) {
                                LOCKER_LOCK(lt,
                                    region, lp_w->holder, locker_ndx);
                                if ((__lock_getlocker(lt, lp_w->holder,
                                    locker_ndx, 0, &sh_locker)) != 0) {
                                        /* Missing locker: region corrupt. */
                                        __db_err(lt->dbenv,
                                           "Locker %#lx missing",
                                           (u_long)lp_w->holder);
                                        return (__db_panic(lt->dbenv, EINVAL));
                                }
                                if (!__lock_is_parent(lt,
                                    lp_h->holder, sh_locker))
                                        break;
                        }
                }
                if (lp_h != NULL)       /* Found a conflict. */
                        break;

                /* No conflict, promote the waiting lock. */
                SH_TAILQ_REMOVE(&obj->waiters, lp_w, links, __db_lock);
                lp_w->status = DB_LSTAT_PENDING;
                SH_TAILQ_INSERT_TAIL(&obj->holders, lp_w, links);

                /*
                 * Wake up waiter: the waiting thread is blocked on the
                 * lock's mutex; releasing it hands over the PENDING lock.
                 */
                MUTEX_UNLOCK(lt->dbenv, lp_w->mtx_lock);
                state_changed = 1;
        }

        /*
         * If this object had waiters and doesn't any more, then we need
         * to remove it from the dd_obj list.
         */
        if (had_waiters && SH_TAILQ_FIRST(&obj->waiters, __db_lock) == NULL)
                SH_TAILQ_REMOVE(&region->dd_objs, obj, dd_links, __db_lockobj);

        if (state_changedp != NULL)
                *state_changedp = state_changed;

        return (0);
}
01577 
01578 /*
01579  * __lock_remove_waiter --
01580  *      Any lock on the waitlist has a process waiting for it.  Therefore,
01581  * we can't return the lock to the freelist immediately.  Instead, we can
01582  * remove the lock from the list of waiters, set the status field of the
01583  * lock, and then let the process waking up return the lock to the
01584  * free list.
01585  *
01586  * This must be called with the Object bucket locked.
01587  */
01588 static int
01589 __lock_remove_waiter(lt, sh_obj, lockp, status)
01590         DB_LOCKTAB *lt;
01591         DB_LOCKOBJ *sh_obj;
01592         struct __db_lock *lockp;
01593         db_status_t status;
01594 {
01595         DB_LOCKREGION *region;
01596         int do_wakeup;
01597 
01598         region = lt->reginfo.primary;
01599 
01600         do_wakeup = lockp->status == DB_LSTAT_WAITING;
01601 
01602         SH_TAILQ_REMOVE(&sh_obj->waiters, lockp, links, __db_lock);
01603         lockp->links.stqe_prev = -1;
01604         lockp->status = status;
01605         if (SH_TAILQ_FIRST(&sh_obj->waiters, __db_lock) == NULL)
01606                 SH_TAILQ_REMOVE(
01607                     &region->dd_objs,
01608                     sh_obj, dd_links, __db_lockobj);
01609 
01610         /*
01611          * Wake whoever is waiting on this lock.
01612          */
01613         if (do_wakeup)
01614                 MUTEX_UNLOCK(lt->dbenv, lockp->mtx_lock);
01615 
01616         return (0);
01617 }
01618 
01619 /*
01620  * __lock_trade --
01621  *
01622  * Trade locker ids on a lock.  This is used to reassign file locks from
01623  * a transactional locker id to a long-lived locker id.  This should be
01624  * called with the region mutex held.
01625  */
01626 static int
01627 __lock_trade(dbenv, lock, new_locker)
01628         DB_ENV *dbenv;
01629         DB_LOCK *lock;
01630         u_int32_t new_locker;
01631 {
01632         struct __db_lock *lp;
01633         DB_LOCKREGION *region;
01634         DB_LOCKTAB *lt;
01635         DB_LOCKER *sh_locker;
01636         int ret;
01637         u_int32_t locker_ndx;
01638 
01639         lt = dbenv->lk_handle;
01640         region = lt->reginfo.primary;
01641         lp = R_ADDR(&lt->reginfo, lock->off);
01642 
01643         /* If the lock is already released, simply return. */
01644         if (lp->gen != lock->gen)
01645                 return (DB_NOTFOUND);
01646 
01647         /* Make sure that we can get new locker and add this lock to it. */
01648         LOCKER_LOCK(lt, region, new_locker, locker_ndx);
01649         if ((ret =
01650             __lock_getlocker(lt, new_locker, locker_ndx, 0, &sh_locker)) != 0)
01651                 return (ret);
01652 
01653         if (sh_locker == NULL) {
01654                 __db_err(dbenv, "Locker does not exist");
01655                 return (EINVAL);
01656         }
01657 
01658         /* Remove the lock from its current locker. */
01659         if ((ret = __lock_freelock(lt, lp, lp->holder, DB_LOCK_UNLINK)) != 0)
01660                 return (ret);
01661 
01662         /* Add lock to its new locker. */
01663         SH_LIST_INSERT_HEAD(&sh_locker->heldby, lp, locker_links, __db_lock);
01664         sh_locker->nlocks++;
01665         if (IS_WRITELOCK(lp->mode))
01666                 sh_locker->nwrites++;
01667         lp->holder = new_locker;
01668 
01669         return (0);
01670 }

Generated on Sun Dec 25 12:14:40 2005 for Berkeley DB 4.4.16 by  doxygen 1.4.2