PostgreSQL — the world's most advanced open source database.

deadlock.c — source listing (see the generated documentation for this file).
00001 /*-------------------------------------------------------------------------
00002  *
00003  * deadlock.c
00004  *    POSTGRES deadlock detection code
00005  *
00006  * See src/backend/storage/lmgr/README for a description of the deadlock
00007  * detection and resolution algorithms.
00008  *
00009  *
00010  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00011  * Portions Copyright (c) 1994, Regents of the University of California
00012  *
00013  *
00014  * IDENTIFICATION
00015  *    src/backend/storage/lmgr/deadlock.c
00016  *
00017  *  Interface:
00018  *
00019  *  DeadLockCheck()
00020  *  DeadLockReport()
00021  *  RememberSimpleDeadLock()
00022  *  InitDeadLockChecking()
00023  *
00024  *-------------------------------------------------------------------------
00025  */
00026 #include "postgres.h"
00027 
00028 #include "miscadmin.h"
00029 #include "pg_trace.h"
00030 #include "pgstat.h"
00031 #include "storage/lmgr.h"
00032 #include "storage/proc.h"
00033 #include "utils/memutils.h"
00034 
00035 
/*
 * One edge in the waits-for graph: "waiter" is waiting for "blocker" to
 * release a lock.  The pred and link fields are scratch space used only
 * inside TopoSort while it orders a wait queue.
 */
typedef struct
{
    PGPROC     *waiter;         /* the waiting process */
    PGPROC     *blocker;        /* the process it is waiting for */
    int         pred;           /* workspace for TopoSort */
    int         link;           /* workspace for TopoSort */
} EDGE;
00044 
/*
 * One potential reordering of a lock's wait queue: procs[] lists the
 * queue's nProcs members in the proposed new wait order.
 */
typedef struct
{
    LOCK       *lock;           /* the lock whose wait queue is described */
    PGPROC    **procs;          /* array of PGPROC *'s in new wait order */
    int         nProcs;         /* number of entries in procs[] */
} WAIT_ORDER;
00052 
/*
 * Information saved about each edge in a detected deadlock cycle.  This
 * is used to print a diagnostic message upon failure.
 *
 * Note: because we want to examine this info after releasing the lock
 * manager's partition locks, we can't just store LOCK and PGPROC pointers;
 * we must extract out all the info we want to be able to print.
 */
typedef struct
{
    LOCKTAG     locktag;        /* ID of awaited lock object */
    LOCKMODE    lockmode;       /* type of lock we're waiting for */
    int         pid;            /* PID of blocked backend */
} DEADLOCK_INFO;
00067 
00068 
00069 static bool DeadLockCheckRecurse(PGPROC *proc);
00070 static int  TestConfiguration(PGPROC *startProc);
00071 static bool FindLockCycle(PGPROC *checkProc,
00072               EDGE *softEdges, int *nSoftEdges);
00073 static bool FindLockCycleRecurse(PGPROC *checkProc, int depth,
00074                      EDGE *softEdges, int *nSoftEdges);
00075 static bool ExpandConstraints(EDGE *constraints, int nConstraints);
00076 static bool TopoSort(LOCK *lock, EDGE *constraints, int nConstraints,
00077          PGPROC **ordering);
00078 
00079 #ifdef DEBUG_DEADLOCK
00080 static void PrintLockQueue(LOCK *lock, const char *info);
00081 #endif
00082 
00083 
/*
 * Working space for the deadlock detector
 *
 * All of this memory is allocated once per backend (see
 * InitDeadLockChecking) because the detector normally runs inside a
 * signal handler, where calling palloc would be unsafe.
 */

/* Workspace for FindLockCycle */
static PGPROC **visitedProcs;   /* Array of visited procs */
static int  nVisitedProcs;      /* number of valid entries in visitedProcs[] */

/* Workspace for TopoSort */
static PGPROC **topoProcs;      /* Array of not-yet-output procs */
static int *beforeConstraints;  /* Counts of remaining before-constraints */
static int *afterConstraints;   /* List head for after-constraints */

/* Output area for ExpandConstraints */
static WAIT_ORDER *waitOrders;  /* Array of proposed queue rearrangements */
static int  nWaitOrders;        /* number of valid entries in waitOrders[] */
static PGPROC **waitOrderProcs; /* Space for waitOrders queue contents */

/* Current list of constraints being considered */
static EDGE *curConstraints;
static int  nCurConstraints;
static int  maxCurConstraints;  /* allocated size of curConstraints[] */

/* Storage space for results from FindLockCycle */
static EDGE *possibleConstraints;
static int  nPossibleConstraints;
static int  maxPossibleConstraints;     /* allocated size of possibleConstraints[] */
static DEADLOCK_INFO *deadlockDetails;  /* edges of a detected cycle, for reporting */
static int  nDeadlockDetails;           /* number of valid entries therein */

