Header And Logo

PostgreSQL
The world's most advanced open source database.

Functions

sinvaladt.h File Reference

#include "storage/lock.h"
#include "storage/sinval.h"
Include dependency graph for sinvaladt.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

Size SInvalShmemSize (void)
void CreateSharedInvalidationState (void)
void SharedInvalBackendInit (bool sendOnly)
PGPROC *BackendIdGetProc (int backendID)
void SIInsertDataEntries (const SharedInvalidationMessage *data, int n)
int SIGetDataEntries (SharedInvalidationMessage *data, int datasize)
void SICleanupQueue (bool callerHasWriteLock, int minFree)
LocalTransactionId GetNextLocalTransactionId (void)

Function Documentation

PGPROC* BackendIdGetProc ( int  backendID  ) 

Definition at line 383 of file sinvaladt.c.

References LW_SHARED, LWLockAcquire(), LWLockRelease(), ProcState::proc, SISeg::procState, and SInvalWriteLock.

Referenced by do_autovacuum(), and VirtualXactLock().

{
    /*
     * Map a 1-based backend ID to the PGPROC recorded in its procState slot.
     * Returns NULL when backendID lies outside 1..lastBackend; otherwise
     * returns whatever proc pointer the slot currently holds.
     *
     * SInvalWriteLock is taken in shared mode so that backends cannot be
     * added to or removed from the procState array while we inspect it.
     */
    SISeg      *seg = shmInvalBuffer;
    PGPROC     *proc = NULL;

    LWLockAcquire(SInvalWriteLock, LW_SHARED);

    if (backendID >= 1 && backendID <= seg->lastBackend)
        proc = seg->procState[backendID - 1].proc;

    LWLockRelease(SInvalWriteLock);

    return proc;
}

void CreateSharedInvalidationState ( void   ) 

Definition at line 221 of file sinvaladt.c.

References add_size(), ProcState::hasMessages, i, SISeg::lastBackend, SISeg::maxBackends, MaxBackends, SISeg::maxMsgNum, SISeg::minMsgNum, SISeg::msgnumLock, mul_size(), ProcState::nextLXID, ProcState::nextMsgNum, SISeg::nextThreshold, offsetof, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ShmemInitStruct(), ProcState::signaled, and SpinLockInit.

Referenced by CreateSharedMemoryAndSemaphores().

{
    /*
     * Allocate (or attach to) the shared invalidation segment and, on first
     * creation, initialize its message counters, spinlock and every
     * procState slot.
     */
    int         i;
    bool        found;

    /*
     * Request exactly the amount computed by SInvalShmemSize(), rather than
     * repeating that arithmetic here.  This keeps the requested size from
     * drifting away from the size reserved when shared memory was sized.
     */
    shmInvalBuffer = (SISeg *)
        ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
    if (found)
        return;                 /* structure already exists; leave it alone */

    /* Clear message counters, save size of procState array, init spinlock */
    shmInvalBuffer->minMsgNum = 0;
    shmInvalBuffer->maxMsgNum = 0;
    shmInvalBuffer->nextThreshold = CLEANUP_MIN;
    shmInvalBuffer->lastBackend = 0;
    shmInvalBuffer->maxBackends = MaxBackends;
    SpinLockInit(&shmInvalBuffer->msgnumLock);

    /* The buffer[] array is initially all unused, so we need not fill it */

    /*
     * Mark all backends inactive, and initialize nextLXID.  Every ProcState
     * field is given a defined value, including sendOnly.  Readers test
     * procPid first, but nothing should depend on that ordering to avoid
     * reading uninitialized shared memory.
     */
    for (i = 0; i < shmInvalBuffer->maxBackends; i++)
    {
        shmInvalBuffer->procState[i].procPid = 0;       /* inactive */
        shmInvalBuffer->procState[i].proc = NULL;
        shmInvalBuffer->procState[i].nextMsgNum = 0;    /* meaningless */
        shmInvalBuffer->procState[i].resetState = false;
        shmInvalBuffer->procState[i].signaled = false;
        shmInvalBuffer->procState[i].hasMessages = false;
        shmInvalBuffer->procState[i].sendOnly = false;
        shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
    }
}

