#include "storage/lock.h"
#include "storage/sinval.h"
Go to the source code of this file.
Functions | |
Size | SInvalShmemSize (void) |
void | CreateSharedInvalidationState (void) |
void | SharedInvalBackendInit (bool sendOnly) |
PGPROC * | BackendIdGetProc (int backendID) |
void | SIInsertDataEntries (const SharedInvalidationMessage *data, int n) |
int | SIGetDataEntries (SharedInvalidationMessage *data, int datasize) |
void | SICleanupQueue (bool callerHasWriteLock, int minFree) |
LocalTransactionId | GetNextLocalTransactionId (void) |
PGPROC* BackendIdGetProc | ( | int | backendID | ) |
Definition at line 383 of file sinvaladt.c.
References LW_SHARED, LWLockAcquire(), LWLockRelease(), ProcState::proc, SISeg::procState, and SInvalWriteLock.
Referenced by do_autovacuum(), and VirtualXactLock().
{ PGPROC *result = NULL; SISeg *segP = shmInvalBuffer; /* Need to lock out additions/removals of backends */ LWLockAcquire(SInvalWriteLock, LW_SHARED); if (backendID > 0 && backendID <= segP->lastBackend) { ProcState *stateP = &segP->procState[backendID - 1]; result = stateP->proc; } LWLockRelease(SInvalWriteLock); return result; }
void CreateSharedInvalidationState | ( | void | ) |
Definition at line 221 of file sinvaladt.c.
References add_size(), ProcState::hasMessages, i, SISeg::lastBackend, SISeg::maxBackends, MaxBackends, SISeg::maxMsgNum, SISeg::minMsgNum, SISeg::msgnumLock, mul_size(), ProcState::nextLXID, ProcState::nextMsgNum, SISeg::nextThreshold, offsetof, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ShmemInitStruct(), ProcState::signaled, and SpinLockInit.
Referenced by CreateSharedMemoryAndSemaphores().
{ Size size; int i; bool found; /* Allocate space in shared memory */ size = offsetof(SISeg, procState); size = add_size(size, mul_size(sizeof(ProcState), MaxBackends)); shmInvalBuffer = (SISeg *) ShmemInitStruct("shmInvalBuffer", size, &found); if (found) return; /* Clear message counters, save size of procState array, init spinlock */ shmInvalBuffer->minMsgNum = 0; shmInvalBuffer->maxMsgNum = 0; shmInvalBuffer->nextThreshold = CLEANUP_MIN; shmInvalBuffer->lastBackend = 0; shmInvalBuffer->maxBackends = MaxBackends; SpinLockInit(&shmInvalBuffer->msgnumLock); /* The buffer[] array is initially all unused, so we need not fill it */ /* Mark all backends inactive, and initialize nextLXID */ for (i = 0; i < shmInvalBuffer->maxBackends; i++) { shmInvalBuffer->procState[i].procPid = 0; /* inactive */ shmInvalBuffer->procState[i].proc = NULL; shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */ shmInvalBuffer->procState[i].resetState = false; shmInvalBuffer->procState[i].signaled = false; shmInvalBuffer->procState[i].hasMessages = false; shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId; } }
LocalTransactionId GetNextLocalTransactionId | ( | void | ) |
Definition at line 751 of file sinvaladt.c.
References LocalTransactionIdIsValid, and nextLocalTransactionId.
Referenced by InitRecoveryTransactionEnvironment(), and StartTransaction().
{ LocalTransactionId result; /* loop to avoid returning InvalidLocalTransactionId at wraparound */ do { result = nextLocalTransactionId++; } while (!LocalTransactionIdIsValid(result)); return result; }
void SharedInvalBackendInit | ( | bool | sendOnly | ) |
Definition at line 264 of file sinvaladt.c.
References Assert, PGPROC::backendId, CleanupInvalidationState(), DEBUG4, elog, ereport, errcode(), errmsg(), FATAL, ProcState::hasMessages, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxBackends, SISeg::maxMsgNum, MyBackendId, MyProc, MyProcPid, nextLocalTransactionId, ProcState::nextLXID, ProcState::nextMsgNum, NULL, on_shmem_exit(), PointerGetDatum, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ProcState::sendOnly, ProcState::signaled, and SInvalWriteLock.
Referenced by InitPostgres(), and InitRecoveryTransactionEnvironment().
{ int index; ProcState *stateP = NULL; SISeg *segP = shmInvalBuffer; /* * This can run in parallel with read operations, but not with write * operations, since SIInsertDataEntries relies on lastBackend to set * hasMessages appropriately. */ LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); /* Look for a free entry in the procState array */ for (index = 0; index < segP->lastBackend; index++) { if (segP->procState[index].procPid == 0) /* inactive slot? */ { stateP = &segP->procState[index]; break; } } if (stateP == NULL) { if (segP->lastBackend < segP->maxBackends) { stateP = &segP->procState[segP->lastBackend]; Assert(stateP->procPid == 0); segP->lastBackend++; } else { /* * out of procState slots: MaxBackends exceeded -- report normally */ MyBackendId = InvalidBackendId; LWLockRelease(SInvalWriteLock); ereport(FATAL, (errcode(ERRCODE_TOO_MANY_CONNECTIONS), errmsg("sorry, too many clients already"))); } } MyBackendId = (stateP - &segP->procState[0]) + 1; /* Advertise assigned backend ID in MyProc */ MyProc->backendId = MyBackendId; /* Fetch next local transaction ID into local memory */ nextLocalTransactionId = stateP->nextLXID; /* mark myself active, with all extant messages already read */ stateP->procPid = MyProcPid; stateP->proc = MyProc; stateP->nextMsgNum = segP->maxMsgNum; stateP->resetState = false; stateP->signaled = false; stateP->hasMessages = false; stateP->sendOnly = sendOnly; LWLockRelease(SInvalWriteLock); /* register exit routine to mark my entry inactive at exit */ on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP)); elog(DEBUG4, "my backend ID is %d", MyBackendId); }
void SICleanupQueue | ( | bool | callerHasWriteLock, | |
int | minFree | |||
) |
Definition at line 625 of file sinvaladt.c.
References CLEANUP_MIN, CLEANUP_QUANTUM, DEBUG4, elog, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, SISeg::minMsgNum, MSGNUMWRAPAROUND, ProcState::nextMsgNum, SISeg::nextThreshold, ProcState::procPid, PROCSIG_CATCHUP_INTERRUPT, SISeg::procState, ProcState::resetState, ProcState::sendOnly, SendProcSignal(), ProcState::signaled, signaled, SInvalReadLock, and SInvalWriteLock.
Referenced by ReceiveSharedInvalidMessages(), and SIInsertDataEntries().
{ SISeg *segP = shmInvalBuffer; int min, minsig, lowbound, numMsgs, i; ProcState *needSig = NULL; /* Lock out all writers and readers */ if (!callerHasWriteLock) LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE); /* * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the * furthest-back backend that needs signaling (if any), and reset any * backends that are too far back. Note that because we ignore sendOnly * backends here it is possible for them to keep sending messages without * a problem even when they are the only active backend. */ min = segP->maxMsgNum; minsig = min - SIG_THRESHOLD; lowbound = min - MAXNUMMESSAGES + minFree; for (i = 0; i < segP->lastBackend; i++) { ProcState *stateP = &segP->procState[i]; int n = stateP->nextMsgNum; /* Ignore if inactive or already in reset state */ if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly) continue; /* * If we must free some space and this backend is preventing it, force * him into reset state and then ignore until he catches up. */ if (n < lowbound) { stateP->resetState = true; /* no point in signaling him ... */ continue; } /* Track the global minimum nextMsgNum */ if (n < min) min = n; /* Also see who's furthest back of the unsignaled backends */ if (n < minsig && !stateP->signaled) { minsig = n; needSig = stateP; } } segP->minMsgNum = min; /* * When minMsgNum gets really large, decrement all message counters so as * to forestall overflow of the counters. This happens seldom enough that * folding it into the previous loop would be a loser. */ if (min >= MSGNUMWRAPAROUND) { segP->minMsgNum -= MSGNUMWRAPAROUND; segP->maxMsgNum -= MSGNUMWRAPAROUND; for (i = 0; i < segP->lastBackend; i++) { /* we don't bother skipping inactive entries here */ segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND; } } /* * Determine how many messages are still in the queue, and set the * threshold at which we should repeat SICleanupQueue(). */ numMsgs = segP->maxMsgNum - segP->minMsgNum; if (numMsgs < CLEANUP_MIN) segP->nextThreshold = CLEANUP_MIN; else segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM; /* * Lastly, signal anyone who needs a catchup interrupt. Since * SendProcSignal() might not be fast, we don't want to hold locks while * executing it. */ if (needSig) { pid_t his_pid = needSig->procPid; BackendId his_backendId = (needSig - &segP->procState[0]) + 1; needSig->signaled = true; LWLockRelease(SInvalReadLock); LWLockRelease(SInvalWriteLock); elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid); SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId); if (callerHasWriteLock) LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); } else { LWLockRelease(SInvalReadLock); if (!callerHasWriteLock) LWLockRelease(SInvalWriteLock); } }
int SIGetDataEntries | ( | SharedInvalidationMessage * | data, | |
int | datasize | |||
) |
Definition at line 516 of file sinvaladt.c.
References SISeg::buffer, ProcState::hasMessages, LW_SHARED, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, SISeg::msgnumLock, MyBackendId, ProcState::nextMsgNum, SISeg::procState, ProcState::resetState, ProcState::signaled, SInvalReadLock, SpinLockAcquire, and SpinLockRelease.
Referenced by ReceiveSharedInvalidMessages().
{ SISeg *segP; ProcState *stateP; int max; int n; segP = shmInvalBuffer; stateP = &segP->procState[MyBackendId - 1]; /* * Before starting to take locks, do a quick, unlocked test to see whether * there can possibly be anything to read. On a multiprocessor system, * it's possible that this load could migrate backwards and occur before * we actually enter this function, so we might miss a sinval message that * was just added by some other processor. But they can't migrate * backwards over a preceding lock acquisition, so it should be OK. If we * haven't acquired a lock preventing against further relevant * invalidations, any such occurrence is not much different than if the * invalidation had arrived slightly later in the first place. */ if (!stateP->hasMessages) return 0; LWLockAcquire(SInvalReadLock, LW_SHARED); /* * We must reset hasMessages before determining how many messages we're * going to read. That way, if new messages arrive after we have * determined how many we're reading, the flag will get reset and we'll * notice those messages part-way through. * * Note that, if we don't end up reading all of the messages, we had * better be certain to reset this flag before exiting! */ stateP->hasMessages = false; /* Fetch current value of maxMsgNum using spinlock */ { /* use volatile pointer to prevent code rearrangement */ volatile SISeg *vsegP = segP; SpinLockAcquire(&vsegP->msgnumLock); max = vsegP->maxMsgNum; SpinLockRelease(&vsegP->msgnumLock); } if (stateP->resetState) { /* * Force reset. We can say we have dealt with any messages added * since the reset, as well; and that means we should clear the * signaled flag, too. */ stateP->nextMsgNum = max; stateP->resetState = false; stateP->signaled = false; LWLockRelease(SInvalReadLock); return -1; } /* * Retrieve messages and advance backend's counter, until data array is * full or there are no more messages. * * There may be other backends that haven't read the message(s), so we * cannot delete them here. SICleanupQueue() will eventually remove them * from the queue. */ n = 0; while (n < datasize && stateP->nextMsgNum < max) { data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES]; stateP->nextMsgNum++; } /* * If we have caught up completely, reset our "signaled" flag so that * we'll get another signal if we fall behind again. * * If we haven't caught up completely, reset the hasMessages flag so that * we see the remaining messages next time. */ if (stateP->nextMsgNum >= max) stateP->signaled = false; else stateP->hasMessages = true; LWLockRelease(SInvalReadLock); return n; }
void SIInsertDataEntries | ( | const SharedInvalidationMessage * | data, | |
int | n | |||
) |
Definition at line 408 of file sinvaladt.c.
References SISeg::buffer, ProcState::hasMessages, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, Min, SISeg::minMsgNum, SISeg::msgnumLock, SISeg::nextThreshold, SISeg::procState, SICleanupQueue(), SInvalWriteLock, SpinLockAcquire, SpinLockRelease, and WRITE_QUANTUM.
Referenced by SendSharedInvalidMessages().
{ SISeg *segP = shmInvalBuffer; /* * N can be arbitrarily large. We divide the work into groups of no more * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for * an unreasonably long time. (This is not so much because we care about * letting in other writers, as that some just-caught-up backend might be * trying to do SICleanupQueue to pass on its signal, and we don't want it * to have to wait a long time.) Also, we need to consider calling * SICleanupQueue every so often. */ while (n > 0) { int nthistime = Min(n, WRITE_QUANTUM); int numMsgs; int max; int i; n -= nthistime; LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); /* * If the buffer is full, we *must* acquire some space. Clean the * queue and reset anyone who is preventing space from being freed. * Otherwise, clean the queue only when it's exceeded the next * fullness threshold. We have to loop and recheck the buffer state * after any call of SICleanupQueue. */ for (;;) { numMsgs = segP->maxMsgNum - segP->minMsgNum; if (numMsgs + nthistime > MAXNUMMESSAGES || numMsgs >= segP->nextThreshold) SICleanupQueue(true, nthistime); else break; } /* * Insert new message(s) into proper slot of circular buffer */ max = segP->maxMsgNum; while (nthistime-- > 0) { segP->buffer[max % MAXNUMMESSAGES] = *data++; max++; } /* Update current value of maxMsgNum using spinlock */ { /* use volatile pointer to prevent code rearrangement */ volatile SISeg *vsegP = segP; SpinLockAcquire(&vsegP->msgnumLock); vsegP->maxMsgNum = max; SpinLockRelease(&vsegP->msgnumLock); } /* * Now that the maxMsgNum change is globally visible, we give everyone * a swift kick to make sure they read the newly added messages. * Releasing SInvalWriteLock will enforce a full memory barrier, so * these (unlocked) changes will be committed to memory before we exit * the function. */ for (i = 0; i < segP->lastBackend; i++) { ProcState *stateP = &segP->procState[i]; stateP->hasMessages = true; } LWLockRelease(SInvalWriteLock); } }
Size SInvalShmemSize | ( | void | ) |
Definition at line 206 of file sinvaladt.c.
References add_size(), MaxBackends, mul_size(), and offsetof.
Referenced by CreateSharedMemoryAndSemaphores().