/* PGPROC pointer of any blocking autovacuum worker found */
static PGPROC *blocking_autovacuum_proc = NULL;
00116 
00117 
00118 /*
00119  * InitDeadLockChecking -- initialize deadlock checker during backend startup
00120  *
00121  * This does per-backend initialization of the deadlock checker; primarily,
00122  * allocation of working memory for DeadLockCheck.  We do this per-backend
00123  * since there's no percentage in making the kernel do copy-on-write
00124  * inheritance of workspace from the postmaster.  We want to allocate the
00125  * space at startup because (a) the deadlock checker might be invoked when
00126  * there's no free memory left, and (b) the checker is normally run inside a
00127  * signal handler, which is a very dangerous place to invoke palloc from.
00128  */
00129 void
00130 InitDeadLockChecking(void)
00131 {
00132     MemoryContext oldcxt;
00133 
00134     /* Make sure allocations are permanent */
00135     oldcxt = MemoryContextSwitchTo(TopMemoryContext);
00136 
00137     /*
00138      * FindLockCycle needs at most MaxBackends entries in visitedProcs[] and
00139      * deadlockDetails[].
00140      */
00141     visitedProcs = (PGPROC **) palloc(MaxBackends * sizeof(PGPROC *));
00142     deadlockDetails = (DEADLOCK_INFO *) palloc(MaxBackends * sizeof(DEADLOCK_INFO));
00143 
00144     /*
00145      * TopoSort needs to consider at most MaxBackends wait-queue entries, and
00146      * it needn't run concurrently with FindLockCycle.
00147      */
00148     topoProcs = visitedProcs;   /* re-use this space */
00149     beforeConstraints = (int *) palloc(MaxBackends * sizeof(int));
00150     afterConstraints = (int *) palloc(MaxBackends * sizeof(int));
00151 
00152     /*
00153      * We need to consider rearranging at most MaxBackends/2 wait queues
00154      * (since it takes at least two waiters in a queue to create a soft edge),
00155      * and the expanded form of the wait queues can't involve more than
00156      * MaxBackends total waiters.
00157      */
00158     waitOrders = (WAIT_ORDER *)
00159         palloc((MaxBackends / 2) * sizeof(WAIT_ORDER));
00160     waitOrderProcs = (PGPROC **) palloc(MaxBackends * sizeof(PGPROC *));
00161 
00162     /*
00163      * Allow at most MaxBackends distinct constraints in a configuration. (Is
00164      * this enough?  In practice it seems it should be, but I don't quite see
00165      * how to prove it.  If we run out, we might fail to find a workable wait
00166      * queue rearrangement even though one exists.)  NOTE that this number
00167      * limits the maximum recursion depth of DeadLockCheckRecurse. Making it
00168      * really big might potentially allow a stack-overflow problem.
00169      */
00170     maxCurConstraints = MaxBackends;
00171     curConstraints = (EDGE *) palloc(maxCurConstraints * sizeof(EDGE));
00172 
00173     /*
00174      * Allow up to 3*MaxBackends constraints to be saved without having to
00175      * re-run TestConfiguration.  (This is probably more than enough, but we
00176      * can survive if we run low on space by doing excess runs of
00177      * TestConfiguration to re-compute constraint lists each time needed.) The
00178      * last MaxBackends entries in possibleConstraints[] are reserved as
00179      * output workspace for FindLockCycle.
00180      */
00181     maxPossibleConstraints = MaxBackends * 4;
00182     possibleConstraints =
00183         (EDGE *) palloc(maxPossibleConstraints * sizeof(EDGE));
00184 
00185     MemoryContextSwitchTo(oldcxt);
00186 }
00187 
00188 /*
00189  * DeadLockCheck -- Checks for deadlocks for a given process
00190  *
00191  * This code looks for deadlocks involving the given process.  If any
00192  * are found, it tries to rearrange lock wait queues to resolve the
00193  * deadlock.  If resolution is impossible, return DS_HARD_DEADLOCK ---
00194  * the caller is then expected to abort the given proc's transaction.
00195  *
00196  * Caller must already have locked all partitions of the lock tables.
00197  *
00198  * On failure, deadlock details are recorded in deadlockDetails[] for
00199  * subsequent printing by DeadLockReport().  That activity is separate
00200  * because (a) we don't want to do it while holding all those LWLocks,
00201  * and (b) we are typically invoked inside a signal handler.
00202  */
00203 DeadLockState
00204 DeadLockCheck(PGPROC *proc)
00205 {
00206     int         i,
00207                 j;
00208 
00209     /* Initialize to "no constraints" */
00210     nCurConstraints = 0;
00211     nPossibleConstraints = 0;
00212     nWaitOrders = 0;
00213 
00214     /* Initialize to not blocked by an autovacuum worker */
00215     blocking_autovacuum_proc = NULL;
00216 
00217     /* Search for deadlocks and possible fixes */
00218     if (DeadLockCheckRecurse(proc))
00219     {
00220         /*
00221          * Call FindLockCycle one more time, to record the correct
00222          * deadlockDetails[] for the basic state with no rearrangements.
00223          */
00224         int         nSoftEdges;
00225 
00226         TRACE_POSTGRESQL_DEADLOCK_FOUND();
00227 
00228         nWaitOrders = 0;
00229         if (!FindLockCycle(proc, possibleConstraints, &nSoftEdges))
00230             elog(FATAL, "deadlock seems to have disappeared");
00231 
00232         return DS_HARD_DEADLOCK;    /* cannot find a non-deadlocked state */
00233     }
00234 
00235     /* Apply any needed rearrangements of wait queues */
00236     for (i = 0; i < nWaitOrders; i++)
00237     {
00238         LOCK       *lock = waitOrders[i].lock;
00239         PGPROC    **procs = waitOrders[i].procs;
00240         int         nProcs = waitOrders[i].nProcs;
00241         PROC_QUEUE *waitQueue = &(lock->waitProcs);
00242 
00243         Assert(nProcs == waitQueue->size);
00244 
00245 #ifdef DEBUG_DEADLOCK
00246         PrintLockQueue(lock, "DeadLockCheck:");
00247 #endif
00248 
00249         /* Reset the queue and re-add procs in the desired order */
00250         ProcQueueInit(waitQueue);
00251         for (j = 0; j < nProcs; j++)
00252         {
00253             SHMQueueInsertBefore(&(waitQueue->links), &(procs[j]->links));
00254             waitQueue->size++;
00255         }
00256 
00257 #ifdef DEBUG_DEADLOCK
00258         PrintLockQueue(lock, "rearranged to:");
00259 #endif
00260 
00261         /* See if any waiters for the lock can be woken up now */
00262         ProcLockWakeup(GetLocksMethodTable(lock), lock);
00263     }
00264 
00265     /* Return code tells caller if we had to escape a deadlock or not */
00266     if (nWaitOrders > 0)
00267         return DS_SOFT_DEADLOCK;
00268     else if (blocking_autovacuum_proc != NULL)
00269         return DS_BLOCKED_BY_AUTOVACUUM;
00270     else
00271         return DS_NO_DEADLOCK;
00272 }
00273 
00274 /*
00275  * Return the PGPROC of the autovacuum that's blocking a process.
00276  *
00277  * We reset the saved pointer as soon as we pass it back.
00278  */
00279 PGPROC *
00280 GetBlockingAutoVacuumPgproc(void)
00281 {
00282     PGPROC     *ptr;
00283 
00284     ptr = blocking_autovacuum_proc;
00285     blocking_autovacuum_proc = NULL;
00286 
00287     return ptr;
00288 }
00289 
00290 /*
00291  * DeadLockCheckRecurse -- recursively search for valid orderings
00292  *
00293  * curConstraints[] holds the current set of constraints being considered
00294  * by an outer level of recursion.  Add to this each possible solution
00295  * constraint for any cycle detected at this level.
00296  *
00297  * Returns TRUE if no solution exists.  Returns FALSE if a deadlock-free
00298  * state is attainable, in which case waitOrders[] shows the required
00299  * rearrangements of lock wait queues (if any).
00300  */
00301 static bool
00302 DeadLockCheckRecurse(PGPROC *proc)
00303 {
00304     int         nEdges;
00305     int         oldPossibleConstraints;
00306     bool        savedList;
00307     int         i;
00308 
00309     nEdges = TestConfiguration(proc);
00310     if (nEdges < 0)
00311         return true;            /* hard deadlock --- no solution */
00312     if (nEdges == 0)
00313         return false;           /* good configuration found */
00314     if (nCurConstraints >= maxCurConstraints)
00315         return true;            /* out of room for active constraints? */
00316     oldPossibleConstraints = nPossibleConstraints;
00317     if (nPossibleConstraints + nEdges + MaxBackends <= maxPossibleConstraints)
00318     {
00319         /* We can save the edge list in possibleConstraints[] */
00320         nPossibleConstraints += nEdges;
00321         savedList = true;
00322     }
00323     else
00324     {
00325         /* Not room; will need to regenerate the edges on-the-fly */
00326         savedList = false;
00327     }
00328 
00329     /*
00330      * Try each available soft edge as an addition to the configuration.
00331      */
00332     for (i = 0; i < nEdges; i++)
00333     {
00334         if (!savedList && i > 0)
00335         {
00336             /* Regenerate the list of possible added constraints */
00337             if (nEdges != TestConfiguration(proc))
00338                 elog(FATAL, "inconsistent results during deadlock check");
00339         }
00340         curConstraints[nCurConstraints] =
00341             possibleConstraints[oldPossibleConstraints + i];
00342         nCurConstraints++;
00343         if (!DeadLockCheckRecurse(proc))
00344             return false;       /* found a valid solution! */
00345         /* give up on that added constraint, try again */
00346         nCurConstraints--;
00347     }
00348     nPossibleConstraints = oldPossibleConstraints;
00349     return true;                /* no solution found */
00350 }
00351 
00352 
00353 /*--------------------
00354  * Test a configuration (current set of constraints) for validity.
00355  *
00356  * Returns:
00357  *      0: the configuration is good (no deadlocks)
00358  *     -1: the configuration has a hard deadlock or is not self-consistent
00359  *      >0: the configuration has one or more soft deadlocks
00360  *
00361  * In the soft-deadlock case, one of the soft cycles is chosen arbitrarily
00362  * and a list of its soft edges is returned beginning at
00363  * possibleConstraints+nPossibleConstraints.  The return value is the
00364  * number of soft edges.
00365  *--------------------
00366  */
00367 static int
00368 TestConfiguration(PGPROC *startProc)
00369 {
00370     int         softFound = 0;
00371     EDGE       *softEdges = possibleConstraints + nPossibleConstraints;
00372     int         nSoftEdges;
00373     int         i;
00374 
00375     /*
00376      * Make sure we have room for FindLockCycle's output.
00377      */
00378     if (nPossibleConstraints + MaxBackends > maxPossibleConstraints)
00379         return -1;
00380 
00381     /*
00382      * Expand current constraint set into wait orderings.  Fail if the
00383      * constraint set is not self-consistent.
00384      */
00385     if (!ExpandConstraints(curConstraints, nCurConstraints))
00386         return -1;
00387 
00388     /*
00389      * Check for cycles involving startProc or any of the procs mentioned in
00390      * constraints.  We check startProc last because if it has a soft cycle
00391      * still to be dealt with, we want to deal with that first.
00392      */
00393     for (i = 0; i < nCurConstraints; i++)
00394     {
00395         if (FindLockCycle(curConstraints[i].waiter, softEdges, &nSoftEdges))
00396         {
00397             if (nSoftEdges == 0)
00398                 return -1;      /* hard deadlock detected */
00399             softFound = nSoftEdges;
00400         }
00401         if (FindLockCycle(curConstraints[i].blocker, softEdges, &nSoftEdges))
00402         {
00403             if (nSoftEdges == 0)
00404                 return -1;      /* hard deadlock detected */
00405             softFound = nSoftEdges;
00406         }
00407     }
00408     if (FindLockCycle(startProc, softEdges, &nSoftEdges))
00409     {
00410         if (nSoftEdges == 0)
00411             return -1;          /* hard deadlock detected */
00412         softFound = nSoftEdges;
00413     }
00414     return softFound;
00415 }
00416 
00417 
00418 /*
00419  * FindLockCycle -- basic check for deadlock cycles
00420  *
00421  * Scan outward from the given proc to see if there is a cycle in the
00422  * waits-for graph that includes this proc.  Return TRUE if a cycle
00423  * is found, else FALSE.  If a cycle is found, we return a list of
00424  * the "soft edges", if any, included in the cycle.  These edges could
00425  * potentially be eliminated by rearranging wait queues.  We also fill
00426  * deadlockDetails[] with information about the detected cycle; this info
00427  * is not used by the deadlock algorithm itself, only to print a useful
00428  * message after failing.
00429  *
00430  * Since we need to be able to check hypothetical configurations that would
00431  * exist after wait queue rearrangement, the routine pays attention to the
00432  * table of hypothetical queue orders in waitOrders[].  These orders will
00433  * be believed in preference to the actual ordering seen in the locktable.
00434  */
00435 static bool
00436 FindLockCycle(PGPROC *checkProc,
00437               EDGE *softEdges,  /* output argument */
00438               int *nSoftEdges)  /* output argument */
00439 {
00440     nVisitedProcs = 0;
00441     nDeadlockDetails = 0;
00442     *nSoftEdges = 0;
00443     return FindLockCycleRecurse(checkProc, 0, softEdges, nSoftEdges);
00444 }
00445 
/*
 * FindLockCycleRecurse -- recursive guts of FindLockCycle
 *
 * Returns TRUE if a cycle back to the original start proc (visitedProcs[0])
 * is found.  In that case, deadlockDetails[depth] is filled in at each
 * level as the recursion unwinds, and any soft edges on the cycle are
 * appended to softEdges[].
 */
