lock_deadlock.c

00001 /*-
00002  * See the file LICENSE for redistribution information.
00003  *
00004  * Copyright (c) 1996-2005
00005  *      Sleepycat Software.  All rights reserved.
00006  *
00007  * $Id: lock_deadlock.c,v 12.10 2005/10/07 20:21:30 ubell Exp $
00008  */
00009 
00010 #include "db_config.h"
00011 
00012 #ifndef NO_SYSTEM_INCLUDES
00013 #include <sys/types.h>
00014 
00015 #include <string.h>
00016 #endif
00017 
00018 #include "db_int.h"
00019 #include "dbinc/db_shash.h"
00020 #include "dbinc/lock.h"
00021 #include "dbinc/log.h"
00022 #include "dbinc/txn.h"
00023 
00024 #define ISSET_MAP(M, N) ((M)[(N) / 32] & (1 << ((N) % 32)))
00025 
00026 #define CLEAR_MAP(M, N) {                                               \
00027         u_int32_t __i;                                                  \
00028         for (__i = 0; __i < (N); __i++)                                 \
00029                 (M)[__i] = 0;                                           \
00030 }
00031 
00032 #define SET_MAP(M, B)   ((M)[(B) / 32] |= (1 << ((B) % 32)))
00033 #define CLR_MAP(M, B)   ((M)[(B) / 32] &= ~((u_int)1 << ((B) % 32)))
00034 
00035 #define OR_MAP(D, S, N) {                                               \
00036         u_int32_t __i;                                                  \
00037         for (__i = 0; __i < (N); __i++)                                 \
00038                 D[__i] |= S[__i];                                       \
00039 }
00040 #define BAD_KILLID      0xffffffff
00041 
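The bit-map macros above pack one bit per deadlock-detector id into an array of 32-bit words: row i of the waits-for matrix starts at word offset i * nalloc, and bit j in that row means "locker i waits on locker j". A minimal standalone sketch of that encoding follows; it is illustrative only and not part of the Berkeley DB sources, and the three-locker layout is made up.

    #include <stdio.h>
    #include <string.h>

    typedef unsigned int u_int32_t;

    #define ISSET_MAP(M, N) ((M)[(N) / 32] & (1 << ((N) % 32)))
    #define SET_MAP(M, B)   ((M)[(B) / 32] |= (1 << ((B) % 32)))

    int
    main()
    {
            u_int32_t bitmap[3 * 1];        /* 3 lockers, 1 word (nalloc) per row. */
            u_int32_t *row;

            memset(bitmap, 0, sizeof(bitmap));

            /* Record "locker 0 waits on locker 2" in row 0. */
            row = bitmap + (0 * 1);
            SET_MAP(row, 2);

            printf("0 waits on 2: %s\n", ISSET_MAP(row, 2) ? "yes" : "no");
            return (0);
    }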
00042 typedef struct {
00043         int             valid;
00044         int             self_wait;
00045         int             in_abort;
00046         u_int32_t       count;
00047         u_int32_t       id;
00048         roff_t          last_lock;
00049         roff_t          last_obj;
00050         u_int32_t       last_locker_id;
00051         db_pgno_t       pgno;
00052 } locker_info;
00053 
00054 static int __dd_abort __P((DB_ENV *, locker_info *, int *));
00055 static int __dd_build __P((DB_ENV *,
00056             u_int32_t, u_int32_t **, u_int32_t *, u_int32_t *, locker_info **));
00057 static int __dd_find __P((DB_ENV *,
00058             u_int32_t *, locker_info *, u_int32_t, u_int32_t, u_int32_t ***));
00059 static int __dd_isolder __P((u_int32_t, u_int32_t, u_int32_t, u_int32_t));
00060 static int __dd_verify __P((locker_info *, u_int32_t *, u_int32_t *,
00061             u_int32_t *, u_int32_t, u_int32_t, u_int32_t));
00062 
00063 #ifdef DIAGNOSTIC
00064 static void __dd_debug
00065             __P((DB_ENV *, locker_info *, u_int32_t *, u_int32_t, u_int32_t));
00066 #endif
00067 
00068 /*
00069  * __lock_detect_pp --
00070  *      DB_ENV->lock_detect pre/post processing.
00071  *
00072  * PUBLIC: int __lock_detect_pp __P((DB_ENV *, u_int32_t, u_int32_t, int *));
00073  */
00074 int
00075 __lock_detect_pp(dbenv, flags, atype, abortp)
00076         DB_ENV *dbenv;
00077         u_int32_t flags, atype;
00078         int *abortp;
00079 {
00080         DB_THREAD_INFO *ip;
00081         int ret;
00082 
00083         PANIC_CHECK(dbenv);
00084         ENV_REQUIRES_CONFIG(dbenv,
00085             dbenv->lk_handle, "DB_ENV->lock_detect", DB_INIT_LOCK);
00086 
00087         /* Validate arguments. */
00088         if ((ret = __db_fchk(dbenv, "DB_ENV->lock_detect", flags, 0)) != 0)
00089                 return (ret);
00090         switch (atype) {
00091         case DB_LOCK_DEFAULT:
00092         case DB_LOCK_EXPIRE:
00093         case DB_LOCK_MAXLOCKS:
00094         case DB_LOCK_MAXWRITE:
00095         case DB_LOCK_MINLOCKS:
00096         case DB_LOCK_MINWRITE:
00097         case DB_LOCK_OLDEST:
00098         case DB_LOCK_RANDOM:
00099         case DB_LOCK_YOUNGEST:
00100                 break;
00101         default:
00102                 __db_err(dbenv,
00103             "DB_ENV->lock_detect: unknown deadlock detection mode specified");
00104                 return (EINVAL);
00105         }
00106 
00107         ENV_ENTER(dbenv, ip);
00108         REPLICATION_WRAP(dbenv, (__lock_detect(dbenv, atype, abortp)), ret);
00109         ENV_LEAVE(dbenv, ip);
00110         return (ret);
00111 }
00112 
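For context, here is a sketch of how an application might drive this entry point through the public DB_ENV->lock_detect method. The wrapper function run_detector and its error handling are illustrative assumptions, not taken from this file; environment setup and teardown are omitted.

    #include <db.h>

    /*
     * Run the deadlock detector once, aborting the youngest locker in
     * each deadlock found; "aborted" receives the number of lockers
     * rejected.
     */
    int
    run_detector(DB_ENV *dbenv)
    {
            int aborted, ret;

            if ((ret = dbenv->lock_detect(dbenv,
                0, DB_LOCK_YOUNGEST, &aborted)) != 0)
                    dbenv->err(dbenv, ret, "DB_ENV->lock_detect");
            return (ret);
    }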
00113 /*
00114  * __lock_detect --
00115  *      DB_ENV->lock_detect.
00116  *
00117  * PUBLIC: int __lock_detect __P((DB_ENV *, u_int32_t, int *));
00118  */
00119 int
00120 __lock_detect(dbenv, atype, abortp)
00121         DB_ENV *dbenv;
00122         u_int32_t atype;
00123         int *abortp;
00124 {
00125         DB_LOCKREGION *region;
00126         DB_LOCKTAB *lt;
00127         db_timeval_t now;
00128         locker_info *idmap;
00129         u_int32_t *bitmap, *copymap, **deadp, **free_me, *tmpmap;
00130         u_int32_t i, cid, keeper, killid, limit, nalloc, nlockers;
00131         u_int32_t lock_max, txn_max;
00132         int ret, status;
00133 
00134         /*
00135          * If this environment is a replication client, then we must use the
00136          * MINWRITE detection discipline.
00137          */
00138         if (__rep_is_client(dbenv))
00139                 atype = DB_LOCK_MINWRITE;
00140 
00141         free_me = NULL;
00142 
00143         lt = dbenv->lk_handle;
00144         if (abortp != NULL)
00145                 *abortp = 0;
00146 
00147         /* Check if a detector run is necessary. */
00148         LOCK_SYSTEM_LOCK(dbenv);
00149 
00150         /* Make a pass only if auto-detect would run. */
00151         region = lt->reginfo.primary;
00152 
00153         LOCK_SET_TIME_INVALID(&now);
00154         if (region->need_dd == 0 &&
00155              (!LOCK_TIME_ISVALID(&region->next_timeout) ||
00156              !__lock_expired(dbenv, &now, &region->next_timeout))) {
00157                 LOCK_SYSTEM_UNLOCK(dbenv);
00158                 return (0);
00159         }
00160         if (region->need_dd == 0)
00161                 atype = DB_LOCK_EXPIRE;
00162 
00163         /* Reset need_dd, so we know we've run the detector. */
00164         region->need_dd = 0;
00165 
00166         /* Build the waits-for bitmap. */
00167         ret = __dd_build(dbenv, atype, &bitmap, &nlockers, &nalloc, &idmap);
00168         lock_max = region->stat.st_cur_maxid;
00169         LOCK_SYSTEM_UNLOCK(dbenv);
00170         if (ret != 0 || atype == DB_LOCK_EXPIRE)
00171                 return (ret);
00172 
00173         /* If there are no lockers, there are no deadlocks. */
00174         if (nlockers == 0)
00175                 return (0);
00176 
00177 #ifdef DIAGNOSTIC
00178         if (FLD_ISSET(dbenv->verbose, DB_VERB_WAITSFOR))
00179                 __dd_debug(dbenv, idmap, bitmap, nlockers, nalloc);
00180 #endif
00181 
00182         /* Now duplicate the bitmaps so we can verify deadlock participants. */
00183         if ((ret = __os_calloc(dbenv, (size_t)nlockers,
00184             sizeof(u_int32_t) * nalloc, &copymap)) != 0)
00185                 goto err;
00186         memcpy(copymap, bitmap, nlockers * sizeof(u_int32_t) * nalloc);
00187 
00188         if ((ret = __os_calloc(dbenv, sizeof(u_int32_t), nalloc, &tmpmap)) != 0)
00189                 goto err1;
00190 
00191         /* Find a deadlock. */
00192         if ((ret =
00193             __dd_find(dbenv, bitmap, idmap, nlockers, nalloc, &deadp)) != 0)
00194                 return (ret);
00195 
00196         /*
00197          * We need the cur_maxid from the txn region as well.  In order
00198          * to avoid tricky synchronization between the lock and txn
00199          * regions, we simply unlock the lock region and then lock the
00200          * txn region.  This introduces a small window during which the
00201          * transaction system could then wrap.  We're willing to return
00202          * the wrong answer for "oldest" or "youngest" in those rare
00203          * circumstances.
00204          */
00205         if (TXN_ON(dbenv)) {
00206                 TXN_SYSTEM_LOCK(dbenv);
00207                 txn_max = ((DB_TXNREGION *)((DB_TXNMGR *)
00208                     dbenv->tx_handle)->reginfo.primary)->cur_maxid;
00209                 TXN_SYSTEM_UNLOCK(dbenv);
00210         } else
00211                 txn_max = TXN_MAXIMUM;
00212 
00213         killid = BAD_KILLID;
00214         free_me = deadp;
00215         for (; *deadp != NULL; deadp++) {
00216                 if (abortp != NULL)
00217                         ++*abortp;
00218                 killid = (u_int32_t)(*deadp - bitmap) / nalloc;
00219                 limit = killid;
00220 
00221                 /*
00222                  * There are cases in which our general algorithm will
00223                  * fail.  Returning 1 from verify indicates that the
00224                  * particular locker is not only involved in a deadlock,
00225                  * but that killing him will allow others to make forward
00226                  * progress.  Unfortunately, there are cases where we need
00227                  * to abort someone, but killing them will not necessarily
00228                  * ensure forward progress (imagine N readers all trying to
00229                  * acquire a write lock).
00230                  * killid is only set to lockers that pass the __dd_verify test.
00231                  * keeper will hold the best candidate even if it does
00232                  * not pass __dd_verify.  Once we fill in killid then we do
00233                  * not need a keeper, but we keep updating it anyway.
00234                  */
00235 
00236                 keeper = idmap[killid].in_abort == 0 ? killid : BAD_KILLID;
00237                 if (keeper == BAD_KILLID ||
00238                     __dd_verify(idmap, *deadp,
00239                     tmpmap, copymap, nlockers, nalloc, keeper) == 0)
00240                         killid = BAD_KILLID;
00241 
00242                 if (killid != BAD_KILLID &&
00243                     (atype == DB_LOCK_DEFAULT || atype == DB_LOCK_RANDOM))
00244                         goto dokill;
00245 
00246                 /*
00247                  * Start with the id that we know is deadlocked, then examine
00248                  * all other set bits and see if any is a better candidate
00249                  * for abortion that is also genuinely part of the deadlock.
00250                  * The definition of "best":
00251                  *      MAXLOCKS: maximum count
00252                  *      MAXWRITE: maximum write count
00253                  *      MINLOCKS: minimum count
00254                  *      MINWRITE: minimum write count
00255                  *      OLDEST: smallest id
00256                  *      YOUNGEST: largest id
00257                  */
00258                 for (i = (limit + 1) % nlockers;
00259                     i != limit;
00260                     i = (i + 1) % nlockers) {
00261                         if (!ISSET_MAP(*deadp, i) || idmap[i].in_abort)
00262                                 continue;
00263 
00264                         /*
00265                          * Determine if we have a verified candidate
00266                          * in killid, if not then compare with the
00267                          * non-verified candidate in keeper.
00268                          */
00269                         if (killid == BAD_KILLID) {
00270                                 if (keeper == BAD_KILLID)
00271                                         goto use_next;
00272                                 else
00273                                         cid = keeper;
00274                         } else
00275                                 cid = killid;
00276 
00277                         switch (atype) {
00278                         case DB_LOCK_OLDEST:
00279                                 if (__dd_isolder(idmap[cid].id,
00280                                     idmap[i].id, lock_max, txn_max))
00281                                         continue;
00282                                 break;
00283                         case DB_LOCK_YOUNGEST:
00284                                 if (__dd_isolder(idmap[i].id,
00285                                     idmap[cid].id, lock_max, txn_max))
00286                                         continue;
00287                                 break;
00288                         case DB_LOCK_MAXLOCKS:
00289                                 if (idmap[i].count < idmap[cid].count)
00290                                         continue;
00291                                 break;
00292                         case DB_LOCK_MAXWRITE:
00293                                 if (idmap[i].count < idmap[cid].count)
00294                                         continue;
00295                                 break;
00296                         case DB_LOCK_MINLOCKS:
00297                         case DB_LOCK_MINWRITE:
00298                                 if (idmap[i].count > idmap[cid].count)
00299                                         continue;
00300                                 break;
00301                         case DB_LOCK_DEFAULT:
00302                         case DB_LOCK_RANDOM:
00303                                 goto dokill;
00304 
00305                         default:
00306                                 killid = BAD_KILLID;
00307                                 ret = EINVAL;
00308                                 goto dokill;
00309                         }
00310 
00311 use_next:               keeper = i;
00312                         if (__dd_verify(idmap, *deadp,
00313                             tmpmap, copymap, nlockers, nalloc, i))
00314                                 killid = i;
00315                 }
00316 
00317 dokill:         if (killid == BAD_KILLID) {
00318                         if (keeper == BAD_KILLID)
00319                                 /*
00320                                  * It's conceivable that under XA, the
00321                                  * locker could have gone away.
00322                                  */
00323                                 continue;
00324                         else {
00325                                 /*
00326                                  * Removing a single locker will not
00327                                  * break the deadlock, signal to run
00328                                  * detection again.
00329                                  */
00330                                 LOCK_SYSTEM_LOCK(dbenv);
00331                                 region->need_dd = 1;
00332                                 LOCK_SYSTEM_UNLOCK(dbenv);
00333                                 killid = keeper;
00334                         }
00335                 }
00336 
00337                 /* Kill the locker with lockid idmap[killid]. */
00338                 if ((ret = __dd_abort(dbenv, &idmap[killid], &status)) != 0)
00339                         break;
00340 
00341                 /*
00342                  * It's possible that the lock was already aborted; this isn't
00343                  * necessarily a problem, so do not treat it as an error.
00344                  */
00345                 if (status != 0) {
00346                         if (status != DB_ALREADY_ABORTED)
00347                                 __db_err(dbenv,
00348                                     "warning: unable to abort locker %lx",
00349                                     (u_long)idmap[killid].id);
00350                 } else if (FLD_ISSET(dbenv->verbose, DB_VERB_DEADLOCK))
00351                         __db_msg(dbenv,
00352                             "Aborting locker %lx", (u_long)idmap[killid].id);
00353         }
00354         __os_free(dbenv, tmpmap);
00355 err1:   __os_free(dbenv, copymap);
00356 
00357 err:    if (free_me != NULL)
00358                 __os_free(dbenv, free_me);
00359         __os_free(dbenv, bitmap);
00360         __os_free(dbenv, idmap);
00361 
00362         return (ret);
00363 }
00364 
00365 /*
00366  * ========================================================================
00367  * Utilities
00368  */
00369 
00370 #define DD_INVALID_ID   ((u_int32_t) -1)
00371 
00372 static int
00373 __dd_build(dbenv, atype, bmp, nlockers, allocp, idmap)
00374         DB_ENV *dbenv;
00375         u_int32_t atype, **bmp, *nlockers, *allocp;
00376         locker_info **idmap;
00377 {
00378         struct __db_lock *lp;
00379         DB_LOCKER *lip, *lockerp, *child;
00380         DB_LOCKOBJ *op, *lo;
00381         DB_LOCKREGION *region;
00382         DB_LOCKTAB *lt;
00383         locker_info *id_array;
00384         db_timeval_t now, min_timeout;
00385         u_int32_t *bitmap, count, dd, *entryp, id, ndx, nentries, *tmpmap;
00386         u_int8_t *pptr;
00387         int is_first, ret;
00388 
00389         lt = dbenv->lk_handle;
00390         region = lt->reginfo.primary;
00391         LOCK_SET_TIME_INVALID(&now);
00392         LOCK_SET_TIME_MAX(&min_timeout);
00393 
00394         /*
00395          * While we always check for expired timeouts, if we are called with
00396          * DB_LOCK_EXPIRE, then we are only checking for timeouts (i.e., not
00397          * doing deadlock detection at all).  If we aren't doing real deadlock
00398          * detection, then we can skip a significant amount of the processing.
00399          * In particular, we do not build the conflict array and our caller
00400          * needs to expect this.
00401          */
00402         if (atype == DB_LOCK_EXPIRE) {
00403                 for (op = SH_TAILQ_FIRST(&region->dd_objs, __db_lockobj);
00404                     op != NULL;
00405                     op = SH_TAILQ_NEXT(op, dd_links, __db_lockobj))
00406                         for (lp = SH_TAILQ_FIRST(&op->waiters, __db_lock);
00407                             lp != NULL;
00408                             lp = SH_TAILQ_NEXT(lp, links, __db_lock)) {
00409                                 LOCKER_LOCK(lt, region, lp->holder, ndx);
00410                                 if ((ret = __lock_getlocker(lt,
00411                                     lp->holder, ndx, 0, &lockerp)) != 0)
00412                                         continue;
00413                                 if (lp->status == DB_LSTAT_WAITING) {
00414                                         if (__lock_expired(dbenv,
00415                                             &now, &lockerp->lk_expire)) {
00416                                                 lp->status = DB_LSTAT_EXPIRED;
00417                                                 MUTEX_UNLOCK(
00418                                                     dbenv, lp->mtx_lock);
00419                                                 continue;
00420                                         }
00421                                         if (LOCK_TIME_GREATER(
00422                                             &min_timeout, &lockerp->lk_expire))
00423                                                 min_timeout =
00424                                                     lockerp->lk_expire;
00425                                 }
00426                         }
00427                 goto done;
00428         }
00429 
00430         /*
00431          * We'll check how many lockers there are, add a few more in for
00432          * good measure and then allocate all the structures.  Then we'll
00433          * verify that we have enough room when we go back in and get the
00434          * mutex the second time.
00435          */
00436 retry:  count = region->stat.st_nlockers;
00437         if (count == 0) {
00438                 *nlockers = 0;
00439                 return (0);
00440         }
00441 
00442         if (FLD_ISSET(dbenv->verbose, DB_VERB_DEADLOCK))
00443                 __db_msg(dbenv, "%lu lockers", (u_long)count);
00444 
00445         count += 20;
00446         nentries = (u_int32_t)DB_ALIGN(count, 32) / 32;
00447 
00448         /*
00449          * Allocate enough space for a count by count bitmap matrix.
00450          *
00451          * XXX
00452          * We could probably save the mallocs between iterations by
00453          * reallocing only when count has grown by too much.
00454          */
00455         if ((ret = __os_calloc(dbenv, (size_t)count,
00456             sizeof(u_int32_t) * nentries, &bitmap)) != 0)
00457                 return (ret);
00458 
00459         if ((ret = __os_calloc(dbenv,
00460             sizeof(u_int32_t), nentries, &tmpmap)) != 0) {
00461                 __os_free(dbenv, bitmap);
00462                 return (ret);
00463         }
00464 
00465         if ((ret = __os_calloc(dbenv,
00466             (size_t)count, sizeof(locker_info), &id_array)) != 0) {
00467                 __os_free(dbenv, bitmap);
00468                 __os_free(dbenv, tmpmap);
00469                 return (ret);
00470         }
00471 
00472         /*
00473          * Now go back in and actually fill in the matrix.
00474          */
00475         if (region->stat.st_nlockers > count) {
00476                 __os_free(dbenv, bitmap);
00477                 __os_free(dbenv, tmpmap);
00478                 __os_free(dbenv, id_array);
00479                 goto retry;
00480         }
00481 
00482         /*
00483          * First we go through and assign each locker a deadlock detector id.
00484          */
00485         for (id = 0, lip = SH_TAILQ_FIRST(&region->lockers, __db_locker);
00486             lip != NULL;
00487             lip = SH_TAILQ_NEXT(lip, ulinks, __db_locker)) {
00488                 if (lip->master_locker == INVALID_ROFF) {
00489                         lip->dd_id = id++;
00490                         id_array[lip->dd_id].id = lip->id;
00491                         switch (atype) {
00492                         case DB_LOCK_MINLOCKS:
00493                         case DB_LOCK_MAXLOCKS:
00494                                 id_array[lip->dd_id].count = lip->nlocks;
00495                                 break;
00496                         case DB_LOCK_MINWRITE:
00497                         case DB_LOCK_MAXWRITE:
00498                                 id_array[lip->dd_id].count = lip->nwrites;
00499                                 break;
00500                         default:
00501                                 break;
00502                         }
00503                         if (F_ISSET(lip, DB_LOCKER_INABORT))
00504                                 id_array[lip->dd_id].in_abort = 1;
00505                 } else
00506                         lip->dd_id = DD_INVALID_ID;
00507 
00508         }
00509 
00510         /*
00511          * We only need consider objects that have waiters, so we use
00512          * the list of objects with waiters (dd_objs) instead of traversing
00513          * the entire hash table.  For each object, we traverse the waiters
00514          * list and add an entry in the waitsfor matrix for each waiter/holder
00515          * combination.
00516          */
00517         for (op = SH_TAILQ_FIRST(&region->dd_objs, __db_lockobj);
00518             op != NULL; op = SH_TAILQ_NEXT(op, dd_links, __db_lockobj)) {
00519                 CLEAR_MAP(tmpmap, nentries);
00520 
00521                 /*
00522                  * First we go through and create a bit map that
00523                  * represents all the holders of this object.
00524                  */
00525                 for (lp = SH_TAILQ_FIRST(&op->holders, __db_lock);
00526                     lp != NULL;
00527                     lp = SH_TAILQ_NEXT(lp, links, __db_lock)) {
00528                         LOCKER_LOCK(lt, region, lp->holder, ndx);
00529                         if ((ret = __lock_getlocker(lt,
00530                             lp->holder, ndx, 0, &lockerp)) != 0)
00531                                 continue;
00532 
00533                         if (lockerp->dd_id == DD_INVALID_ID) {
00534                                 dd = ((DB_LOCKER *)R_ADDR(&lt->reginfo,
00535                                     lockerp->master_locker))->dd_id;
00536                                 lockerp->dd_id = dd;
00537                                 switch (atype) {
00538                                 case DB_LOCK_MINLOCKS:
00539                                 case DB_LOCK_MAXLOCKS:
00540                                         id_array[dd].count += lockerp->nlocks;
00541                                         break;
00542                                 case DB_LOCK_MINWRITE:
00543                                 case DB_LOCK_MAXWRITE:
00544                                         id_array[dd].count += lockerp->nwrites;
00545                                         break;
00546                                 default:
00547                                         break;
00548                                 }
00549                                 if (F_ISSET(lockerp, DB_LOCKER_INABORT))
00550                                         id_array[dd].in_abort = 1;
00551 
00552                         } else
00553                                 dd = lockerp->dd_id;
00554                         id_array[dd].valid = 1;
00555 
00556                         /*
00557                          * If the holder has already been aborted, then
00558                          * we should ignore it for now.
00559                          */
00560                         if (lp->status == DB_LSTAT_HELD)
00561                                 SET_MAP(tmpmap, dd);
00562                 }
00563 
00564                 /*
00565                  * Next, for each waiter, we set its row in the matrix
00566                  * equal to the map of holders we set up above.
00567                  */
00568                 for (is_first = 1,
00569                     lp = SH_TAILQ_FIRST(&op->waiters, __db_lock);
00570                     lp != NULL;
00571                     is_first = 0,
00572                     lp = SH_TAILQ_NEXT(lp, links, __db_lock)) {
00573                         LOCKER_LOCK(lt, region, lp->holder, ndx);
00574                         if ((ret = __lock_getlocker(lt,
00575                             lp->holder, ndx, 0, &lockerp)) != 0)
00576                                 continue;
00577                         if (lp->status == DB_LSTAT_WAITING) {
00578                                 if (__lock_expired(dbenv,
00579                                     &now, &lockerp->lk_expire)) {
00580                                         lp->status = DB_LSTAT_EXPIRED;
00581                                         MUTEX_UNLOCK(dbenv, lp->mtx_lock);
00582                                         continue;
00583                                 }
00584                                 if (LOCK_TIME_GREATER(
00585                                     &min_timeout, &lockerp->lk_expire))
00586                                         min_timeout = lockerp->lk_expire;
00587                         }
00588 
00589                         if (lockerp->dd_id == DD_INVALID_ID) {
00590                                 dd = ((DB_LOCKER *)R_ADDR(&lt->reginfo,
00591                                     lockerp->master_locker))->dd_id;
00592                                 lockerp->dd_id = dd;
00593                                 switch (atype) {
00594                                 case DB_LOCK_MINLOCKS:
00595                                 case DB_LOCK_MAXLOCKS:
00596                                         id_array[dd].count += lockerp->nlocks;
00597                                         break;
00598                                 case DB_LOCK_MINWRITE:
00599                                 case DB_LOCK_MAXWRITE:
00600                                         id_array[dd].count += lockerp->nwrites;
00601                                         break;
00602                                 default:
00603                                         break;
00604                                 }
00605                         } else
00606                                 dd = lockerp->dd_id;
00607                         id_array[dd].valid = 1;
00608 
00609                         /*
00610                          * If the transaction is pending abortion, then
00611                          * ignore it on this iteration.
00612                          */
00613                         if (lp->status != DB_LSTAT_WAITING)
00614                                 continue;
00615 
00616                         entryp = bitmap + (nentries * dd);
00617                         OR_MAP(entryp, tmpmap, nentries);
00618                         /*
00619                          * If this is the first waiter on the queue,
00620                          * then we remove the waitsfor relationship
00621                          * with oneself.  However, if it's anywhere
00622                          * else on the queue, then we have to keep
00623                          * it and we have an automatic deadlock.
00624                          */
00625                         if (is_first) {
00626                                 if (ISSET_MAP(entryp, dd))
00627                                         id_array[dd].self_wait = 1;
00628                                 CLR_MAP(entryp, dd);
00629                         }
00630                 }
00631         }
00632 
00633         /* Now, for each locker, record its last lock. */
00634         for (id = 0; id < count; id++) {
00635                 if (!id_array[id].valid)
00636                         continue;
00637                 LOCKER_LOCK(lt, region, id_array[id].id, ndx);
00638                 if ((ret = __lock_getlocker(lt,
00639                     id_array[id].id, ndx, 0, &lockerp)) != 0) {
00640                         __db_err(dbenv,
00641                             "No locks for locker %lu", (u_long)id_array[id].id);
00642                         continue;
00643                 }
00644 
00645                 /*
00646                  * If this is a master transaction, try to
00647                  * find one of its children's locks first,
00648                  * as they are probably more recent.
00649                  */
00650                 child = SH_LIST_FIRST(&lockerp->child_locker, __db_locker);
00651                 if (child != NULL) {
00652                         do {
00653                                 lp = SH_LIST_FIRST(&child->heldby, __db_lock);
00654                                 if (lp != NULL &&
00655                                     lp->status == DB_LSTAT_WAITING) {
00656                                         id_array[id].last_locker_id = child->id;
00657                                         goto get_lock;
00658                                 }
00659                                 child = SH_LIST_NEXT(
00660                                     child, child_link, __db_locker);
00661                         } while (child != NULL);
00662                 }
00663                 lp = SH_LIST_FIRST(&lockerp->heldby, __db_lock);
00664                 if (lp != NULL) {
00665                         id_array[id].last_locker_id = lockerp->id;
00666 get_lock:               id_array[id].last_lock = R_OFFSET(&lt->reginfo, lp);
00667                         id_array[id].last_obj = lp->obj;
00668                         lo = (DB_LOCKOBJ *)((u_int8_t *)lp + lp->obj);
00669                         pptr = SH_DBT_PTR(&lo->lockobj);
00670                         if (lo->lockobj.size >= sizeof(db_pgno_t))
00671                                 memcpy(&id_array[id].pgno,
00672                                     pptr, sizeof(db_pgno_t));
00673                         else
00674                                 id_array[id].pgno = 0;
00675                 }
00676         }
00677 
00678         /*
00679          * Pass complete, reset the deadlock detector bit.
00680          */
00681         region->need_dd = 0;
00682 
00683         /*
00684          * Now we can release everything except the bitmap matrix that we
00685          * created.
00686          */
00687         *nlockers = id;
00688         *idmap = id_array;
00689         *bmp = bitmap;
00690         *allocp = nentries;
00691         __os_free(dbenv, tmpmap);
00692 done:   if (LOCK_TIME_ISVALID(&region->next_timeout)) {
00693                 if (LOCK_TIME_ISMAX(&min_timeout))
00694                         LOCK_SET_TIME_INVALID(&region->next_timeout);
00695                 else
00696                         region->next_timeout = min_timeout;
00697         }
00698         return (0);
00699 }
00700 
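A condensed, standalone illustration of the per-object step described above (the data here is hypothetical and the snippet is not from the Berkeley DB sources): build a mask of an object's holders, OR it into each waiter's row, and clear the self edge only for the first waiter.

    #include <stdio.h>

    typedef unsigned int u_int32_t;

    #define ISSET_MAP(M, N) ((M)[(N) / 32] & (1 << ((N) % 32)))
    #define SET_MAP(M, B)   ((M)[(B) / 32] |= (1 << ((B) % 32)))
    #define CLR_MAP(M, B)   ((M)[(B) / 32] &= ~((u_int32_t)1 << ((B) % 32)))

    int
    main()
    {
            /* Three lockers, one word per row; locker 1 holds the object. */
            u_int32_t bitmap[3] = { 0, 0, 0 }, holders[1] = { 0 };
            u_int32_t waiters[] = { 1, 2 };  /* Locker 1 also waits (self-wait). */
            u_int32_t i, *row;

            SET_MAP(holders, 1);

            for (i = 0; i < 2; i++) {
                    row = bitmap + waiters[i];      /* nalloc == 1 word per row. */
                    row[0] |= holders[0];
                    if (i == 0)                     /* First waiter: drop self edge. */
                            CLR_MAP(row, waiters[i]);
            }

            printf("1 waits on 1: %s, 2 waits on 1: %s\n",
                ISSET_MAP(bitmap + 1, 1) ? "yes" : "no",
                ISSET_MAP(bitmap + 2, 1) ? "yes" : "no");
            return (0);
    }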
00701 static int
00702 __dd_find(dbenv, bmp, idmap, nlockers, nalloc, deadp)
00703         DB_ENV *dbenv;
00704         u_int32_t *bmp, nlockers, nalloc;
00705         locker_info *idmap;
00706         u_int32_t ***deadp;
00707 {
00708         u_int32_t i, j, k, *mymap, *tmpmap, **retp;
00709         u_int ndead, ndeadalloc;
00710         int ret;
00711 
00712 #undef  INITIAL_DEAD_ALLOC
00713 #define INITIAL_DEAD_ALLOC      8
00714 
00715         ndeadalloc = INITIAL_DEAD_ALLOC;
00716         ndead = 0;
00717         if ((ret = __os_malloc(dbenv,
00718             ndeadalloc * sizeof(u_int32_t *), &retp)) != 0)
00719                 return (ret);
00720 
00721         /*
00722          * For each locker, OR in the bits from the lockers on which that
00723          * locker is waiting.
00724          */
00725         for (mymap = bmp, i = 0; i < nlockers; i++, mymap += nalloc) {
00726                 if (!idmap[i].valid)
00727                         continue;
00728                 for (j = 0; j < nlockers; j++) {
00729                         if (!ISSET_MAP(mymap, j))
00730                                 continue;
00731 
00732                         /* Find the map for this bit. */
00733                         tmpmap = bmp + (nalloc * j);
00734                         OR_MAP(mymap, tmpmap, nalloc);
00735                         if (!ISSET_MAP(mymap, i))
00736                                 continue;
00737 
00738                         /* Make sure we leave room for NULL. */
00739                         if (ndead + 2 >= ndeadalloc) {
00740                                 ndeadalloc <<= 1;
00741                                 /*
00742                                  * If the alloc fails, then simply return the
00743                                  * deadlocks that we already have.
00744                                  */
00745                                 if (__os_realloc(dbenv,
00746                                     ndeadalloc * sizeof(u_int32_t),
00747                                     &retp) != 0) {
00748                                         retp[ndead] = NULL;
00749                                         *deadp = retp;
00750                                         return (0);
00751                                 }
00752                         }
00753                         retp[ndead++] = mymap;
00754 
00755                         /* Mark all participants in this deadlock invalid. */
00756                         for (k = 0; k < nlockers; k++)
00757                                 if (ISSET_MAP(mymap, k))
00758                                         idmap[k].valid = 0;
00759                         break;
00760                 }
00761         }
00762         retp[ndead] = NULL;
00763         *deadp = retp;
00764         return (0);
00765 }
00766 
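The cycle test above is, in effect, a row-wise transitive closure: keep ORing into locker i's row the rows of everyone i waits on, and a deadlock involving i exists as soon as i's own bit appears in its row. A toy two-locker version of the same check (illustrative only, not part of this file):

    #include <stdio.h>

    typedef unsigned int u_int32_t;

    #define ISSET_MAP(M, N) ((M)[(N) / 32] & (1 << ((N) % 32)))
    #define SET_MAP(M, B)   ((M)[(B) / 32] |= (1 << ((B) % 32)))

    int
    main()
    {
            /* One word per row: locker 0 waits on 1, locker 1 waits on 0. */
            u_int32_t row0[1] = { 0 }, row1[1] = { 0 };

            SET_MAP(row0, 1);
            SET_MAP(row1, 0);

            /* Fold locker 1's waits into locker 0's row. */
            row0[0] |= row1[0];

            /* Locker 0 now (transitively) waits on itself: a deadlock. */
            printf("deadlock through locker 0: %s\n",
                ISSET_MAP(row0, 0) ? "yes" : "no");
            return (0);
    }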
00767 static int
00768 __dd_abort(dbenv, info, statusp)
00769         DB_ENV *dbenv;
00770         locker_info *info;
00771         int *statusp;
00772 {
00773         struct __db_lock *lockp;
00774         DB_LOCKER *lockerp;
00775         DB_LOCKOBJ *sh_obj;
00776         DB_LOCKREGION *region;
00777         DB_LOCKTAB *lt;
00778         u_int32_t ndx;
00779         int ret;
00780 
00781         *statusp = 0;
00782 
00783         lt = dbenv->lk_handle;
00784         region = lt->reginfo.primary;
00785         ret = 0;
00786 
00787         LOCK_SYSTEM_LOCK(dbenv);
00788 
00789         /*
00790          * Get the locker.  If it's gone or was aborted while we were
00791          * detecting, return that.
00792          */
00793         LOCKER_LOCK(lt, region, info->last_locker_id, ndx);
00794         if ((ret = __lock_getlocker(lt,
00795             info->last_locker_id, ndx, 0, &lockerp)) != 0)
00796                 goto err;
00797         if (lockerp == NULL || F_ISSET(lockerp, DB_LOCKER_INABORT)) {
00798                 *statusp = DB_ALREADY_ABORTED;
00799                 goto out;
00800         }
00801 
00802         /*
00803          * Find the locker's last lock.  It is possible for this lock to have
00804          * been freed, either through a timeout or another detector run.
00805          */
00806         if ((lockp = SH_LIST_FIRST(&lockerp->heldby, __db_lock)) == NULL) {
00807                 *statusp = DB_ALREADY_ABORTED;
00808                 goto out;
00809         }
00810         if (R_OFFSET(&lt->reginfo, lockp) != info->last_lock ||
00811             lockp->holder != lockerp->id ||
00812             lockp->obj != info->last_obj || lockp->status != DB_LSTAT_WAITING) {
00813                 *statusp = DB_ALREADY_ABORTED;
00814                 goto out;
00815         }
00816 
00817         sh_obj = (DB_LOCKOBJ *)((u_int8_t *)lockp + lockp->obj);
00818 
00819         /* Abort the lock, take it off the waiters list, and wake its waiter. */
00820         SHOBJECT_LOCK(lt, region, sh_obj, ndx);
00821         lockp->status = DB_LSTAT_ABORTED;
00822         SH_TAILQ_REMOVE(&sh_obj->waiters, lockp, links, __db_lock);
00823 
00824         /*
00825          * Either the waiters list is now empty, in which case we remove
00826          * it from dd_objs, or it is not empty, in which case we need to
00827          * do promotion.
00828          */
00829         if (SH_TAILQ_FIRST(&sh_obj->waiters, __db_lock) == NULL)
00830                 SH_TAILQ_REMOVE(&region->dd_objs,
00831                     sh_obj, dd_links, __db_lockobj);
00832         else
00833                 ret = __lock_promote(lt, sh_obj, NULL, 0);
00834         MUTEX_UNLOCK(dbenv, lockp->mtx_lock);
00835 
00836         region->stat.st_ndeadlocks++;
00837 err:
00838 out:    LOCK_SYSTEM_UNLOCK(dbenv);
00839         return (ret);
00840 }
00841 
00842 #ifdef DIAGNOSTIC
00843 static void
00844 __dd_debug(dbenv, idmap, bitmap, nlockers, nalloc)
00845         DB_ENV *dbenv;
00846         locker_info *idmap;
00847         u_int32_t *bitmap, nlockers, nalloc;
00848 {
00849         DB_MSGBUF mb;
00850         u_int32_t i, j, *mymap;
00851 
00852         DB_MSGBUF_INIT(&mb);
00853 
00854         __db_msg(dbenv, "Waitsfor array\nWaiter:\tWaiting on:");
00855         for (mymap = bitmap, i = 0; i < nlockers; i++, mymap += nalloc) {
00856                 if (!idmap[i].valid)
00857                         continue;
00858 
00859                 __db_msgadd(dbenv, &mb,                         /* Waiter. */
00860                     "%lx/%lu:\t", (u_long)idmap[i].id, (u_long)idmap[i].pgno);
00861                 for (j = 0; j < nlockers; j++)
00862                         if (ISSET_MAP(mymap, j))
00863                                 __db_msgadd(dbenv,
00864                                     &mb, " %lx", (u_long)idmap[j].id);
00865                 __db_msgadd(dbenv, &mb, " %lu", (u_long)idmap[i].last_lock);
00866                 DB_MSGBUF_FLUSH(dbenv, &mb);
00867         }
00868 }
00869 #endif
00870 
00871 /*
00872  * Given a bitmap that contains a deadlock, verify that the bit
00873  * specified in the which parameter indicates a transaction that
00874  * is actually deadlocked.  Return 1 if really deadlocked, 0 otherwise.
00875  * deadmap  --  the array that identified the deadlock.
00876  * tmpmap   --  a temporary bit map into which we can OR things.
00877  * origmap  --  a copy of the initial bitmaps from the dd_build phase.
00878  * nlockers --  the number of actual lockers under consideration.
00879  * nalloc   --  the number of words allocated for the bitmap.
00880  * which    --  the locker in question.
00881  */
00882 static int
00883 __dd_verify(idmap, deadmap, tmpmap, origmap, nlockers, nalloc, which)
00884         locker_info *idmap;
00885         u_int32_t *deadmap, *tmpmap, *origmap;
00886         u_int32_t nlockers, nalloc, which;
00887 {
00888         u_int32_t *tmap;
00889         u_int32_t j;
00890         int count;
00891 
00892         memset(tmpmap, 0, sizeof(u_int32_t) * nalloc);
00893 
00894         /*
00895          * In order for "which" to be actively involved in
00896          * the deadlock, removing him from the evaluation
00897          * must remove the deadlock.  So, we OR together everyone
00898          * except which; if all the participants still have their
00899          * bits set, then the deadlock persists and which does
00900          * not participate.  If the deadlock does not persist
00901          * then "which" does participate.
00902          */
00903         count = 0;
00904         for (j = 0; j < nlockers; j++) {
00905                 if (!ISSET_MAP(deadmap, j) || j == which)
00906                         continue;
00907 
00908                 /* Find the map for this bit. */
00909                 tmap = origmap + (nalloc * j);
00910 
00911                 /*
00912                  * We special case the first waiter who is also a holder, so
00913                  * we don't automatically call that a deadlock.  However, if
00914                  * it really is a deadlock, we need the bit set now so that
00915                  * we treat the first waiter like other waiters.
00916                  */
00917                 if (idmap[j].self_wait)
00918                         SET_MAP(tmap, j);
00919                 OR_MAP(tmpmap, tmap, nalloc);
00920                 count++;
00921         }
00922 
00923         if (count == 1)
00924                 return (1);
00925 
00926         /*
00927          * Now check the resulting map and see whether
00928          * all participants still have their bit set.
00929          */
00930         for (j = 0; j < nlockers; j++) {
00931                 if (!ISSET_MAP(deadmap, j) || j == which)
00932                         continue;
00933                 if (!ISSET_MAP(tmpmap, j))
00934                         return (1);
00935         }
00936         return (0);
00937 }
00938 
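A small standalone restatement of that verification rule for a one-word-per-row matrix (hypothetical data; the helper name verify_toy is an assumption, not from the Berkeley DB sources, and it ignores the self_wait special case): OR together the original rows of every participant except the candidate, and the candidate is genuinely deadlocked if some participant's bit then goes missing.

    #include <stdio.h>

    typedef unsigned int u_int32_t;

    #define ISSET_MAP(M, N) ((M)[(N) / 32] & (1 << ((N) % 32)))
    #define SET_MAP(M, B)   ((M)[(B) / 32] |= (1 << ((B) % 32)))

    /*
     * "which" is genuinely deadlocked if the cycle cannot be rebuilt
     * from the other participants' original rows alone.
     */
    static int
    verify_toy(u_int32_t *origmap, u_int32_t *deadmap,
        u_int32_t nlockers, u_int32_t which)
    {
            u_int32_t accum, j;

            accum = 0;
            for (j = 0; j < nlockers; j++)
                    if (ISSET_MAP(deadmap, j) && j != which)
                            accum |= origmap[j];
            for (j = 0; j < nlockers; j++)
                    if (ISSET_MAP(deadmap, j) && j != which &&
                        !ISSET_MAP(&accum, j))
                            return (1);
            return (0);
    }

    int
    main()
    {
            /* A two-cycle 0 -> 1 -> 0; the dead map marks lockers 0 and 1. */
            u_int32_t orig[2] = { 0, 0 }, dead[1] = { 0 };

            SET_MAP(&orig[0], 1);
            SET_MAP(&orig[1], 0);
            SET_MAP(dead, 0);
            SET_MAP(dead, 1);

            printf("locker 0 really deadlocked: %s\n",
                verify_toy(orig, dead, 2, 0) ? "yes" : "no");
            return (0);
    }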
00939 /*
00940  * __dd_isolder --
00941  *
00942  * Figure out the relative age of two lockers.  We make all lockers
00943  * older than all transactions, because that's how it's worked
00944  * historically (because lockers are lower ids).
00945  */
00946 static int
00947 __dd_isolder(a, b, lock_max, txn_max)
00948         u_int32_t       a, b;
00949         u_int32_t       lock_max, txn_max;
00950 {
00951         u_int32_t max;
00952 
00953         /* Check for comparing lock-id and txnid. */
00954         if (a <= DB_LOCK_MAXID && b > DB_LOCK_MAXID)
00955                 return (1);
00956         if (b <= DB_LOCK_MAXID && a > DB_LOCK_MAXID)
00957                 return (0);
00958 
00959         /* In the same space; figure out which one is older. */
00960         max = txn_max;
00961         if (a <= DB_LOCK_MAXID)
00962                 max = lock_max;
00963 
00964         /*
00965          * We can't get a 100% correct ordering, because we don't know
00966          * where the current interval started and if there were older
00967          * lockers outside the interval.  We do the best we can.
00968          */
00969 
00970         /*
00971          * Check for a wrapped case with ids above max.
00972          */
00973         if (a > max && b < max)
00974                 return (1);
00975         if (b > max && a < max)
00976                 return (0);
00977 
00978         return (a < b);
00979 }
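A short worked example of the wrap rule above, with hypothetical transaction ids (all above DB_LOCK_MAXID, so they compare within the transaction space): an id larger than the current maximum was issued before the id space wrapped and is therefore treated as older. The helper is a simplified restatement, not the Berkeley DB function.

    #include <assert.h>

    /*
     * Simplified same-space comparison: "max" is the current maximum id;
     * ids above it belong to the previous, pre-wrap generation and are
     * therefore older.
     */
    static int
    is_older(unsigned int a, unsigned int b, unsigned int max)
    {
            if (a > max && b < max)
                    return (1);
            if (b > max && a < max)
                    return (0);
            return (a < b);
    }

    int
    main()
    {
            /* Hypothetical ids; the current maximum id is 0x80000100. */
            assert(is_older(0x80000200, 0x80000010, 0x80000100) == 1);
            assert(is_older(0x80000010, 0x80000200, 0x80000100) == 0);
            assert(is_older(0x80000010, 0x80000020, 0x80000100) == 1);
            return (0);
    }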