LocalTransactionId GetNextLocalTransactionId ( void   ) 

Definition at line 751 of file sinvaladt.c.

References LocalTransactionIdIsValid, and nextLocalTransactionId.

Referenced by InitRecoveryTransactionEnvironment(), and StartTransaction().

{
    /*
     * Hand out the next value of the backend-local LXID counter, advancing
     * it.  The counter is allowed to wrap around; any value rejected by
     * LocalTransactionIdIsValid() (such as InvalidLocalTransactionId after
     * wraparound) is skipped and the next one tried.
     */
    for (;;)
    {
        LocalTransactionId candidate = nextLocalTransactionId++;

        if (LocalTransactionIdIsValid(candidate))
            return candidate;
    }
}

void SharedInvalBackendInit ( bool  sendOnly  ) 

Definition at line 264 of file sinvaladt.c.

References Assert, PGPROC::backendId, CleanupInvalidationState(), DEBUG4, elog, ereport, errcode(), errmsg(), FATAL, ProcState::hasMessages, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxBackends, SISeg::maxMsgNum, MyBackendId, MyProc, MyProcPid, nextLocalTransactionId, ProcState::nextLXID, ProcState::nextMsgNum, NULL, on_shmem_exit(), PointerGetDatum, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ProcState::sendOnly, ProcState::signaled, and SInvalWriteLock.

Referenced by InitPostgres(), and InitRecoveryTransactionEnvironment().

{
    /*
     * Register the calling process in the shared invalidation state.
     *
     * A procState slot is claimed: the lowest free slot below lastBackend,
     * or else a new slot at lastBackend.  MyBackendId is derived from the
     * slot's position, and the slot is marked active with every message
     * already in the queue treated as read.  If sendOnly is true, the slot
     * is flagged so that SICleanupQueue ignores this backend when deciding
     * who is holding the queue back.
     *
     * If all maxBackends slots are taken, MyBackendId is set to
     * InvalidBackendId and a FATAL "too many clients" error is raised.
     */
    int         index;
    ProcState  *stateP = NULL;
    SISeg      *segP = shmInvalBuffer;

    /*
     * This can run in parallel with read operations, but not with write
     * operations, since SIInsertDataEntries relies on lastBackend to set
     * hasMessages appropriately.
     */
    LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

    /*
     * Look for a free entry in the procState array, reusing the
     * lowest-indexed inactive (procPid == 0) slot below lastBackend.
     */
    for (index = 0; index < segP->lastBackend; index++)
    {
        if (segP->procState[index].procPid == 0)        /* inactive slot? */
        {
            stateP = &segP->procState[index];
            break;
        }
    }

    if (stateP == NULL)
    {
        if (segP->lastBackend < segP->maxBackends)
        {
            /* No hole to reuse: extend the in-use range by one slot */
            stateP = &segP->procState[segP->lastBackend];
            Assert(stateP->procPid == 0);
            segP->lastBackend++;
        }
        else
        {
            /*
             * out of procState slots: MaxBackends exceeded -- report normally.
             * The lock is released before reporting so it is not held while
             * the error is raised.
             */
            MyBackendId = InvalidBackendId;
            LWLockRelease(SInvalWriteLock);
            ereport(FATAL,
                    (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
                     errmsg("sorry, too many clients already")));
        }
    }

    /* Backend IDs are 1-based: procState[0] corresponds to backend ID 1 */
    MyBackendId = (stateP - &segP->procState[0]) + 1;

    /* Advertise assigned backend ID in MyProc */
    MyProc->backendId = MyBackendId;

    /*
     * Fetch next local transaction ID into local memory.  This resumes the
     * counter stored in the slot; a slot not used since
     * CreateSharedInvalidationState holds InvalidLocalTransactionId.
     */
    nextLocalTransactionId = stateP->nextLXID;

    /* mark myself active, with all extant messages already read */
    stateP->procPid = MyProcPid;
    stateP->proc = MyProc;
    stateP->nextMsgNum = segP->maxMsgNum;
    stateP->resetState = false;
    stateP->signaled = false;
    stateP->hasMessages = false;
    stateP->sendOnly = sendOnly;

    LWLockRelease(SInvalWriteLock);

    /* register exit routine to mark my entry inactive at exit */
    on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));

    elog(DEBUG4, "my backend ID is %d", MyBackendId);
}

