sinvaladt.c

/*-------------------------------------------------------------------------
 *
 * sinvaladt.c
 *    POSTGRES shared cache invalidation data manager.
 *
 * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/backend/storage/ipc/sinvaladt.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <signal.h>
#include <unistd.h>

#include "miscadmin.h"
#include "storage/backendid.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "storage/shmem.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"


/*
 * Conceptually, the shared cache invalidation messages are stored in an
 * infinite array, where maxMsgNum is the next array subscript to store a
 * submitted message in, minMsgNum is the smallest array subscript containing
 * a message not yet read by all backends, and we always have maxMsgNum >=
 * minMsgNum.  (They are equal when there are no messages pending.)  For each
 * active backend, there is a nextMsgNum pointer indicating the next message it
 * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
 * backend.
 *
 * (In the current implementation, minMsgNum is a lower bound for the
 * per-process nextMsgNum values, but it isn't rigorously kept equal to the
 * smallest nextMsgNum --- it may lag behind.  We only update it when
 * SICleanupQueue is called, and we try not to do that often.)
 *
 * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
 * entries.  We translate MsgNum values into circular-buffer indexes by
 * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
 * MAXNUMMESSAGES is a constant and a power of 2).  As long as maxMsgNum
 * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
 * in the buffer.  If the buffer does overflow, we recover by setting the
 * "reset" flag for each backend that has fallen too far behind.  A backend
 * that is in "reset" state is ignored while determining minMsgNum.  When
 * it does finally attempt to receive inval messages, it must discard all
 * its invalidatable state, since it won't know what it missed.
 *
 * To reduce the probability of needing resets, we send a "catchup" interrupt
 * to any backend that seems to be falling unreasonably far behind.  The
 * normal behavior is that at most one such interrupt is in flight at a time;
 * when a backend completes processing a catchup interrupt, it executes
 * SICleanupQueue, which will signal the next-furthest-behind backend if
 * needed.  This avoids undue contention from multiple backends all trying
 * to catch up at once.  However, the furthest-back backend might be stuck
 * in a state where it can't catch up.  Eventually it will get reset, so it
 * won't cause any more problems for anyone but itself.  But we don't want
 * to find that a bunch of other backends are now too close to the reset
 * threshold to be saved.  So SICleanupQueue is designed to occasionally
 * send extra catchup interrupts as the queue gets fuller, to backends that
 * are far behind and haven't gotten one yet.  As long as there aren't a lot
 * of "stuck" backends, we won't need a lot of extra interrupts, since ones
 * that aren't stuck will propagate their interrupts to the next guy.
 *
 * We would have problems if the MsgNum values overflow an integer, so
 * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
 * from all the MsgNum variables simultaneously.  MSGNUMWRAPAROUND can be
 * large so that we don't need to do this often.  It must be a multiple of
 * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
 * to be moved when we do it.
 *
 * Access to the shared sinval array is protected by two locks, SInvalReadLock
 * and SInvalWriteLock.  Readers take SInvalReadLock in shared mode; this
 * authorizes them to modify their own ProcState but not to modify or even
 * look at anyone else's.  When we need to perform array-wide updates,
 * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
 * lock out all readers.  Writers take SInvalWriteLock (always in exclusive
 * mode) to serialize adding messages to the queue.  Note that a writer
 * can operate in parallel with one or more readers, because the writer
 * has no need to touch anyone's ProcState, except in the infrequent cases
 * when SICleanupQueue is needed.  The only point of overlap is that
 * the writer wants to change maxMsgNum while readers need to read it.
 * We deal with that by having a spinlock that readers must take for just
 * long enough to read maxMsgNum, while writers take it for just long enough
 * to write maxMsgNum.  (The exact rule is that you need the spinlock to
 * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
 * spinlock to write maxMsgNum unless you are holding both locks.)
 *
 * Note: since maxMsgNum is an int and hence presumably atomically readable/
 * writable, the spinlock might seem unnecessary.  The reason it is needed
 * is to provide a memory barrier: we need to be sure that messages written
 * to the array are actually there before maxMsgNum is increased, and that
 * readers will see that data after fetching maxMsgNum.  Multiprocessors
 * that have weak memory-ordering guarantees can fail without the memory
 * barrier instructions that are included in the spinlock sequences.
 */


/*
 * Configurable parameters.
 *
 * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
 * Must be a power of 2 for speed.
 *
 * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
 * Must be a multiple of MAXNUMMESSAGES.  Should be large.
 *
 * CLEANUP_MIN: the minimum number of messages that must be in the buffer
 * before we bother to call SICleanupQueue.
 *
 * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
 * we exceed CLEANUP_MIN.  Should be a power of 2 for speed.
 *
 * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
 * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT.
 *
 * WRITE_QUANTUM: the max number of messages to push into the buffer per
 * iteration of SIInsertDataEntries.  Noncritical but should be less than
 * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
 * per iteration.
 */

#define MAXNUMMESSAGES 4096
#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
#define CLEANUP_MIN (MAXNUMMESSAGES / 2)
#define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
#define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
#define WRITE_QUANTUM 64

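/*
 * Illustrative sketch, not part of the original file: a compact restatement
 * of the circular-buffer arithmetic described in the header comment above.
 * The helper names and the SINVAL_EXAMPLES guard are hypothetical; they exist
 * only to make the MsgNum-to-slot mapping and the MSGNUMWRAPAROUND
 * adjustment concrete.
 */
#ifdef SINVAL_EXAMPLES
static inline int
sketch_msgnum_to_slot(int msgNum)
{
    /* MAXNUMMESSAGES is a power of 2, so the compiler can reduce this to masking */
    return msgNum % MAXNUMMESSAGES;
}

static inline int
sketch_wraparound_adjust(int msgNum)
{
    /*
     * MSGNUMWRAPAROUND is a multiple of MAXNUMMESSAGES, so subtracting it
     * leaves msgNum % MAXNUMMESSAGES unchanged: existing buffer entries keep
     * their slots when all counters are reduced together.
     */
    return msgNum - MSGNUMWRAPAROUND;
}
#endif   /* SINVAL_EXAMPLES */
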
/* Per-backend state in shared invalidation structure */
typedef struct ProcState
{
    /* procPid is zero in an inactive ProcState array entry. */
    pid_t       procPid;        /* PID of backend, for signaling */
    PGPROC     *proc;           /* PGPROC of backend */
    /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
    int         nextMsgNum;     /* next message number to read */
    bool        resetState;     /* backend needs to reset its state */
    bool        signaled;       /* backend has been sent catchup signal */
    bool        hasMessages;    /* backend has unread messages */

    /*
     * Backend only sends invalidations, never receives them.  This only
     * makes sense for the Startup process during recovery, because it
     * doesn't maintain a relcache, yet it fires inval messages to allow
     * query backends to see schema changes.
     */
    bool        sendOnly;       /* backend only sends, never receives */

    /*
     * Next LocalTransactionId to use for each idle backend slot.  We keep
     * this here because it is indexed by BackendId and it is convenient to
     * copy the value to and from local memory when MyBackendId is set. It's
     * meaningless in an active ProcState entry.
     */
    LocalTransactionId nextLXID;
} ProcState;

/* Shared cache invalidation memory segment */
typedef struct SISeg
{
    /*
     * General state information
     */
    int         minMsgNum;      /* oldest message still needed */
    int         maxMsgNum;      /* next message number to be assigned */
    int         nextThreshold;  /* # of messages to call SICleanupQueue */
    int         lastBackend;    /* index of last active procState entry, +1 */
    int         maxBackends;    /* size of procState array */

    slock_t     msgnumLock;     /* spinlock protecting maxMsgNum */

    /*
     * Circular buffer holding shared-inval messages
     */
    SharedInvalidationMessage buffer[MAXNUMMESSAGES];

    /*
     * Per-backend state info.
     *
     * We declare procState as 1 entry because C wants a fixed-size array, but
     * actually it is maxBackends entries long.
     */
    ProcState   procState[1];   /* reflects the invalidation state */
} SISeg;

static SISeg *shmInvalBuffer;   /* pointer to the shared inval buffer */


static LocalTransactionId nextLocalTransactionId;

static void CleanupInvalidationState(int status, Datum arg);


/*
 * SInvalShmemSize --- return shared-memory space needed
 */
Size
SInvalShmemSize(void)
{
    Size        size;

    size = offsetof(SISeg, procState);
    size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));

    return size;
}

/*
 * CreateSharedInvalidationState
 *      Create and initialize the SI message buffer
 */
void
CreateSharedInvalidationState(void)
{
    Size        size;
    int         i;
    bool        found;

    /* Allocate space in shared memory */
    size = offsetof(SISeg, procState);
    size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));

    shmInvalBuffer = (SISeg *)
        ShmemInitStruct("shmInvalBuffer", size, &found);
    if (found)
        return;

    /* Clear message counters, save size of procState array, init spinlock */
    shmInvalBuffer->minMsgNum = 0;
    shmInvalBuffer->maxMsgNum = 0;
    shmInvalBuffer->nextThreshold = CLEANUP_MIN;
    shmInvalBuffer->lastBackend = 0;
    shmInvalBuffer->maxBackends = MaxBackends;
    SpinLockInit(&shmInvalBuffer->msgnumLock);

    /* The buffer[] array is initially all unused, so we need not fill it */

    /* Mark all backends inactive, and initialize nextLXID */
    for (i = 0; i < shmInvalBuffer->maxBackends; i++)
    {
        shmInvalBuffer->procState[i].procPid = 0;       /* inactive */
        shmInvalBuffer->procState[i].proc = NULL;
        shmInvalBuffer->procState[i].nextMsgNum = 0;    /* meaningless */
        shmInvalBuffer->procState[i].resetState = false;
        shmInvalBuffer->procState[i].signaled = false;
        shmInvalBuffer->procState[i].hasMessages = false;
        shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
    }
}

/*
 * SharedInvalBackendInit
 *      Initialize a new backend to operate on the sinval buffer
 */
void
SharedInvalBackendInit(bool sendOnly)
{
    int         index;
    ProcState  *stateP = NULL;
    SISeg      *segP = shmInvalBuffer;

    /*
     * This can run in parallel with read operations, but not with write
     * operations, since SIInsertDataEntries relies on lastBackend to set
     * hasMessages appropriately.
     */
    LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

    /* Look for a free entry in the procState array */
    for (index = 0; index < segP->lastBackend; index++)
    {
        if (segP->procState[index].procPid == 0)        /* inactive slot? */
        {
            stateP = &segP->procState[index];
            break;
        }
    }

    if (stateP == NULL)
    {
        if (segP->lastBackend < segP->maxBackends)
        {
            stateP = &segP->procState[segP->lastBackend];
            Assert(stateP->procPid == 0);
            segP->lastBackend++;
        }
        else
        {
            /*
             * out of procState slots: MaxBackends exceeded -- report normally
             */
            MyBackendId = InvalidBackendId;
            LWLockRelease(SInvalWriteLock);
            ereport(FATAL,
                    (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
                     errmsg("sorry, too many clients already")));
        }
    }

    MyBackendId = (stateP - &segP->procState[0]) + 1;

    /* Advertise assigned backend ID in MyProc */
    MyProc->backendId = MyBackendId;

    /* Fetch next local transaction ID into local memory */
    nextLocalTransactionId = stateP->nextLXID;

    /* mark myself active, with all extant messages already read */
    stateP->procPid = MyProcPid;
    stateP->proc = MyProc;
    stateP->nextMsgNum = segP->maxMsgNum;
    stateP->resetState = false;
    stateP->signaled = false;
    stateP->hasMessages = false;
    stateP->sendOnly = sendOnly;

    LWLockRelease(SInvalWriteLock);

    /* register exit routine to mark my entry inactive at exit */
    on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));

    elog(DEBUG4, "my backend ID is %d", MyBackendId);
}

/*
 * CleanupInvalidationState
 *      Mark the current backend as no longer active.
 *
 * This function is called via on_shmem_exit() during backend shutdown.
 *
 * arg is really of type "SISeg*".
 */
static void
CleanupInvalidationState(int status, Datum arg)
{
    SISeg      *segP = (SISeg *) DatumGetPointer(arg);
    ProcState  *stateP;
    int         i;

    Assert(PointerIsValid(segP));

    LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

    stateP = &segP->procState[MyBackendId - 1];

    /* Update next local transaction ID for next holder of this backendID */
    stateP->nextLXID = nextLocalTransactionId;

    /* Mark myself inactive */
    stateP->procPid = 0;
    stateP->proc = NULL;
    stateP->nextMsgNum = 0;
    stateP->resetState = false;
    stateP->signaled = false;

    /* Recompute index of last active backend */
    for (i = segP->lastBackend; i > 0; i--)
    {
        if (segP->procState[i - 1].procPid != 0)
            break;
    }
    segP->lastBackend = i;

    LWLockRelease(SInvalWriteLock);
}

/*
 * BackendIdGetProc
 *      Get the PGPROC structure for a backend, given the backend ID.
 *      The result may be out of date arbitrarily quickly, so the caller
 *      must be careful about how this information is used.  NULL is
 *      returned if the backend is not active.
 */
PGPROC *
BackendIdGetProc(int backendID)
{
    PGPROC     *result = NULL;
    SISeg      *segP = shmInvalBuffer;

    /* Need to lock out additions/removals of backends */
    LWLockAcquire(SInvalWriteLock, LW_SHARED);

    if (backendID > 0 && backendID <= segP->lastBackend)
    {
        ProcState  *stateP = &segP->procState[backendID - 1];

        result = stateP->proc;
    }

    LWLockRelease(SInvalWriteLock);

    return result;
}

/*
 * SIInsertDataEntries
 *      Add new invalidation message(s) to the buffer.
 */
void
SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
{
    SISeg      *segP = shmInvalBuffer;

    /*
     * N can be arbitrarily large.  We divide the work into groups of no more
     * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
     * an unreasonably long time.  (This is not so much because we care about
     * letting in other writers, as that some just-caught-up backend might be
     * trying to do SICleanupQueue to pass on its signal, and we don't want it
     * to have to wait a long time.)  Also, we need to consider calling
     * SICleanupQueue every so often.
     */
    while (n > 0)
    {
        int         nthistime = Min(n, WRITE_QUANTUM);
        int         numMsgs;
        int         max;
        int         i;

        n -= nthistime;

        LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

        /*
         * If the buffer is full, we *must* acquire some space.  Clean the
         * queue and reset anyone who is preventing space from being freed.
         * Otherwise, clean the queue only when it's exceeded the next
         * fullness threshold.  We have to loop and recheck the buffer state
         * after any call of SICleanupQueue.
         */
        for (;;)
        {
            numMsgs = segP->maxMsgNum - segP->minMsgNum;
            if (numMsgs + nthistime > MAXNUMMESSAGES ||
                numMsgs >= segP->nextThreshold)
                SICleanupQueue(true, nthistime);
            else
                break;
        }

        /*
         * Insert new message(s) into proper slot of circular buffer
         */
        max = segP->maxMsgNum;
        while (nthistime-- > 0)
        {
            segP->buffer[max % MAXNUMMESSAGES] = *data++;
            max++;
        }

        /* Update current value of maxMsgNum using spinlock */
        {
            /* use volatile pointer to prevent code rearrangement */
            volatile SISeg *vsegP = segP;

            SpinLockAcquire(&vsegP->msgnumLock);
            vsegP->maxMsgNum = max;
            SpinLockRelease(&vsegP->msgnumLock);
        }

        /*
         * Now that the maxMsgNum change is globally visible, we give everyone
         * a swift kick to make sure they read the newly added messages.
         * Releasing SInvalWriteLock will enforce a full memory barrier, so
         * these (unlocked) changes will be committed to memory before we exit
         * the function.
         */
        for (i = 0; i < segP->lastBackend; i++)
        {
            ProcState  *stateP = &segP->procState[i];

            stateP->hasMessages = true;
        }

        LWLockRelease(SInvalWriteLock);
    }
}

/*
 * SIGetDataEntries
 *      get next SI message(s) for current backend, if there are any
 *
 * Possible return values:
 *  0:   no SI message available
 *  n>0: next n SI messages have been extracted into data[]
 * -1:   SI reset message extracted
 *
 * If the return value is less than the array size "datasize", the caller
 * can assume that there are no more SI messages after the one(s) returned.
 * Otherwise, another call is needed to collect more messages.
 *
 * NB: this can run in parallel with other instances of SIGetDataEntries
 * executing on behalf of other backends, since each instance will modify only
 * fields of its own backend's ProcState, and no instance will look at fields
 * of other backends' ProcStates.  We express this by grabbing SInvalReadLock
 * in shared mode.  Note that this is not exactly the normal (read-only)
 * interpretation of a shared lock! Look closely at the interactions before
 * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
 *
 * NB: this can also run in parallel with SIInsertDataEntries.  It is not
 * guaranteed that we will return any messages added after the routine is
 * entered.
 *
 * Note: we assume that "datasize" is not so large that it might be important
 * to break our hold on SInvalReadLock into segments.
 */
int
SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
{
    SISeg      *segP;
    ProcState  *stateP;
    int         max;
    int         n;

    segP = shmInvalBuffer;
    stateP = &segP->procState[MyBackendId - 1];

    /*
     * Before starting to take locks, do a quick, unlocked test to see whether
     * there can possibly be anything to read.  On a multiprocessor system,
     * it's possible that this load could migrate backwards and occur before
     * we actually enter this function, so we might miss a sinval message that
     * was just added by some other processor.  But the load can't migrate
     * backwards over a preceding lock acquisition, so it should be OK.  If we
     * haven't acquired a lock preventing further relevant invalidations, any
     * such occurrence is not much different than if the invalidation had
     * arrived slightly later in the first place.
     */
    if (!stateP->hasMessages)
        return 0;

    LWLockAcquire(SInvalReadLock, LW_SHARED);

    /*
     * We must reset hasMessages before determining how many messages we're
     * going to read.  That way, if new messages arrive after we have
     * determined how many we're reading, the flag will get reset and we'll
     * notice those messages part-way through.
     *
     * Note that, if we don't end up reading all of the messages, we had
     * better be certain to reset this flag before exiting!
     */
    stateP->hasMessages = false;

    /* Fetch current value of maxMsgNum using spinlock */
    {
        /* use volatile pointer to prevent code rearrangement */
        volatile SISeg *vsegP = segP;

        SpinLockAcquire(&vsegP->msgnumLock);
        max = vsegP->maxMsgNum;
        SpinLockRelease(&vsegP->msgnumLock);
    }

    if (stateP->resetState)
    {
        /*
         * Force reset.  We can say we have dealt with any messages added
         * since the reset, as well; and that means we should clear the
         * signaled flag, too.
         */
        stateP->nextMsgNum = max;
        stateP->resetState = false;
        stateP->signaled = false;
        LWLockRelease(SInvalReadLock);
        return -1;
    }

    /*
     * Retrieve messages and advance backend's counter, until data array is
     * full or there are no more messages.
     *
     * There may be other backends that haven't read the message(s), so we
     * cannot delete them here.  SICleanupQueue() will eventually remove them
     * from the queue.
     */
    n = 0;
    while (n < datasize && stateP->nextMsgNum < max)
    {
        data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
        stateP->nextMsgNum++;
    }

    /*
     * If we have caught up completely, reset our "signaled" flag so that
     * we'll get another signal if we fall behind again.
     *
     * If we haven't caught up completely, reset the hasMessages flag so that
     * we see the remaining messages next time.
     */
    if (stateP->nextMsgNum >= max)
        stateP->signaled = false;
    else
        stateP->hasMessages = true;

    LWLockRelease(SInvalReadLock);
    return n;
}
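
/*
 * Illustrative caller sketch, not part of the original file: one way the
 * return-value contract of SIGetDataEntries (0, n>0, or -1 for reset) might
 * be consumed.  The names sketch_drain_sinval_queue, HandleMessage,
 * ResetAllCaches, SKETCH_BATCH_SIZE and the SINVAL_EXAMPLES guard are
 * hypothetical; the real consumer of this API lives elsewhere in the backend.
 */
#ifdef SINVAL_EXAMPLES
#define SKETCH_BATCH_SIZE 32

static void
sketch_drain_sinval_queue(void (*HandleMessage) (const SharedInvalidationMessage *msg),
                          void (*ResetAllCaches) (void))
{
    SharedInvalidationMessage msgs[SKETCH_BATCH_SIZE];
    int         n;
    int         i;

    do
    {
        n = SIGetDataEntries(msgs, SKETCH_BATCH_SIZE);
        if (n < 0)
        {
            /* we were reset: anything we had cached may now be stale */
            ResetAllCaches();
            break;
        }
        for (i = 0; i < n; i++)
            HandleMessage(&msgs[i]);

        /*
         * A partially filled array means the queue is drained; a full array
         * means there may be more messages, so go around again.
         */
    } while (n == SKETCH_BATCH_SIZE);
}
#endif   /* SINVAL_EXAMPLES */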

/*
 * SICleanupQueue
 *      Remove messages that have been consumed by all active backends
 *
 * callerHasWriteLock is TRUE if caller is holding SInvalWriteLock.
 * minFree is the minimum number of message slots to make free.
 *
 * Possible side effects of this routine include marking one or more
 * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
 * to some backend that seems to be getting too far behind.  We signal at
 * most one backend at a time, for reasons explained at the top of the file.
 *
 * Caution: because we transiently release write lock when we have to signal
 * some other backend, it is NOT guaranteed that there are still minFree
 * free message slots at exit.  Caller must recheck and perhaps retry.
 */
void
SICleanupQueue(bool callerHasWriteLock, int minFree)
{
    SISeg      *segP = shmInvalBuffer;
    int         min,
                minsig,
                lowbound,
                numMsgs,
                i;
    ProcState  *needSig = NULL;

    /* Lock out all writers and readers */
    if (!callerHasWriteLock)
        LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
    LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);

    /*
     * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
     * furthest-back backend that needs signaling (if any), and reset any
     * backends that are too far back.  Note that because we ignore sendOnly
     * backends here it is possible for them to keep sending messages without
     * a problem even when they are the only active backend.
     */
    min = segP->maxMsgNum;
    minsig = min - SIG_THRESHOLD;
    lowbound = min - MAXNUMMESSAGES + minFree;

    for (i = 0; i < segP->lastBackend; i++)
    {
        ProcState  *stateP = &segP->procState[i];
        int         n = stateP->nextMsgNum;

        /* Ignore if inactive or already in reset state */
        if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
            continue;

        /*
         * If we must free some space and this backend is preventing it, force
         * him into reset state and then ignore until he catches up.
         */
        if (n < lowbound)
        {
            stateP->resetState = true;
            /* no point in signaling him ... */
            continue;
        }

        /* Track the global minimum nextMsgNum */
        if (n < min)
            min = n;

        /* Also see who's furthest back of the unsignaled backends */
        if (n < minsig && !stateP->signaled)
        {
            minsig = n;
            needSig = stateP;
        }
    }
    segP->minMsgNum = min;

    /*
     * When minMsgNum gets really large, decrement all message counters so as
     * to forestall overflow of the counters.  This happens seldom enough that
     * folding it into the previous loop would be a loser.
     */
    if (min >= MSGNUMWRAPAROUND)
    {
        segP->minMsgNum -= MSGNUMWRAPAROUND;
        segP->maxMsgNum -= MSGNUMWRAPAROUND;
        for (i = 0; i < segP->lastBackend; i++)
        {
            /* we don't bother skipping inactive entries here */
            segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
        }
    }

    /*
     * Determine how many messages are still in the queue, and set the
     * threshold at which we should repeat SICleanupQueue().
     */
    numMsgs = segP->maxMsgNum - segP->minMsgNum;
    if (numMsgs < CLEANUP_MIN)
        segP->nextThreshold = CLEANUP_MIN;
    else
        segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;

    /*
     * Lastly, signal anyone who needs a catchup interrupt.  Since
     * SendProcSignal() might not be fast, we don't want to hold locks while
     * executing it.
     */
    if (needSig)
    {
        pid_t       his_pid = needSig->procPid;
        BackendId   his_backendId = (needSig - &segP->procState[0]) + 1;

        needSig->signaled = true;
        LWLockRelease(SInvalReadLock);
        LWLockRelease(SInvalWriteLock);
        elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
        SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
        if (callerHasWriteLock)
            LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
    }
    else
    {
        LWLockRelease(SInvalReadLock);
        if (!callerHasWriteLock)
            LWLockRelease(SInvalWriteLock);
    }
}
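
/*
 * Worked example, not part of the original file: the nextThreshold rule used
 * by SICleanupQueue above, restated as a tiny helper so the rounding is easy
 * to check.  With the constants defined earlier (CLEANUP_MIN = 2048,
 * CLEANUP_QUANTUM = 256), sketch_next_threshold(3000) is (11 + 1) * 256 =
 * 3072, while anything below CLEANUP_MIN snaps back to 2048.  The helper
 * name and the SINVAL_EXAMPLES guard are hypothetical.
 */
#ifdef SINVAL_EXAMPLES
static int
sketch_next_threshold(int numMsgs)
{
    if (numMsgs < CLEANUP_MIN)
        return CLEANUP_MIN;
    /* round up to the next CLEANUP_QUANTUM boundary strictly above numMsgs */
    return (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
}
#endif   /* SINVAL_EXAMPLES */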


/*
 * GetNextLocalTransactionId --- allocate a new LocalTransactionId
 *
 * We split VirtualTransactionIds into two parts so that it is possible
 * to allocate a new one without any contention for shared memory, except
 * for a bit of additional overhead during backend startup/shutdown.
 * The high-order part of a VirtualTransactionId is a BackendId, and the
 * low-order part is a LocalTransactionId, which we assign from a local
 * counter.  To avoid the risk of a VirtualTransactionId being reused
 * within a short interval, successive procs occupying the same backend ID
 * slot should use a consecutive sequence of local IDs, which is implemented
 * by copying nextLocalTransactionId as seen above.
 */
LocalTransactionId
GetNextLocalTransactionId(void)
{
    LocalTransactionId result;

    /* loop to avoid returning InvalidLocalTransactionId at wraparound */
    do
    {
        result = nextLocalTransactionId++;
    } while (!LocalTransactionIdIsValid(result));

    return result;
}
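
/*
 * Illustrative sketch, not part of the original file: how the two halves of
 * a virtual transaction ID described above fit together from this module's
 * point of view.  SketchVirtualXid and sketch_assign_virtual_xid are
 * hypothetical stand-ins, guarded by the hypothetical SINVAL_EXAMPLES macro;
 * the real virtual-transaction-ID type is defined elsewhere in the backend.
 */
#ifdef SINVAL_EXAMPLES
typedef struct SketchVirtualXid
{
    BackendId   backendId;      /* high-order part: my sinval array slot + 1 */
    LocalTransactionId localTransactionId;      /* low-order part: local counter */
} SketchVirtualXid;

static SketchVirtualXid
sketch_assign_virtual_xid(void)
{
    SketchVirtualXid vxid;

    /* MyBackendId was assigned in SharedInvalBackendInit above */
    vxid.backendId = MyBackendId;
    /* strictly increasing per backend-ID slot, never Invalid */
    vxid.localTransactionId = GetNextLocalTransactionId();
    return vxid;
}
#endif   /* SINVAL_EXAMPLES */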