static bool
FindLockCycleRecurse(PGPROC *checkProc,
                     int depth,
                     EDGE *softEdges,   /* output argument */
                     int *nSoftEdges)   /* output argument */
{
    PGPROC     *proc;
    PGXACT     *pgxact;
    LOCK       *lock;
    PROCLOCK   *proclock;
    SHM_QUEUE  *procLocks;
    LockMethod  lockMethodTable;
    PROC_QUEUE *waitQueue;
    int         queue_size;
    int         conflictMask;
    int         i;
    int         numLockModes,
                lm;

    /*
     * Have we already seen this proc?
     */
    for (i = 0; i < nVisitedProcs; i++)
    {
        if (visitedProcs[i] == checkProc)
        {
            /* If we return to starting point, we have a deadlock cycle */
            if (i == 0)
            {
                /*
                 * record total length of cycle --- outer levels will now fill
                 * deadlockDetails[]
                 */
                Assert(depth <= MaxBackends);
                nDeadlockDetails = depth;

                return true;
            }

            /*
             * Otherwise, we have a cycle but it does not include the start
             * point, so say "no deadlock".
             */
            return false;
        }
    }
    /* Mark proc as seen */
    Assert(nVisitedProcs < MaxBackends);
    visitedProcs[nVisitedProcs++] = checkProc;

    /*
     * If the proc is not waiting, we have no outgoing waits-for edges.
     */
    if (checkProc->links.next == NULL)
        return false;
    lock = checkProc->waitLock;
    if (lock == NULL)
        return false;
    lockMethodTable = GetLocksMethodTable(lock);
    numLockModes = lockMethodTable->numLockModes;
    /* bitmask of lock modes that conflict with the mode checkProc awaits */
    conflictMask = lockMethodTable->conflictTab[checkProc->waitLockMode];

    /*
     * Scan for procs that already hold conflicting locks.  These are "hard"
     * edges in the waits-for graph.
     */
    procLocks = &(lock->procLocks);

    proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
                                         offsetof(PROCLOCK, lockLink));

    while (proclock)
    {
        proc = proclock->tag.myProc;
        pgxact = &ProcGlobal->allPgXact[proc->pgprocno];

        /* A proc never blocks itself */
        if (proc != checkProc)
        {
            for (lm = 1; lm <= numLockModes; lm++)
            {
                if ((proclock->holdMask & LOCKBIT_ON(lm)) &&
                    (conflictMask & LOCKBIT_ON(lm)))
                {
                    /* This proc hard-blocks checkProc */
                    if (FindLockCycleRecurse(proc, depth + 1,
                                             softEdges, nSoftEdges))
                    {
                        /* fill deadlockDetails[] on the way back out */
                        DEADLOCK_INFO *info = &deadlockDetails[depth];

                        info->locktag = lock->tag;
                        info->lockmode = checkProc->waitLockMode;
                        info->pid = checkProc->pid;

                        return true;
                    }

                    /*
                     * No deadlock here, but see if this proc is an autovacuum
                     * that is directly hard-blocking our own proc.  If so,
                     * report it so that the caller can send a cancel signal
                     * to it, if appropriate.  If there's more than one such
                     * proc, it's indeterminate which one will be reported.
                     *
                     * We don't touch autovacuums that are indirectly blocking
                     * us; it's up to the direct blockee to take action.  This
                     * rule simplifies understanding the behavior and ensures
                     * that an autovacuum won't be canceled with less than
                     * deadlock_timeout grace period.
                     *
                     * Note we read vacuumFlags without any locking.  This is
                     * OK only for checking the PROC_IS_AUTOVACUUM flag,
                     * because that flag is set at process start and never
                     * reset.  There is logic elsewhere to avoid canceling an
                     * autovacuum that is working to prevent XID wraparound
                     * problems (which needs to read a different vacuumFlag
                     * bit), but we don't do that here to avoid grabbing
                     * ProcArrayLock.
                     */
                    if (checkProc == MyProc &&
                        pgxact->vacuumFlags & PROC_IS_AUTOVACUUM)
                        blocking_autovacuum_proc = proc;

                    /* We're done looking at this proclock */
                    break;
                }
            }
        }

        proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
                                             offsetof(PROCLOCK, lockLink));
    }

    /*
     * Scan for procs that are ahead of this one in the lock's wait queue.
     * Those that have conflicting requests soft-block this one.  This must be
     * done after the hard-block search, since if another proc both hard- and
     * soft-blocks this one, we want to call it a hard edge.
     *
     * If there is a proposed re-ordering of the lock's wait order, use that
     * rather than the current wait order.
     */
    for (i = 0; i < nWaitOrders; i++)
    {
        if (waitOrders[i].lock == lock)
            break;
    }

    if (i < nWaitOrders)
    {
        /* Use the given hypothetical wait queue order */
        PGPROC    **procs = waitOrders[i].procs;

        queue_size = waitOrders[i].nProcs;

        /* NB: i is reused here as the queue index */
        for (i = 0; i < queue_size; i++)
        {
            proc = procs[i];

            /* Done when we reach the target proc */
            if (proc == checkProc)
                break;

            /* Is there a conflict with this guy's request? */
            if (((1 << proc->waitLockMode) & conflictMask) != 0)
            {
                /* This proc soft-blocks checkProc */
                if (FindLockCycleRecurse(proc, depth + 1,
                                         softEdges, nSoftEdges))
                {
                    /* fill deadlockDetails[] on the way back out */
                    DEADLOCK_INFO *info = &deadlockDetails[depth];

                    info->locktag = lock->tag;
                    info->lockmode = checkProc->waitLockMode;
                    info->pid = checkProc->pid;

                    /*
                     * Add this edge to the list of soft edges in the cycle
                     */
                    Assert(*nSoftEdges < MaxBackends);
                    softEdges[*nSoftEdges].waiter = checkProc;
                    softEdges[*nSoftEdges].blocker = proc;
                    (*nSoftEdges)++;
                    return true;
                }
            }
        }
    }
    else
    {
        /* Use the true lock wait queue order */
        waitQueue = &(lock->waitProcs);
        queue_size = waitQueue->size;

        proc = (PGPROC *) waitQueue->links.next;

        while (queue_size-- > 0)
        {
            /* Done when we reach the target proc */
            if (proc == checkProc)
                break;

            /* Is there a conflict with this guy's request? */
            if (((1 << proc->waitLockMode) & conflictMask) != 0)
            {
                /* This proc soft-blocks checkProc */
                if (FindLockCycleRecurse(proc, depth + 1,
                                         softEdges, nSoftEdges))
                {
                    /* fill deadlockDetails[] on the way back out */
                    DEADLOCK_INFO *info = &deadlockDetails[depth];

                    info->locktag = lock->tag;
                    info->lockmode = checkProc->waitLockMode;
                    info->pid = checkProc->pid;

                    /*
                     * Add this edge to the list of soft edges in the cycle
                     */
                    Assert(*nSoftEdges < MaxBackends);
                    softEdges[*nSoftEdges].waiter = checkProc;
                    softEdges[*nSoftEdges].blocker = proc;
                    (*nSoftEdges)++;
                    return true;
                }
            }

            proc = (PGPROC *) proc->links.next;
        }
    }

    /*
     * No conflict detected here.
     */
    return false;
}
00684 
00685 
00686 /*
00687  * ExpandConstraints -- expand a list of constraints into a set of
00688  *      specific new orderings for affected wait queues
00689  *
00690  * Input is a list of soft edges to be reversed.  The output is a list
00691  * of nWaitOrders WAIT_ORDER structs in waitOrders[], with PGPROC array
00692  * workspace in waitOrderProcs[].
00693  *
00694  * Returns TRUE if able to build an ordering that satisfies all the
00695  * constraints, FALSE if not (there are contradictory constraints).
00696  */
00697 static bool
00698 ExpandConstraints(EDGE *constraints,
00699                   int nConstraints)
00700 {
00701     int         nWaitOrderProcs = 0;
00702     int         i,
00703                 j;
00704 
00705     nWaitOrders = 0;
00706 
00707     /*
00708      * Scan constraint list backwards.  This is because the last-added
00709      * constraint is the only one that could fail, and so we want to test it
00710      * for inconsistency first.
00711      */
00712     for (i = nConstraints; --i >= 0;)
00713     {
00714         PGPROC     *proc = constraints[i].waiter;
00715         LOCK       *lock = proc->waitLock;
00716 
00717         /* Did we already make a list for this lock? */
00718         for (j = nWaitOrders; --j >= 0;)
00719         {
00720             if (waitOrders[j].lock == lock)
00721                 break;
00722         }
00723         if (j >= 0)
00724             continue;
00725         /* No, so allocate a new list */
00726         waitOrders[nWaitOrders].lock = lock;
00727         waitOrders[nWaitOrders].procs = waitOrderProcs + nWaitOrderProcs;
00728         waitOrders[nWaitOrders].nProcs = lock->waitProcs.size;
00729         nWaitOrderProcs += lock->waitProcs.size;
00730         Assert(nWaitOrderProcs <= MaxBackends);
00731 
00732         /*
00733          * Do the topo sort.  TopoSort need not examine constraints after this
00734          * one, since they must be for different locks.
00735          */
00736         if (!TopoSort(lock, constraints, i + 1,
00737                       waitOrders[nWaitOrders].procs))
00738             return false;
00739         nWaitOrders++;
00740     }
00741     return true;
00742 }
00743 
00744 
00745 /*
00746  * TopoSort -- topological sort of a wait queue
00747  *
00748  * Generate a re-ordering of a lock's wait queue that satisfies given
00749  * constraints about certain procs preceding others.  (Each such constraint
00750  * is a fact of a partial ordering.)  Minimize rearrangement of the queue
00751  * not needed to achieve the partial ordering.
00752  *
00753  * This is a lot simpler and slower than, for example, the topological sort
00754  * algorithm shown in Knuth's Volume 1.  However, Knuth's method doesn't
00755  * try to minimize the damage to the existing order.  In practice we are
00756  * not likely to be working with more than a few constraints, so the apparent
00757  * slowness of the algorithm won't really matter.
00758  *
00759  * The initial queue ordering is taken directly from the lock's wait queue.
00760  * The output is an array of PGPROC pointers, of length equal to the lock's
00761  * wait queue length (the caller is responsible for providing this space).
00762  * The partial order is specified by an array of EDGE structs.  Each EDGE
00763  * is one that we need to reverse, therefore the "waiter" must appear before
00764  * the "blocker" in the output array.  The EDGE array may well contain
00765  * edges associated with other locks; these should be ignored.
00766  *
00767  * Returns TRUE if able to build an ordering that satisfies all the
00768  * constraints, FALSE if not (there are contradictory constraints).
00769  */
static bool
TopoSort(LOCK *lock,
         EDGE *constraints,
         int nConstraints,
         PGPROC **ordering)     /* output argument */
{
    PROC_QUEUE *waitQueue = &(lock->waitProcs);
    int         queue_size = waitQueue->size;
    PGPROC     *proc;
    int         i,
                j,
                k,
                last;

    /* First, fill topoProcs[] array with the procs in their current order */
    /* (topoProcs is a file-level scratch array; assumed sized >= queue_size) */
    proc = (PGPROC *) waitQueue->links.next;
    for (i = 0; i < queue_size; i++)
    {
        topoProcs[i] = proc;
        proc = (PGPROC *) proc->links.next;
    }

    /*
     * Scan the constraints, and for each proc in the array, generate a count
     * of the number of constraints that say it must be before something else,
     * plus a list of the constraints that say it must be after something
     * else. The count for the j'th proc is stored in beforeConstraints[j],
     * and the head of its list in afterConstraints[j].  Each constraint
     * stores its list link in constraints[i].link (note any constraint will
     * be in just one list). The array index for the before-proc of the i'th
     * constraint is remembered in constraints[i].pred.
     *
     * Note: this clobbers the pred/link workspace fields of the caller's
     * constraints[] entries; they are scratch space per the EDGE declaration.
     */
    MemSet(beforeConstraints, 0, queue_size * sizeof(int));
    MemSet(afterConstraints, 0, queue_size * sizeof(int));
    for (i = 0; i < nConstraints; i++)
    {
        proc = constraints[i].waiter;
        /* Ignore constraint if not for this lock */
        if (proc->waitLock != lock)
            continue;
        /* Find the waiter proc in the array (linear search, see header) */
        for (j = queue_size; --j >= 0;)
        {
            if (topoProcs[j] == proc)
                break;
        }
        Assert(j >= 0);         /* should have found a match */
        /* Find the blocker proc in the array */
        proc = constraints[i].blocker;
        for (k = queue_size; --k >= 0;)
        {
            if (topoProcs[k] == proc)
                break;
        }
        Assert(k >= 0);         /* should have found a match */
        beforeConstraints[j]++; /* waiter must come before */
        /* add this constraint to list of after-constraints for blocker */
        constraints[i].pred = j;
        /* links are stored 1-based so that 0 can mean "end of list" */
        constraints[i].link = afterConstraints[k];
        afterConstraints[k] = i + 1;
    }
    /*--------------------
     * Now scan the topoProcs array backwards.  At each step, output the
     * last proc that has no remaining before-constraints, and decrease
     * the beforeConstraints count of each of the procs it was constrained
     * against.
     * i = index of ordering[] entry we want to output this time
     * j = search index for topoProcs[]
     * k = temp for scanning constraint list for proc j
     * last = last non-null index in topoProcs (avoid redundant searches)
     *--------------------
     */
    last = queue_size - 1;
    for (i = queue_size; --i >= 0;)
    {
        /* Find next candidate to output */
        while (topoProcs[last] == NULL)
            last--;
        for (j = last; j >= 0; j--)
        {
            if (topoProcs[j] != NULL && beforeConstraints[j] == 0)
                break;
        }
        /* If no available candidate, topological sort fails */
        /* (the constraints are contradictory, i.e. contain a cycle) */
        if (j < 0)
            return false;
        /* Output candidate, and mark it done by zeroing topoProcs[] entry */
        ordering[i] = topoProcs[j];
        topoProcs[j] = NULL;
        /* Update beforeConstraints counts of its predecessors */
        /* (walk the 1-based link list built above; 0 terminates) */
        for (k = afterConstraints[j]; k > 0; k = constraints[k - 1].link)
            beforeConstraints[constraints[k - 1].pred]--;
    }

    /* Done */
    return true;
}
00867 
#ifdef DEBUG_DEADLOCK
/*
 * PrintLockQueue -- debug aid: dump a lock's wait queue to stdout.
 *
 * Prints the given info prefix, the lock's address, and the PIDs of the
 * waiting processes in their current queue order.  Compiled only when
 * DEBUG_DEADLOCK is defined.
 */