void SICleanupQueue (bool callerHasWriteLock, int minFree)

Definition at line 625 of file sinvaladt.c.

References CLEANUP_MIN, CLEANUP_QUANTUM, DEBUG4, elog, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, SISeg::minMsgNum, MSGNUMWRAPAROUND, ProcState::nextMsgNum, SISeg::nextThreshold, ProcState::procPid, PROCSIG_CATCHUP_INTERRUPT, SISeg::procState, ProcState::resetState, ProcState::sendOnly, SendProcSignal(), ProcState::signaled, signaled, SInvalReadLock, and SInvalWriteLock.

Referenced by ReceiveSharedInvalidMessages(), and SIInsertDataEntries().

{
    /*
     * Reclaim space in the shared invalidation queue.
     *
     * This recomputes minMsgNum from the active, non-reset, non-sendOnly
     * backends and rebases all counters when they grow past
     * MSGNUMWRAPAROUND.  It resets nextThreshold, the fill level at which
     * SIInsertDataEntries will call us again.
     *
     * Any backend so far behind that it prevents minFree slots from being
     * available is forced into reset state.  At most one backend is
     * signaled per call: the furthest-behind one that lags maxMsgNum by more
     * than SIG_THRESHOLD and has not been signaled yet.
     *
     * callerHasWriteLock: true if the caller already holds SInvalWriteLock
     * exclusively.  In that case the lock is held again on return, though it
     * is released and re-acquired around SendProcSignal() when a backend is
     * signaled.  Otherwise the lock is taken and released here.
     * minFree: number of buffer slots the caller needs to be free.
     */
    SISeg      *segP = shmInvalBuffer;
    int         min,
                minsig,
                lowbound,
                numMsgs,
                i;
    ProcState  *needSig = NULL;

    /* Lock out all writers and readers */
    if (!callerHasWriteLock)
        LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
    LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);

    /*
     * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
     * furthest-back backend that needs signaling (if any), and reset any
     * backends that are too far back.  Note that because we ignore sendOnly
     * backends here it is possible for them to keep sending messages without
     * a problem even when they are the only active backend.
     */
    min = segP->maxMsgNum;
    minsig = min - SIG_THRESHOLD;
    lowbound = min - MAXNUMMESSAGES + minFree;

    for (i = 0; i < segP->lastBackend; i++)
    {
        ProcState  *stateP = &segP->procState[i];
        int         n = stateP->nextMsgNum;

        /* Ignore if inactive, already in reset state, or send-only */
        if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
            continue;

        /*
         * If we must free some space and this backend is preventing it, force
         * him into reset state and then ignore until he catches up.
         */
        if (n < lowbound)
        {
            stateP->resetState = true;
            /* no point in signaling him ... */
            continue;
        }

        /* Track the global minimum nextMsgNum */
        if (n < min)
            min = n;

        /* Also see who's furthest back of the unsignaled backends */
        if (n < minsig && !stateP->signaled)
        {
            minsig = n;
            needSig = stateP;
        }
    }
    segP->minMsgNum = min;

    /*
     * When minMsgNum gets really large, decrement all message counters so as
     * to forestall overflow of the counters.  This happens seldom enough that
     * folding it into the previous loop would be a loser.
     */
    if (min >= MSGNUMWRAPAROUND)
    {
        segP->minMsgNum -= MSGNUMWRAPAROUND;
        segP->maxMsgNum -= MSGNUMWRAPAROUND;
        for (i = 0; i < segP->lastBackend; i++)
        {
            /* we don't bother skipping inactive entries here */
            segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
        }
    }

    /*
     * Determine how many messages are still in the queue, and set the
     * threshold at which we should repeat SICleanupQueue().  Above
     * CLEANUP_MIN the threshold is the next multiple of CLEANUP_QUANTUM
     * strictly above numMsgs.
     */
    numMsgs = segP->maxMsgNum - segP->minMsgNum;
    if (numMsgs < CLEANUP_MIN)
        segP->nextThreshold = CLEANUP_MIN;
    else
        segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;

    /*
     * Lastly, signal anyone who needs a catchup interrupt.  Since
     * SendProcSignal() might not be fast, we don't want to hold locks while
     * executing it.
     */
    if (needSig)
    {
        /* Capture target identity while still holding the locks */
        pid_t       his_pid = needSig->procPid;
        BackendId   his_backendId = (needSig - &segP->procState[0]) + 1;

        needSig->signaled = true;
        LWLockRelease(SInvalReadLock);
        LWLockRelease(SInvalWriteLock);
        elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
        SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
        /* Give the caller back the write lock it came in holding */
        if (callerHasWriteLock)
            LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
    }
    else
    {
        LWLockRelease(SInvalReadLock);
        if (!callerHasWriteLock)
            LWLockRelease(SInvalWriteLock);
    }
}

