#include "postgres.h"#include <signal.h>#include <unistd.h>#include "miscadmin.h"#include "storage/backendid.h"#include "storage/ipc.h"#include "storage/proc.h"#include "storage/procsignal.h"#include "storage/shmem.h"#include "storage/sinvaladt.h"#include "storage/spin.h"
Go to the source code of this file.
Data Structures | |
| struct | ProcState |
| struct | SISeg |
Defines | |
| #define | MAXNUMMESSAGES 4096 |
| #define | MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144) |
| #define | CLEANUP_MIN (MAXNUMMESSAGES / 2) |
| #define | CLEANUP_QUANTUM (MAXNUMMESSAGES / 16) |
| #define | SIG_THRESHOLD (MAXNUMMESSAGES / 2) |
| #define | WRITE_QUANTUM 64 |
Typedefs | |
| typedef struct ProcState | ProcState |
| typedef struct SISeg | SISeg |
Functions | |
| static void | CleanupInvalidationState (int status, Datum arg) |
| Size | SInvalShmemSize (void) |
| void | CreateSharedInvalidationState (void) |
| void | SharedInvalBackendInit (bool sendOnly) |
| PGPROC * | BackendIdGetProc (int backendID) |
| void | SIInsertDataEntries (const SharedInvalidationMessage *data, int n) |
| int | SIGetDataEntries (SharedInvalidationMessage *data, int datasize) |
| void | SICleanupQueue (bool callerHasWriteLock, int minFree) |
| LocalTransactionId | GetNextLocalTransactionId (void) |
Variables | |
| static SISeg * | shmInvalBuffer |
| static LocalTransactionId | nextLocalTransactionId |
| #define CLEANUP_MIN (MAXNUMMESSAGES / 2) |
Definition at line 132 of file sinvaladt.c.
Referenced by SICleanupQueue().
| #define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16) |
Definition at line 133 of file sinvaladt.c.
Referenced by SICleanupQueue().
| #define MAXNUMMESSAGES 4096 |
Definition at line 130 of file sinvaladt.c.
Referenced by SICleanupQueue(), and SIInsertDataEntries().
| #define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144) |
Definition at line 131 of file sinvaladt.c.
Referenced by SICleanupQueue().
| #define SIG_THRESHOLD (MAXNUMMESSAGES / 2) |
Definition at line 134 of file sinvaladt.c.
Referenced by SICleanupQueue().
| #define WRITE_QUANTUM 64 |
Definition at line 135 of file sinvaladt.c.
Referenced by SIInsertDataEntries().
| PGPROC* BackendIdGetProc | ( | int | backendID | ) |
Definition at line 383 of file sinvaladt.c.
References LW_SHARED, LWLockAcquire(), LWLockRelease(), ProcState::proc, SISeg::procState, and SInvalWriteLock.
Referenced by do_autovacuum(), and VirtualXactLock().
{
    /*
     * Translate a 1-based backend ID into the PGPROC stored in its procState
     * slot.  IDs outside 1..lastBackend give NULL, and so does a slot whose
     * proc pointer has been cleared (inactive slot).
     */
    SISeg      *seg = shmInvalBuffer;
    PGPROC     *found = NULL;

    /* Shared lock keeps backends from being added/removed while we look. */
    LWLockAcquire(SInvalWriteLock, LW_SHARED);

    if (backendID >= 1 && backendID <= seg->lastBackend)
        found = seg->procState[backendID - 1].proc;

    LWLockRelease(SInvalWriteLock);

    return found;
}
| static void CleanupInvalidationState | ( | int | status, | |
| Datum | arg | |||
| ) | [static] |
Definition at line 342 of file sinvaladt.c.
References Assert, DatumGetPointer, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyBackendId, nextLocalTransactionId, ProcState::nextLXID, ProcState::nextMsgNum, PointerIsValid, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ProcState::signaled, and SInvalWriteLock.
Referenced by SharedInvalBackendInit().
{
    /*
     * on_shmem_exit callback (registered by SharedInvalBackendInit, with the
     * SISeg pointer passed as arg): release this backend's procState slot.
     * The exit status argument is not used.
     */
    SISeg      *seg = (SISeg *) DatumGetPointer(arg);
    ProcState  *mine;
    int         last;

    Assert(PointerIsValid(seg));

    LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

    mine = &seg->procState[MyBackendId - 1];

    /* Leave our local-XID counter behind for the next user of this ID. */
    mine->nextLXID = nextLocalTransactionId;

    /* Return the slot to its inactive state. */
    mine->procPid = 0;
    mine->proc = NULL;
    mine->nextMsgNum = 0;
    mine->resetState = false;
    mine->signaled = false;

    /*
     * Pull lastBackend back over any trailing inactive slots (possibly
     * including the one just released), so it again ends at the highest
     * active entry.
     */
    last = seg->lastBackend;
    while (last > 0 && seg->procState[last - 1].procPid == 0)
        last--;
    seg->lastBackend = last;

    LWLockRelease(SInvalWriteLock);
}
| void CreateSharedInvalidationState | ( | void | ) |
Definition at line 221 of file sinvaladt.c.
References add_size(), ProcState::hasMessages, i, SISeg::lastBackend, SISeg::maxBackends, MaxBackends, SISeg::maxMsgNum, SISeg::minMsgNum, SISeg::msgnumLock, mul_size(), ProcState::nextLXID, ProcState::nextMsgNum, SISeg::nextThreshold, offsetof, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ShmemInitStruct(), ProcState::signaled, and SpinLockInit.
Referenced by CreateSharedMemoryAndSemaphores().
{
    /*
     * Create (or attach to) the shared invalidation segment "shmInvalBuffer":
     * the SISeg header followed by one ProcState per possible backend
     * (MaxBackends).  Only the process that actually creates the structure
     * initializes it; if it already exists we return immediately.
     */
    Size        size;
    int         i;
    bool        found;

    /* Allocate space in shared memory */
    size = offsetof(SISeg, procState);
    size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));

    shmInvalBuffer = (SISeg *)
        ShmemInitStruct("shmInvalBuffer", size, &found);
    if (found)
        return;

    /* Clear message counters, save size of procState array, init spinlock */
    shmInvalBuffer->minMsgNum = 0;
    shmInvalBuffer->maxMsgNum = 0;
    shmInvalBuffer->nextThreshold = CLEANUP_MIN;
    shmInvalBuffer->lastBackend = 0;
    shmInvalBuffer->maxBackends = MaxBackends;
    SpinLockInit(&shmInvalBuffer->msgnumLock);

    /* The buffer[] array is initially all unused, so we need not fill it */

    /*
     * Mark all backends inactive and initialize nextLXID.  Every ProcState
     * field is given a defined value -- including sendOnly, which was
     * previously left as whatever the shared memory happened to contain
     * (NOTE(review): not known from here whether ShmemInitStruct zeroes it).
     */
    for (i = 0; i < shmInvalBuffer->maxBackends; i++)
    {
        ProcState  *stateP = &shmInvalBuffer->procState[i];

        stateP->procPid = 0;            /* inactive */
        stateP->proc = NULL;
        stateP->nextMsgNum = 0;         /* meaningless */
        stateP->resetState = false;
        stateP->signaled = false;
        stateP->hasMessages = false;
        stateP->sendOnly = false;
        stateP->nextLXID = InvalidLocalTransactionId;
    }
}
| LocalTransactionId GetNextLocalTransactionId | ( | void | ) |
Definition at line 751 of file sinvaladt.c.
References LocalTransactionIdIsValid, and nextLocalTransactionId.
Referenced by InitRecoveryTransactionEnvironment(), and StartTransaction().
{
    /*
     * Return the current value of this backend's local transaction ID
     * counter and advance it.  The counter may wrap around; any value that
     * is not a valid LocalTransactionId (i.e. InvalidLocalTransactionId
     * after wraparound) is skipped.
     */
    LocalTransactionId lxid;

    for (;;)
    {
        lxid = nextLocalTransactionId++;
        if (LocalTransactionIdIsValid(lxid))
            break;
    }

    return lxid;
}
| void SharedInvalBackendInit | ( | bool | sendOnly | ) |
Definition at line 264 of file sinvaladt.c.
References Assert, PGPROC::backendId, CleanupInvalidationState(), DEBUG4, elog, ereport, errcode(), errmsg(), FATAL, ProcState::hasMessages, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxBackends, SISeg::maxMsgNum, MyBackendId, MyProc, MyProcPid, nextLocalTransactionId, ProcState::nextLXID, ProcState::nextMsgNum, NULL, on_shmem_exit(), PointerGetDatum, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ProcState::sendOnly, ProcState::signaled, and SInvalWriteLock.
Referenced by InitPostgres(), and InitRecoveryTransactionEnvironment().
{
    /*
     * Claim a procState slot for this process, set MyBackendId (the 1-based
     * slot number) and MyProc->backendId, and register an exit callback that
     * frees the slot again.  Raises FATAL if all maxBackends slots are in use.
     * sendOnly is recorded in the slot; SICleanupQueue ignores such slots
     * when deciding how far the queue can be trimmed.
     */
    SISeg      *seg = shmInvalBuffer;
    ProcState  *slot = NULL;
    int         idx;

    /*
     * Readers may run concurrently with this, but writers may not:
     * SIInsertDataEntries uses lastBackend to decide which slots get their
     * hasMessages flag set.
     */
    LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

    /* Preferred: reuse an inactive slot inside the current active range. */
    for (idx = 0; idx < seg->lastBackend; idx++)
    {
        if (seg->procState[idx].procPid != 0)
            continue;
        slot = &seg->procState[idx];
        break;
    }

    /* Otherwise extend the active range by one slot, if there is room. */
    if (slot == NULL && seg->lastBackend < seg->maxBackends)
    {
        slot = &seg->procState[seg->lastBackend];
        Assert(slot->procPid == 0);
        seg->lastBackend++;
    }

    /* Still nothing: MaxBackends exceeded -- report normally. */
    if (slot == NULL)
    {
        MyBackendId = InvalidBackendId;
        LWLockRelease(SInvalWriteLock);
        ereport(FATAL,
                (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
                 errmsg("sorry, too many clients already")));
    }

    /* Backend IDs are slot numbers counted from 1. */
    MyBackendId = (slot - &seg->procState[0]) + 1;
    MyProc->backendId = MyBackendId;

    /* Resume the local transaction ID counter stored in this slot. */
    nextLocalTransactionId = slot->nextLXID;

    /* Mark the slot active; every message already queued counts as read. */
    slot->procPid = MyProcPid;
    slot->proc = MyProc;
    slot->nextMsgNum = seg->maxMsgNum;
    slot->resetState = false;
    slot->signaled = false;
    slot->hasMessages = false;
    slot->sendOnly = sendOnly;

    LWLockRelease(SInvalWriteLock);

    /* Free the slot automatically when this process exits. */
    on_shmem_exit(CleanupInvalidationState, PointerGetDatum(seg));

    elog(DEBUG4, "my backend ID is %d", MyBackendId);
}
| void SICleanupQueue | ( | bool | callerHasWriteLock, | |
| int | minFree | |||
| ) |
Definition at line 625 of file sinvaladt.c.
References CLEANUP_MIN, CLEANUP_QUANTUM, DEBUG4, elog, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, SISeg::minMsgNum, MSGNUMWRAPAROUND, ProcState::nextMsgNum, SISeg::nextThreshold, ProcState::procPid, PROCSIG_CATCHUP_INTERRUPT, SISeg::procState, ProcState::resetState, ProcState::sendOnly, SendProcSignal(), SIG_THRESHOLD, ProcState::signaled, signaled, SInvalReadLock, and SInvalWriteLock.
Referenced by ReceiveSharedInvalidMessages(), and SIInsertDataEntries().
{
    /*
     * Reclaim space in the shared invalidation queue: recompute minMsgNum
     * from the active readers, force hopelessly-behind readers into reset
     * state, handle counter wraparound, set the next cleanup threshold, and
     * send a catchup interrupt to at most one lagging backend.
     *
     * callerHasWriteLock: true if the caller already holds SInvalWriteLock
     * in exclusive mode.  It is still held on return, but if a signal is
     * sent it is released and re-acquired in between, so the caller must
     * re-check any queue state it depends on.  When false, the lock is
     * acquired and released here.
     *
     * minFree: number of buffer slots that must be free afterwards.  Any
     * active, non-sendOnly backend whose nextMsgNum is below
     * maxMsgNum - MAXNUMMESSAGES + minFree is put into reset state so it no
     * longer holds minMsgNum back.
     *
     * The backend signaled (PROCSIG_CATCHUP_INTERRUPT) is the one furthest
     * behind among those more than SIG_THRESHOLD messages behind and not
     * already signaled; backends being reset are never signaled.
     */
    SISeg      *segP = shmInvalBuffer;
    int         min,
                minsig,
                lowbound,
                numMsgs,
                i;
    ProcState  *needSig = NULL;

    /* Lock out all writers and readers */
    if (!callerHasWriteLock)
        LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
    LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);

    /*
     * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
     * furthest-back backend that needs signaling (if any), and reset any
     * backends that are too far back. Note that because we ignore sendOnly
     * backends here it is possible for them to keep sending messages without
     * a problem even when they are the only active backend.
     */
    min = segP->maxMsgNum;
    minsig = min - SIG_THRESHOLD;
    lowbound = min - MAXNUMMESSAGES + minFree;

    for (i = 0; i < segP->lastBackend; i++)
    {
        ProcState *stateP = &segP->procState[i];
        int n = stateP->nextMsgNum;

        /*
         * Ignore if inactive, already in reset state, or send-only: none of
         * these may hold minMsgNum back.
         */
        if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
            continue;

        /*
         * If we must free some space and this backend is preventing it, force
         * him into reset state and then ignore until he catches up.
         */
        if (n < lowbound)
        {
            stateP->resetState = true;
            /* no point in signaling him ... */
            continue;
        }

        /* Track the global minimum nextMsgNum */
        if (n < min)
            min = n;

        /* Also see who's furthest back of the unsignaled backends */
        if (n < minsig && !stateP->signaled)
        {
            minsig = n;
            needSig = stateP;
        }
    }
    segP->minMsgNum = min;

    /*
     * When minMsgNum gets really large, decrement all message counters so as
     * to forestall overflow of the counters. This happens seldom enough that
     * folding it into the previous loop would be a loser.
     */
    if (min >= MSGNUMWRAPAROUND)
    {
        segP->minMsgNum -= MSGNUMWRAPAROUND;
        segP->maxMsgNum -= MSGNUMWRAPAROUND;
        for (i = 0; i < segP->lastBackend; i++)
        {
            /*
             * we don't bother skipping inactive entries here.
             *
             * NOTE(review): entries skipped above (inactive, reset, sendOnly)
             * are decremented too.  Inactive and reset entries get nextMsgNum
             * overwritten before use (on slot claim / on reset processing);
             * confirm that nothing relies on a sendOnly entry's nextMsgNum.
             */
            segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
        }
    }

    /*
     * Determine how many messages are still in the queue, and set the
     * threshold at which we should repeat SICleanupQueue().
     */
    numMsgs = segP->maxMsgNum - segP->minMsgNum;
    if (numMsgs < CLEANUP_MIN)
        segP->nextThreshold = CLEANUP_MIN;
    else
        segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;

    /*
     * Lastly, signal anyone who needs a catchup interrupt. Since
     * SendProcSignal() might not be fast, we don't want to hold locks while
     * executing it.
     */
    if (needSig)
    {
        /* Capture target identity while still holding the locks. */
        pid_t his_pid = needSig->procPid;
        BackendId his_backendId = (needSig - &segP->procState[0]) + 1;

        needSig->signaled = true;
        LWLockRelease(SInvalReadLock);
        LWLockRelease(SInvalWriteLock);
        elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
        SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
        /* Restore the caller's lock; queue state may have changed meanwhile. */
        if (callerHasWriteLock)
            LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
    }
    else
    {
        LWLockRelease(SInvalReadLock);
        if (!callerHasWriteLock)
            LWLockRelease(SInvalWriteLock);
    }
}
| int SIGetDataEntries | ( | SharedInvalidationMessage * | data, | |
| int | datasize | |||
| ) |
Definition at line 516 of file sinvaladt.c.
References SISeg::buffer, ProcState::hasMessages, LW_SHARED, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, SISeg::msgnumLock, MyBackendId, ProcState::nextMsgNum, SISeg::procState, ProcState::resetState, ProcState::signaled, SInvalReadLock, SpinLockAcquire, and SpinLockRelease.
Referenced by ReceiveSharedInvalidMessages().
{
    /*
     * Copy up to datasize pending invalidation messages for this backend
     * (slot MyBackendId) into data[], advancing its nextMsgNum.
     *
     * Returns the number of messages copied (0 if none are pending), or -1
     * if SICleanupQueue had forced this backend into reset state.  In the -1
     * case nothing is copied and nextMsgNum jumps to the current end of the
     * queue; the caller presumably has to treat all of its cached state as
     * invalid (TODO confirm against ReceiveSharedInvalidMessages).
     *
     * Runs under SInvalReadLock in shared mode.  maxMsgNum is read under
     * msgnumLock because SIInsertDataEntries updates it while holding only
     * SInvalWriteLock.
     */
    SISeg      *segP;
    ProcState  *stateP;
    int         max;
    int         n;

    segP = shmInvalBuffer;
    stateP = &segP->procState[MyBackendId - 1];

    /*
     * Before starting to take locks, do a quick, unlocked test to see whether
     * there can possibly be anything to read. On a multiprocessor system,
     * it's possible that this load could migrate backwards and occur before
     * we actually enter this function, so we might miss a sinval message that
     * was just added by some other processor. But they can't migrate
     * backwards over a preceding lock acquisition, so it should be OK. If we
     * haven't acquired a lock preventing against further relevant
     * invalidations, any such occurrence is not much different than if the
     * invalidation had arrived slightly later in the first place.
     */
    if (!stateP->hasMessages)
        return 0;

    LWLockAcquire(SInvalReadLock, LW_SHARED);

    /*
     * We must reset hasMessages before determining how many messages we're
     * going to read. That way, if new messages arrive after we have
     * determined how many we're reading, the flag will get reset and we'll
     * notice those messages part-way through.
     *
     * Note that, if we don't end up reading all of the messages, we had
     * better be certain to reset this flag before exiting!
     */
    stateP->hasMessages = false;

    /* Fetch current value of maxMsgNum using spinlock */
    {
        /* use volatile pointer to prevent code rearrangement */
        volatile SISeg *vsegP = segP;

        SpinLockAcquire(&vsegP->msgnumLock);
        max = vsegP->maxMsgNum;
        SpinLockRelease(&vsegP->msgnumLock);
    }

    if (stateP->resetState)
    {
        /*
         * Force reset. We can say we have dealt with any messages added
         * since the reset, as well; and that means we should clear the
         * signaled flag, too.
         */
        stateP->nextMsgNum = max;
        stateP->resetState = false;
        stateP->signaled = false;
        LWLockRelease(SInvalReadLock);
        return -1;
    }

    /*
     * Retrieve messages and advance backend's counter, until data array is
     * full or there are no more messages.
     *
     * There may be other backends that haven't read the message(s), so we
     * cannot delete them here. SICleanupQueue() will eventually remove them
     * from the queue.
     */
    n = 0;
    while (n < datasize && stateP->nextMsgNum < max)
    {
        /* buffer[] is a ring of MAXNUMMESSAGES entries */
        data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
        stateP->nextMsgNum++;
    }

    /*
     * If we have caught up completely, reset our "signaled" flag so that
     * we'll get another signal if we fall behind again.
     *
     * If we haven't caught up completely (data[] filled up first), set the
     * hasMessages flag back to true -- it was cleared above -- so that we
     * see the remaining messages next time.
     */
    if (stateP->nextMsgNum >= max)
        stateP->signaled = false;
    else
        stateP->hasMessages = true;

    LWLockRelease(SInvalReadLock);
    return n;
}
| void SIInsertDataEntries | ( | const SharedInvalidationMessage * | data, | |
| int | n | |||
| ) |
Definition at line 408 of file sinvaladt.c.
References SISeg::buffer, ProcState::hasMessages, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, Min, SISeg::minMsgNum, SISeg::msgnumLock, SISeg::nextThreshold, SISeg::procState, SICleanupQueue(), SInvalWriteLock, SpinLockAcquire, SpinLockRelease, and WRITE_QUANTUM.
Referenced by SendSharedInvalidMessages().
{
    /*
     * Append the n messages in data[] to the shared invalidation queue.
     *
     * Work is done in chunks of at most WRITE_QUANTUM messages, each under
     * an exclusive SInvalWriteLock.  Before a chunk is written,
     * SICleanupQueue is called (repeatedly, re-checking each time) while the
     * chunk would not fit in the MAXNUMMESSAGES-entry ring buffer or the
     * queue length has reached nextThreshold.  After each chunk, maxMsgNum
     * is published under msgnumLock and every procState entry below
     * lastBackend gets hasMessages = true.
     */
    SISeg      *segP = shmInvalBuffer;

    /*
     * N can be arbitrarily large. We divide the work into groups of no more
     * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
     * an unreasonably long time. (This is not so much because we care about
     * letting in other writers, as that some just-caught-up backend might be
     * trying to do SICleanupQueue to pass on its signal, and we don't want it
     * to have to wait a long time.) Also, we need to consider calling
     * SICleanupQueue every so often.
     */
    while (n > 0)
    {
        int nthistime = Min(n, WRITE_QUANTUM);
        int numMsgs;
        int max;
        int i;

        n -= nthistime;

        LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

        /*
         * If the buffer is full, we *must* acquire some space. Clean the
         * queue and reset anyone who is preventing space from being freed.
         * Otherwise, clean the queue only when it's exceeded the next
         * fullness threshold. We have to loop and recheck the buffer state
         * after any call of SICleanupQueue.
         */
        for (;;)
        {
            numMsgs = segP->maxMsgNum - segP->minMsgNum;
            if (numMsgs + nthistime > MAXNUMMESSAGES ||
                numMsgs >= segP->nextThreshold)
                SICleanupQueue(true, nthistime);
            else
                break;
        }

        /*
         * Insert new message(s) into proper slot of circular buffer
         */
        max = segP->maxMsgNum;
        while (nthistime-- > 0)
        {
            segP->buffer[max % MAXNUMMESSAGES] = *data++;
            max++;
        }

        /* Update current value of maxMsgNum using spinlock */
        {
            /* use volatile pointer to prevent code rearrangement */
            volatile SISeg *vsegP = segP;

            SpinLockAcquire(&vsegP->msgnumLock);
            vsegP->maxMsgNum = max;
            SpinLockRelease(&vsegP->msgnumLock);
        }

        /*
         * Now that the maxMsgNum change is globally visible, we give everyone
         * a swift kick to make sure they read the newly added messages.
         * Releasing SInvalWriteLock will enforce a full memory barrier, so
         * these (unlocked) changes will be committed to memory before we exit
         * the function.
         *
         * Inactive slots below lastBackend are flagged too; that is harmless
         * because SharedInvalBackendInit clears hasMessages when a slot is
         * claimed.
         */
        for (i = 0; i < segP->lastBackend; i++)
        {
            ProcState *stateP = &segP->procState[i];

            stateP->hasMessages = true;
        }

        LWLockRelease(SInvalWriteLock);
    }
}
| Size SInvalShmemSize | ( | void | ) |
Definition at line 206 of file sinvaladt.c.
References add_size(), MaxBackends, mul_size(), and offsetof.
Referenced by CreateSharedMemoryAndSemaphores().
LocalTransactionId nextLocalTransactionId [static] |
Definition at line 197 of file sinvaladt.c.
Referenced by CleanupInvalidationState(), GetNextLocalTransactionId(), and SharedInvalBackendInit().
SISeg* shmInvalBuffer [static] |
Definition at line 194 of file sinvaladt.c.
1.7.1