#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "miscadmin.h"
#include "storage/backendid.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "storage/shmem.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"
Go to the source code of this file.
Data Structures | |
struct | ProcState |
struct | SISeg |
Defines | |
#define | MAXNUMMESSAGES 4096 |
#define | MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144) |
#define | CLEANUP_MIN (MAXNUMMESSAGES / 2) |
#define | CLEANUP_QUANTUM (MAXNUMMESSAGES / 16) |
#define | SIG_THRESHOLD (MAXNUMMESSAGES / 2) |
#define | WRITE_QUANTUM 64 |
Typedefs | |
typedef struct ProcState | ProcState |
typedef struct SISeg | SISeg |
Functions | |
static void | CleanupInvalidationState (int status, Datum arg) |
Size | SInvalShmemSize (void) |
void | CreateSharedInvalidationState (void) |
void | SharedInvalBackendInit (bool sendOnly) |
PGPROC * | BackendIdGetProc (int backendID) |
void | SIInsertDataEntries (const SharedInvalidationMessage *data, int n) |
int | SIGetDataEntries (SharedInvalidationMessage *data, int datasize) |
void | SICleanupQueue (bool callerHasWriteLock, int minFree) |
LocalTransactionId | GetNextLocalTransactionId (void) |
Variables | |
static SISeg * | shmInvalBuffer |
static LocalTransactionId | nextLocalTransactionId |
#define CLEANUP_MIN (MAXNUMMESSAGES / 2)
Definition at line 132 of file sinvaladt.c.
Referenced by SICleanupQueue().
#define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
Definition at line 133 of file sinvaladt.c.
Referenced by SICleanupQueue().
#define MAXNUMMESSAGES 4096
Definition at line 130 of file sinvaladt.c.
Referenced by SICleanupQueue(), and SIInsertDataEntries().
#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
Definition at line 131 of file sinvaladt.c.
Referenced by SICleanupQueue().
#define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
Definition at line 134 of file sinvaladt.c.
#define WRITE_QUANTUM 64
Definition at line 135 of file sinvaladt.c.
Referenced by SIInsertDataEntries().
PGPROC *BackendIdGetProc(int backendID)
Definition at line 383 of file sinvaladt.c.
References LW_SHARED, LWLockAcquire(), LWLockRelease(), ProcState::proc, SISeg::procState, and SInvalWriteLock.
Referenced by do_autovacuum(), and VirtualXactLock().
{
	SISeg	   *seg = shmInvalBuffer;
	PGPROC	   *found = NULL;

	/*
	 * Look up the PGPROC registered for a backend ID.  Returns NULL when
	 * backendID lies outside 1..lastBackend; otherwise returns whatever the
	 * slot's proc pointer currently holds (NULL for an inactive slot).
	 *
	 * The lock is taken in shared mode to keep backends from being added or
	 * removed while we inspect the array.
	 */
	LWLockAcquire(SInvalWriteLock, LW_SHARED);

	/* Backend IDs are 1-based, so slot index is backendID - 1 */
	if (backendID >= 1 && backendID <= seg->lastBackend)
		found = seg->procState[backendID - 1].proc;

	LWLockRelease(SInvalWriteLock);

	return found;
}
static void CleanupInvalidationState(int status, Datum arg) [static]
Definition at line 342 of file sinvaladt.c.
References Assert, DatumGetPointer, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyBackendId, nextLocalTransactionId, ProcState::nextLXID, ProcState::nextMsgNum, PointerIsValid, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ProcState::signaled, and SInvalWriteLock.
Referenced by SharedInvalBackendInit().
{
	SISeg	   *seg = (SISeg *) DatumGetPointer(arg);
	ProcState  *mine;
	int			last;

	/*
	 * Exit callback: release this backend's procState slot.  'arg' carries
	 * the SISeg pointer; 'status' is not used.
	 */
	Assert(PointerIsValid(seg));

	LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

	mine = &seg->procState[MyBackendId - 1];

	/* Hand our local transaction ID counter on to the slot's next owner */
	mine->nextLXID = nextLocalTransactionId;

	/* Flag the slot as inactive and clear its per-backend state */
	mine->procPid = 0;
	mine->proc = NULL;
	mine->nextMsgNum = 0;
	mine->resetState = false;
	mine->signaled = false;

	/* Shrink lastBackend past any trailing inactive slots */
	last = seg->lastBackend;
	while (last > 0 && seg->procState[last - 1].procPid == 0)
		last--;
	seg->lastBackend = last;

	LWLockRelease(SInvalWriteLock);
}
void CreateSharedInvalidationState(void)
Definition at line 221 of file sinvaladt.c.
References add_size(), ProcState::hasMessages, i, SISeg::lastBackend, SISeg::maxBackends, MaxBackends, SISeg::maxMsgNum, SISeg::minMsgNum, SISeg::msgnumLock, mul_size(), ProcState::nextLXID, ProcState::nextMsgNum, SISeg::nextThreshold, offsetof, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ShmemInitStruct(), ProcState::signaled, and SpinLockInit.
Referenced by CreateSharedMemoryAndSemaphores().
{
	Size		nbytes;
	bool		alreadyThere;
	int			slot;

	/*
	 * Obtain the shared SISeg structure and, unless ShmemInitStruct reports
	 * it as already present, set it up as an empty queue with no active
	 * backends.
	 *
	 * Space needed: everything in SISeg ahead of procState[], plus one
	 * ProcState per possible backend.
	 */
	nbytes = add_size(offsetof(SISeg, procState),
					  mul_size(sizeof(ProcState), MaxBackends));

	shmInvalBuffer = (SISeg *)
		ShmemInitStruct("shmInvalBuffer", nbytes, &alreadyThere);

	/* Structure was found rather than created: leave its contents alone */
	if (alreadyThere)
		return;

	/* Zero the message counters, record the array size, set up the spinlock */
	shmInvalBuffer->minMsgNum = 0;
	shmInvalBuffer->maxMsgNum = 0;
	shmInvalBuffer->nextThreshold = CLEANUP_MIN;
	shmInvalBuffer->lastBackend = 0;
	shmInvalBuffer->maxBackends = MaxBackends;
	SpinLockInit(&shmInvalBuffer->msgnumLock);

	/* The buffer[] array is initially all unused, so we need not fill it */

	/* Every slot starts out inactive, with no valid local transaction ID */
	for (slot = 0; slot < shmInvalBuffer->maxBackends; slot++)
	{
		ProcState  *ps = &shmInvalBuffer->procState[slot];

		ps->procPid = 0;		/* inactive */
		ps->proc = NULL;
		ps->nextMsgNum = 0;		/* meaningless */
		ps->resetState = false;
		ps->signaled = false;
		ps->hasMessages = false;
		ps->nextLXID = InvalidLocalTransactionId;
	}
}
LocalTransactionId GetNextLocalTransactionId(void)
Definition at line 751 of file sinvaladt.c.
References LocalTransactionIdIsValid, and nextLocalTransactionId.
Referenced by InitRecoveryTransactionEnvironment(), and StartTransaction().
{
	/*
	 * Hand out the current value of the backend-local counter and advance
	 * it.  Keep going until the value handed out is valid, so that
	 * InvalidLocalTransactionId is never returned when the counter wraps.
	 */
	for (;;)
	{
		LocalTransactionId candidate = nextLocalTransactionId++;

		if (LocalTransactionIdIsValid(candidate))
			return candidate;
	}
}
void SharedInvalBackendInit(bool sendOnly)
Definition at line 264 of file sinvaladt.c.
References Assert, PGPROC::backendId, CleanupInvalidationState(), DEBUG4, elog, ereport, errcode(), errmsg(), FATAL, ProcState::hasMessages, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxBackends, SISeg::maxMsgNum, MyBackendId, MyProc, MyProcPid, nextLocalTransactionId, ProcState::nextLXID, ProcState::nextMsgNum, NULL, on_shmem_exit(), PointerGetDatum, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ProcState::sendOnly, ProcState::signaled, and SInvalWriteLock.
Referenced by InitPostgres(), and InitRecoveryTransactionEnvironment().
{
	SISeg	   *seg = shmInvalBuffer;
	ProcState  *slot = NULL;
	int			i;

	/*
	 * Claim a procState slot for this backend, derive MyBackendId from it,
	 * and arrange for the slot to be released at exit.  'sendOnly' is
	 * recorded in the slot.
	 *
	 * Readers may run concurrently with us, but writers may not:
	 * SIInsertDataEntries relies on lastBackend to set hasMessages
	 * appropriately.
	 */
	LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

	/* Scan the in-use part of the array for an inactive slot to recycle */
	i = 0;
	while (i < seg->lastBackend && seg->procState[i].procPid != 0)
		i++;

	if (i < seg->lastBackend)
	{
		slot = &seg->procState[i];
	}
	else if (seg->lastBackend < seg->maxBackends)
	{
		/* Nothing to recycle: extend the in-use part by one slot */
		slot = &seg->procState[seg->lastBackend];
		Assert(slot->procPid == 0);
		seg->lastBackend++;
	}
	else
	{
		/*
		 * out of procState slots: MaxBackends exceeded -- report normally
		 */
		MyBackendId = InvalidBackendId;
		LWLockRelease(SInvalWriteLock);
		ereport(FATAL,
				(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
				 errmsg("sorry, too many clients already")));
	}

	/* Backend IDs are slot indexes counted from 1 */
	MyBackendId = (slot - seg->procState) + 1;

	/* Publish the assigned ID through MyProc as well */
	MyProc->backendId = MyBackendId;

	/* Pick up the local transaction ID counter stored in the slot */
	nextLocalTransactionId = slot->nextLXID;

	/* Activate the slot, treating every message queued so far as read */
	slot->procPid = MyProcPid;
	slot->proc = MyProc;
	slot->nextMsgNum = seg->maxMsgNum;
	slot->resetState = false;
	slot->signaled = false;
	slot->hasMessages = false;
	slot->sendOnly = sendOnly;

	LWLockRelease(SInvalWriteLock);

	/* Make sure the slot is marked inactive again when we exit */
	on_shmem_exit(CleanupInvalidationState, PointerGetDatum(seg));

	elog(DEBUG4, "my backend ID is %d", MyBackendId);
}
void SICleanupQueue(bool callerHasWriteLock, int minFree)
Definition at line 625 of file sinvaladt.c.
References CLEANUP_MIN, CLEANUP_QUANTUM, DEBUG4, elog, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, SISeg::minMsgNum, MSGNUMWRAPAROUND, ProcState::nextMsgNum, SISeg::nextThreshold, ProcState::procPid, PROCSIG_CATCHUP_INTERRUPT, SISeg::procState, ProcState::resetState, ProcState::sendOnly, SendProcSignal(), ProcState::signaled, signaled, SInvalReadLock, and SInvalWriteLock.
Referenced by ReceiveSharedInvalidMessages(), and SIInsertDataEntries().
{
	/*
	 * Recompute the queue's minMsgNum, force backends that are too far
	 * behind into reset state, set the next cleanup threshold, and send a
	 * catchup signal to at most one lagging backend.
	 *
	 * callerHasWriteLock: true if the caller already holds SInvalWriteLock.
	 * In that case the lock is not acquired here and is held again on
	 * return; note, however, that on the signaling path it is released and
	 * then re-acquired, so the caller must not assume the queue state is
	 * unchanged across the call.
	 *
	 * minFree: enters the computation of 'lowbound' below; any active,
	 * non-sendOnly backend whose nextMsgNum is below lowbound is put into
	 * reset state.
	 */
	SISeg	   *segP = shmInvalBuffer;
	int			min,
				minsig,
				lowbound,
				numMsgs,
				i;
	ProcState  *needSig = NULL;

	/* Lock out all writers and readers */
	if (!callerHasWriteLock)
		LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
	LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);

	/*
	 * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
	 * furthest-back backend that needs signaling (if any), and reset any
	 * backends that are too far back.  Note that because we ignore sendOnly
	 * backends here it is possible for them to keep sending messages without
	 * a problem even when they are the only active backend.
	 */
	min = segP->maxMsgNum;
	minsig = min - SIG_THRESHOLD;
	lowbound = min - MAXNUMMESSAGES + minFree;

	for (i = 0; i < segP->lastBackend; i++)
	{
		ProcState  *stateP = &segP->procState[i];
		int			n = stateP->nextMsgNum;

		/* Ignore if inactive, already in reset state, or send-only */
		if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
			continue;

		/*
		 * If we must free some space and this backend is preventing it, force
		 * him into reset state and then ignore until he catches up.
		 */
		if (n < lowbound)
		{
			stateP->resetState = true;
			/* no point in signaling him ... */
			continue;
		}

		/* Track the global minimum nextMsgNum */
		if (n < min)
			min = n;

		/* Also see who's furthest back of the unsignaled backends */
		if (n < minsig && !stateP->signaled)
		{
			minsig = n;
			needSig = stateP;
		}
	}
	segP->minMsgNum = min;

	/*
	 * When minMsgNum gets really large, decrement all message counters so as
	 * to forestall overflow of the counters.  This happens seldom enough that
	 * folding it into the previous loop would be a loser.
	 */
	if (min >= MSGNUMWRAPAROUND)
	{
		segP->minMsgNum -= MSGNUMWRAPAROUND;
		segP->maxMsgNum -= MSGNUMWRAPAROUND;
		for (i = 0; i < segP->lastBackend; i++)
		{
			/* we don't bother skipping inactive entries here */
			segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
		}
	}

	/*
	 * Determine how many messages are still in the queue, and set the
	 * threshold at which we should repeat SICleanupQueue().  Below
	 * CLEANUP_MIN the threshold is CLEANUP_MIN itself; otherwise it is the
	 * next multiple of CLEANUP_QUANTUM strictly above numMsgs.
	 */
	numMsgs = segP->maxMsgNum - segP->minMsgNum;
	if (numMsgs < CLEANUP_MIN)
		segP->nextThreshold = CLEANUP_MIN;
	else
		segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;

	/*
	 * Lastly, signal anyone who needs a catchup interrupt.  Since
	 * SendProcSignal() might not be fast, we don't want to hold locks while
	 * executing it.
	 */
	if (needSig)
	{
		/* Copy what we need out of shared state before dropping the locks */
		pid_t		his_pid = needSig->procPid;
		BackendId	his_backendId = (needSig - &segP->procState[0]) + 1;

		needSig->signaled = true;
		LWLockRelease(SInvalReadLock);
		LWLockRelease(SInvalWriteLock);
		elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
		SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
		/* Restore the caller's lock, which we released above */
		if (callerHasWriteLock)
			LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
	}
	else
	{
		LWLockRelease(SInvalReadLock);
		/* Only drop the write lock if we were the ones who took it */
		if (!callerHasWriteLock)
			LWLockRelease(SInvalWriteLock);
	}
}
int SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
Definition at line 516 of file sinvaladt.c.
References SISeg::buffer, ProcState::hasMessages, LW_SHARED, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, SISeg::msgnumLock, MyBackendId, ProcState::nextMsgNum, SISeg::procState, ProcState::resetState, ProcState::signaled, SInvalReadLock, SpinLockAcquire, and SpinLockRelease.
Referenced by ReceiveSharedInvalidMessages().
{
	/*
	 * Copy up to 'datasize' pending messages for this backend into 'data'.
	 *
	 * Returns the number of messages copied (0 if there was nothing to
	 * read), or -1 if this backend had been put into reset state; in the
	 * reset case no messages are copied and the backend's read position is
	 * moved up to the current maxMsgNum.
	 */
	SISeg	   *segP;
	ProcState  *stateP;
	int			max;
	int			n;

	segP = shmInvalBuffer;
	stateP = &segP->procState[MyBackendId - 1];

	/*
	 * Before starting to take locks, do a quick, unlocked test to see whether
	 * there can possibly be anything to read.  On a multiprocessor system,
	 * it's possible that this load could migrate backwards and occur before
	 * we actually enter this function, so we might miss a sinval message that
	 * was just added by some other processor.  But they can't migrate
	 * backwards over a preceding lock acquisition, so it should be OK.  If we
	 * haven't acquired a lock preventing against further relevant
	 * invalidations, any such occurrence is not much different than if the
	 * invalidation had arrived slightly later in the first place.
	 */
	if (!stateP->hasMessages)
		return 0;

	LWLockAcquire(SInvalReadLock, LW_SHARED);

	/*
	 * We must clear hasMessages before determining how many messages we're
	 * going to read.  That way, if new messages arrive after we have
	 * determined how many we're reading, the flag will get set again and
	 * we'll notice those messages on a later call.
	 *
	 * Note that, if we don't end up reading all of the messages, we had
	 * better be certain to set this flag again before exiting!
	 */
	stateP->hasMessages = false;

	/* Fetch current value of maxMsgNum using spinlock */
	{
		/* use volatile pointer to prevent code rearrangement */
		volatile SISeg *vsegP = segP;

		SpinLockAcquire(&vsegP->msgnumLock);
		max = vsegP->maxMsgNum;
		SpinLockRelease(&vsegP->msgnumLock);
	}

	if (stateP->resetState)
	{
		/*
		 * Force reset.  We can say we have dealt with any messages added
		 * since the reset, as well; and that means we should clear the
		 * signaled flag, too.
		 */
		stateP->nextMsgNum = max;
		stateP->resetState = false;
		stateP->signaled = false;
		LWLockRelease(SInvalReadLock);
		return -1;
	}

	/*
	 * Retrieve messages and advance backend's counter, until data array is
	 * full or there are no more messages.
	 *
	 * There may be other backends that haven't read the message(s), so we
	 * cannot delete them here.  SICleanupQueue() will eventually remove them
	 * from the queue.
	 */
	n = 0;
	while (n < datasize && stateP->nextMsgNum < max)
	{
		/* buffer[] is circular: message numbers map to slots modulo its size */
		data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
		stateP->nextMsgNum++;
	}

	/*
	 * If we have caught up completely, reset our "signaled" flag so that
	 * we'll get another signal if we fall behind again.
	 *
	 * If we haven't caught up completely, set the hasMessages flag again so
	 * that we see the remaining messages next time.
	 */
	if (stateP->nextMsgNum >= max)
		stateP->signaled = false;
	else
		stateP->hasMessages = true;

	LWLockRelease(SInvalReadLock);
	return n;
}
void SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
Definition at line 408 of file sinvaladt.c.
References SISeg::buffer, ProcState::hasMessages, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, Min, SISeg::minMsgNum, SISeg::msgnumLock, SISeg::nextThreshold, SISeg::procState, SICleanupQueue(), SInvalWriteLock, SpinLockAcquire, SpinLockRelease, and WRITE_QUANTUM.
Referenced by SendSharedInvalidMessages().
{
	/*
	 * Append the 'n' messages in 'data' to the shared invalidation queue,
	 * then set hasMessages for every slot up to lastBackend.
	 */
	SISeg	   *segP = shmInvalBuffer;

	/*
	 * N can be arbitrarily large.  We divide the work into groups of no more
	 * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
	 * an unreasonably long time.  (This is not so much because we care about
	 * letting in other writers, as that some just-caught-up backend might be
	 * trying to do SICleanupQueue to pass on its signal, and we don't want it
	 * to have to wait a long time.)  Also, we need to consider calling
	 * SICleanupQueue every so often.
	 */
	while (n > 0)
	{
		int			nthistime = Min(n, WRITE_QUANTUM);
		int			numMsgs;
		int			max;
		int			i;

		n -= nthistime;

		LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

		/*
		 * If the buffer is full, we *must* acquire some space.  Clean the
		 * queue and reset anyone who is preventing space from being freed.
		 * Otherwise, clean the queue only when it's exceeded the next
		 * fullness threshold.  We have to loop and recheck the buffer state
		 * after any call of SICleanupQueue.
		 */
		for (;;)
		{
			numMsgs = segP->maxMsgNum - segP->minMsgNum;
			if (numMsgs + nthistime > MAXNUMMESSAGES ||
				numMsgs >= segP->nextThreshold)
				SICleanupQueue(true, nthistime);	/* we hold the write lock */
			else
				break;
		}

		/*
		 * Insert new message(s) into proper slot of circular buffer
		 */
		max = segP->maxMsgNum;
		while (nthistime-- > 0)
		{
			segP->buffer[max % MAXNUMMESSAGES] = *data++;
			max++;
		}

		/* Update current value of maxMsgNum using spinlock */
		{
			/* use volatile pointer to prevent code rearrangement */
			volatile SISeg *vsegP = segP;

			SpinLockAcquire(&vsegP->msgnumLock);
			vsegP->maxMsgNum = max;
			SpinLockRelease(&vsegP->msgnumLock);
		}

		/*
		 * Now that the maxMsgNum change is globally visible, we give everyone
		 * a swift kick to make sure they read the newly added messages.
		 * Releasing SInvalWriteLock will enforce a full memory barrier, so
		 * these (unlocked) changes will be committed to memory before we exit
		 * the function.
		 */
		for (i = 0; i < segP->lastBackend; i++)
		{
			ProcState  *stateP = &segP->procState[i];

			stateP->hasMessages = true;
		}

		LWLockRelease(SInvalWriteLock);
	}
}
Size SInvalShmemSize(void)
Definition at line 206 of file sinvaladt.c.
References add_size(), MaxBackends, mul_size(), and offsetof.
Referenced by CreateSharedMemoryAndSemaphores().
LocalTransactionId nextLocalTransactionId [static]
Definition at line 197 of file sinvaladt.c.
Referenced by CleanupInvalidationState(), GetNextLocalTransactionId(), and SharedInvalBackendInit().
SISeg *shmInvalBuffer [static]
Definition at line 194 of file sinvaladt.c.