int SIGetDataEntries (SharedInvalidationMessage *data, int datasize)

Definition at line 516 of file sinvaladt.c.

References SISeg::buffer, ProcState::hasMessages, LW_SHARED, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, SISeg::msgnumLock, MyBackendId, ProcState::nextMsgNum, SISeg::procState, ProcState::resetState, ProcState::signaled, SInvalReadLock, SpinLockAcquire, and SpinLockRelease.

Referenced by ReceiveSharedInvalidMessages().

{
    /*
     * Copy up to datasize pending invalidation messages for this backend
     * (the procState slot of MyBackendId) into data[], advancing the
     * backend's read position.
     *
     * Returns the number of messages copied, which is 0 when nothing is
     * pending.  Returns -1 if this backend had been forced into reset state.
     * In that case nothing is copied, the read position jumps to the current
     * end of the queue, and the resetState and signaled flags are cleared.
     */
    SISeg      *segP;
    ProcState  *stateP;
    int         max;
    int         n;

    segP = shmInvalBuffer;
    stateP = &segP->procState[MyBackendId - 1];

    /*
     * Before starting to take locks, do a quick, unlocked test to see whether
     * there can possibly be anything to read.  On a multiprocessor system,
     * it's possible that this load could migrate backwards and occur before
     * we actually enter this function, so we might miss a sinval message that
     * was just added by some other processor.  But they can't migrate
     * backwards over a preceding lock acquisition, so it should be OK.  If we
     * haven't acquired a lock preventing against further relevant
     * invalidations, any such occurrence is not much different than if the
     * invalidation had arrived slightly later in the first place.
     */
    if (!stateP->hasMessages)
        return 0;

    LWLockAcquire(SInvalReadLock, LW_SHARED);

    /*
     * We must reset hasMessages before determining how many messages we're
     * going to read.  That way, if new messages arrive after we have
     * determined how many we're reading, the flag will get reset and we'll
     * notice those messages part-way through.
     *
     * Note that, if we don't end up reading all of the messages, we had
     * better be certain to reset this flag before exiting!
     */
    stateP->hasMessages = false;

    /* Fetch current value of maxMsgNum using spinlock */
    {
        /* use volatile pointer to prevent code rearrangement */
        volatile SISeg *vsegP = segP;

        SpinLockAcquire(&vsegP->msgnumLock);
        max = vsegP->maxMsgNum;
        SpinLockRelease(&vsegP->msgnumLock);
    }

    if (stateP->resetState)
    {
        /*
         * Force reset.  We can say we have dealt with any messages added
         * since the reset, as well; and that means we should clear the
         * signaled flag, too.
         */
        stateP->nextMsgNum = max;
        stateP->resetState = false;
        stateP->signaled = false;
        LWLockRelease(SInvalReadLock);
        return -1;
    }

    /*
     * Retrieve messages and advance backend's counter, until data array is
     * full or there are no more messages.
     *
     * There may be other backends that haven't read the message(s), so we
     * cannot delete them here.  SICleanupQueue() will eventually remove them
     * from the queue.
     */
    n = 0;
    while (n < datasize && stateP->nextMsgNum < max)
    {
        /* buffer[] is circular: message number modulo MAXNUMMESSAGES */
        data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
        stateP->nextMsgNum++;
    }

    /*
     * If we have caught up completely, reset our "signaled" flag so that
     * we'll get another signal if we fall behind again.
     *
     * If we haven't caught up completely, reset the hasMessages flag so that
     * we see the remaining messages next time.
     */
    if (stateP->nextMsgNum >= max)
        stateP->signaled = false;
    else
        stateP->hasMessages = true;

    LWLockRelease(SInvalReadLock);
    return n;
}