static void
PrintLockQueue(LOCK *lock, const char *info)
{
    PROC_QUEUE *waitQueue = &(lock->waitProcs);
    int         queue_size = waitQueue->size;
    PGPROC     *proc;
    int         i;

    /* %p requires a void * argument; cast to avoid undefined behavior */
    printf("%s lock %p queue ", info, (void *) lock);
    proc = (PGPROC *) waitQueue->links.next;
    for (i = 0; i < queue_size; i++)
    {
        printf(" %d", proc->pid);
        proc = (PGPROC *) proc->links.next;
    }
    printf("\n");
    fflush(stdout);
}
#endif
00888 
00889 /*
00890  * Report a detected deadlock, with available details.
00891  */
00892 void
00893 DeadLockReport(void)
00894 {
00895     StringInfoData clientbuf;   /* errdetail for client */
00896     StringInfoData logbuf;      /* errdetail for server log */
00897     StringInfoData locktagbuf;
00898     int         i;
00899 
00900     initStringInfo(&clientbuf);
00901     initStringInfo(&logbuf);
00902     initStringInfo(&locktagbuf);
00903 
00904     /* Generate the "waits for" lines sent to the client */
00905     for (i = 0; i < nDeadlockDetails; i++)
00906     {
00907         DEADLOCK_INFO *info = &deadlockDetails[i];
00908         int         nextpid;
00909 
00910         /* The last proc waits for the first one... */
00911         if (i < nDeadlockDetails - 1)
00912             nextpid = info[1].pid;
00913         else
00914             nextpid = deadlockDetails[0].pid;
00915 
00916         /* reset locktagbuf to hold next object description */
00917         resetStringInfo(&locktagbuf);
00918 
00919         DescribeLockTag(&locktagbuf, &info->locktag);
00920 
00921         if (i > 0)
00922             appendStringInfoChar(&clientbuf, '\n');
00923 
00924         appendStringInfo(&clientbuf,
00925                   _("Process %d waits for %s on %s; blocked by process %d."),
00926                          info->pid,
00927                          GetLockmodeName(info->locktag.locktag_lockmethodid,
00928                                          info->lockmode),
00929                          locktagbuf.data,
00930                          nextpid);
00931     }
00932 
00933     /* Duplicate all the above for the server ... */
00934     appendStringInfoString(&logbuf, clientbuf.data);
00935 
00936     /* ... and add info about query strings */
00937     for (i = 0; i < nDeadlockDetails; i++)
00938     {
00939         DEADLOCK_INFO *info = &deadlockDetails[i];
00940 
00941         appendStringInfoChar(&logbuf, '\n');
00942 
00943         appendStringInfo(&logbuf,
00944                          _("Process %d: %s"),
00945                          info->pid,
00946                       pgstat_get_backend_current_activity(info->pid, false));
00947     }
00948 
00949     pgstat_report_deadlock();
00950 
00951     ereport(ERROR,
00952             (errcode(ERRCODE_T_R_DEADLOCK_DETECTED),
00953              errmsg("deadlock detected"),
00954              errdetail_internal("%s", clientbuf.data),
00955              errdetail_log("%s", logbuf.data),
00956              errhint("See server log for query details.")));
00957 }
00958 
00959 /*
00960  * RememberSimpleDeadLock: set up info for DeadLockReport when ProcSleep
00961  * detects a trivial (two-way) deadlock.  proc1 wants to block for lockmode
00962  * on lock, but proc2 is already waiting and would be blocked by proc1.
00963  */
00964 void
00965 RememberSimpleDeadLock(PGPROC *proc1,
00966                        LOCKMODE lockmode,
00967                        LOCK *lock,
00968                        PGPROC *proc2)
00969 {
00970     DEADLOCK_INFO *info = &deadlockDetails[0];
00971 
00972     info->locktag = lock->tag;
00973     info->lockmode = lockmode;
00974     info->pid = proc1->pid;
00975     info++;
00976     info->locktag = proc2->waitLock->tag;
00977     info->lockmode = proc2->waitLockMode;
00978     info->pid = proc2->pid;
00979     nDeadlockDetails = 2;
00980 }