void SIInsertDataEntries (const SharedInvalidationMessage *data, int n)

Definition at line 408 of file sinvaladt.c.

References SISeg::buffer, ProcState::hasMessages, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, Min, SISeg::minMsgNum, SISeg::msgnumLock, SISeg::nextThreshold, SISeg::procState, SICleanupQueue(), SInvalWriteLock, SpinLockAcquire, SpinLockRelease, and WRITE_QUANTUM.

Referenced by SendSharedInvalidMessages().

{
    /*
     * Append the n messages in data[] to the shared invalidation queue.
     * Room is made via SICleanupQueue when the buffer is full or has passed
     * nextThreshold.  After each batch, the hasMessages flag is raised on
     * every procState slot below lastBackend.
     */
    SISeg      *segP = shmInvalBuffer;

    /*
     * N can be arbitrarily large.  We divide the work into groups of no more
     * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
     * an unreasonably long time.  (This is not so much because we care about
     * letting in other writers, as that some just-caught-up backend might be
     * trying to do SICleanupQueue to pass on its signal, and we don't want it
     * to have to wait a long time.)  Also, we need to consider calling
     * SICleanupQueue every so often.
     */
    while (n > 0)
    {
        int         nthistime = Min(n, WRITE_QUANTUM);
        int         numMsgs;
        int         max;
        int         i;

        n -= nthistime;

        LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

        /*
         * If the buffer is full, we *must* acquire some space.  Clean the
         * queue and reset anyone who is preventing space from being freed.
         * Otherwise, clean the queue only when it's exceeded the next
         * fullness threshold.  We have to loop and recheck the buffer state
         * after any call of SICleanupQueue.
         */
        for (;;)
        {
            numMsgs = segP->maxMsgNum - segP->minMsgNum;
            if (numMsgs + nthistime > MAXNUMMESSAGES ||
                numMsgs >= segP->nextThreshold)
                SICleanupQueue(true, nthistime);
            else
                break;
        }

        /*
         * Insert new message(s) into proper slot of circular buffer
         * (message number modulo MAXNUMMESSAGES).  maxMsgNum is only
         * published after all of them are written.
         */
        max = segP->maxMsgNum;
        while (nthistime-- > 0)
        {
            segP->buffer[max % MAXNUMMESSAGES] = *data++;
            max++;
        }

        /* Update current value of maxMsgNum using spinlock */
        {
            /* use volatile pointer to prevent code rearrangement */
            volatile SISeg *vsegP = segP;

            SpinLockAcquire(&vsegP->msgnumLock);
            vsegP->maxMsgNum = max;
            SpinLockRelease(&vsegP->msgnumLock);
        }

        /*
         * Now that the maxMsgNum change is globally visible, we give everyone
         * a swift kick to make sure they read the newly added messages.
         * Releasing SInvalWriteLock will enforce a full memory barrier, so
         * these (unlocked) changes will be committed to memory before we exit
         * the function.  The flag is set on every slot below lastBackend
         * without checking whether it is active.
         */
        for (i = 0; i < segP->lastBackend; i++)
        {
            ProcState  *stateP = &segP->procState[i];

            stateP->hasMessages = true;
        }

        LWLockRelease(SInvalWriteLock);
    }
}

Size SInvalShmemSize ( void   ) 

Definition at line 206 of file sinvaladt.c.

References add_size(), MaxBackends, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores().

{
    /*
     * Shared-memory footprint of the invalidation segment: the SISeg header
     * (everything before the procState array) plus one ProcState entry per
     * possible backend (MaxBackends).  The add_size()/mul_size() helpers are
     * used instead of plain arithmetic, presumably so that size overflow is
     * caught.
     */
    return add_size(offsetof(SISeg, procState),
                    mul_size(sizeof(ProcState), MaxBackends));
}