Header And Logo

PostgreSQL
| The world's most advanced open source database.

Data Structures | Defines | Typedefs | Functions | Variables

sinvaladt.c File Reference

#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "miscadmin.h"
#include "storage/backendid.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "storage/shmem.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"
Include dependency graph for sinvaladt.c:

Go to the source code of this file.

Data Structures

struct  ProcState
struct  SISeg

Defines

#define MAXNUMMESSAGES   4096
#define MSGNUMWRAPAROUND   (MAXNUMMESSAGES * 262144)
#define CLEANUP_MIN   (MAXNUMMESSAGES / 2)
#define CLEANUP_QUANTUM   (MAXNUMMESSAGES / 16)
#define SIG_THRESHOLD   (MAXNUMMESSAGES / 2)
#define WRITE_QUANTUM   64

Typedefs

typedef struct ProcState ProcState
typedef struct SISeg SISeg

Functions

static void CleanupInvalidationState (int status, Datum arg)
Size SInvalShmemSize (void)
void CreateSharedInvalidationState (void)
void SharedInvalBackendInit (bool sendOnly)
PGPROC * BackendIdGetProc (int backendID)
void SIInsertDataEntries (const SharedInvalidationMessage *data, int n)
int SIGetDataEntries (SharedInvalidationMessage *data, int datasize)
void SICleanupQueue (bool callerHasWriteLock, int minFree)
LocalTransactionId GetNextLocalTransactionId (void)

Variables

static SISeg * shmInvalBuffer
static LocalTransactionId nextLocalTransactionId

Define Documentation

#define CLEANUP_MIN   (MAXNUMMESSAGES / 2)

Definition at line 132 of file sinvaladt.c.

Referenced by SICleanupQueue().

#define CLEANUP_QUANTUM   (MAXNUMMESSAGES / 16)

Definition at line 133 of file sinvaladt.c.

Referenced by SICleanupQueue().

#define MAXNUMMESSAGES   4096

Definition at line 130 of file sinvaladt.c.

Referenced by SICleanupQueue(), and SIInsertDataEntries().

#define MSGNUMWRAPAROUND   (MAXNUMMESSAGES * 262144)

Definition at line 131 of file sinvaladt.c.

Referenced by SICleanupQueue().

#define SIG_THRESHOLD   (MAXNUMMESSAGES / 2)

Definition at line 134 of file sinvaladt.c.

Referenced by SICleanupQueue().

#define WRITE_QUANTUM   64

Definition at line 135 of file sinvaladt.c.

Referenced by SIInsertDataEntries().


Typedef Documentation

typedef struct ProcState ProcState
typedef struct SISeg SISeg

Function Documentation

PGPROC* BackendIdGetProc ( int  backendID  ) 

Definition at line 383 of file sinvaladt.c.

References LW_SHARED, LWLockAcquire(), LWLockRelease(), ProcState::proc, SISeg::procState, and SInvalWriteLock.

Referenced by do_autovacuum(), and VirtualXactLock().

{
    SISeg      *seg = shmInvalBuffer;
    PGPROC     *found;

    /*
     * Take SInvalWriteLock in shared mode so that no backend can be added to
     * or removed from the procState array while we look at it.
     */
    LWLockAcquire(SInvalWriteLock, LW_SHARED);

    /*
     * Backend IDs are 1-based indexes into procState[].  Anything outside
     * 1..lastBackend yields NULL; otherwise hand back whatever PGPROC pointer
     * the slot currently holds.
     */
    if (backendID < 1 || backendID > seg->lastBackend)
        found = NULL;
    else
        found = seg->procState[backendID - 1].proc;

    LWLockRelease(SInvalWriteLock);

    return found;
}

static void CleanupInvalidationState ( int  status,
Datum  arg 
) [static]

Definition at line 342 of file sinvaladt.c.

References Assert, DatumGetPointer, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyBackendId, nextLocalTransactionId, ProcState::nextLXID, ProcState::nextMsgNum, PointerIsValid, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ProcState::signaled, and SInvalWriteLock.

Referenced by SharedInvalBackendInit().

{
    SISeg      *seg = (SISeg *) DatumGetPointer(arg);
    ProcState  *mySlot;
    int         i;

    /* arg carries the shared segment pointer; status is not used here */
    Assert(PointerIsValid(seg));

    LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

    mySlot = &seg->procState[MyBackendId - 1];

    /* Leave our local transaction ID counter for the slot's next occupant */
    mySlot->nextLXID = nextLocalTransactionId;

    /* Give the slot back: procPid == 0 is what marks an entry inactive */
    mySlot->procPid = 0;
    mySlot->proc = NULL;
    mySlot->nextMsgNum = 0;
    mySlot->resetState = false;
    mySlot->signaled = false;

    /* Pull lastBackend down past any run of inactive entries at the end */
    i = seg->lastBackend;
    while (i > 0 && seg->procState[i - 1].procPid == 0)
        i--;
    seg->lastBackend = i;

    LWLockRelease(SInvalWriteLock);
}

void CreateSharedInvalidationState ( void   ) 

Definition at line 221 of file sinvaladt.c.

References add_size(), ProcState::hasMessages, i, SISeg::lastBackend, SISeg::maxBackends, MaxBackends, SISeg::maxMsgNum, SISeg::minMsgNum, SISeg::msgnumLock, mul_size(), ProcState::nextLXID, ProcState::nextMsgNum, SISeg::nextThreshold, offsetof, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ShmemInitStruct(), ProcState::signaled, and SpinLockInit.

Referenced by CreateSharedMemoryAndSemaphores().

{
    Size        segSize;
    bool        found;
    SISeg      *seg;
    int         i;

    /* Fixed part of SISeg up to procState[], plus one ProcState per backend */
    segSize = add_size(offsetof(SISeg, procState),
                       mul_size(sizeof(ProcState), MaxBackends));

    /* Allocate (or look up) the segment in shared memory */
    seg = (SISeg *) ShmemInitStruct("shmInvalBuffer", segSize, &found);
    shmInvalBuffer = seg;

    /* Nothing more to do when ShmemInitStruct reports the struct as found */
    if (found)
        return;

    /* Start with an empty queue and the default cleanup threshold */
    seg->minMsgNum = 0;
    seg->maxMsgNum = 0;
    seg->nextThreshold = CLEANUP_MIN;

    /* No procState entries in use yet; remember the array's capacity */
    seg->lastBackend = 0;
    seg->maxBackends = MaxBackends;

    SpinLockInit(&seg->msgnumLock);

    /* The buffer[] array is initially all unused, so we need not fill it */

    /* Every per-backend entry starts out inactive, with no valid nextLXID */
    for (i = 0; i < seg->maxBackends; i++)
    {
        ProcState  *slot = &seg->procState[i];

        slot->procPid = 0;          /* inactive */
        slot->proc = NULL;
        slot->nextMsgNum = 0;       /* meaningless */
        slot->resetState = false;
        slot->signaled = false;
        slot->hasMessages = false;
        slot->nextLXID = InvalidLocalTransactionId;
    }
}

LocalTransactionId GetNextLocalTransactionId ( void   ) 

Definition at line 751 of file sinvaladt.c.

References LocalTransactionIdIsValid, and nextLocalTransactionId.

Referenced by InitRecoveryTransactionEnvironment(), and StartTransaction().

{
    LocalTransactionId lxid;

    /*
     * Hand out the current counter value and advance it.  Keep going if the
     * value we drew is not a valid local transaction ID, which is what
     * happens when the counter wraps around to InvalidLocalTransactionId.
     */
    for (;;)
    {
        lxid = nextLocalTransactionId++;
        if (LocalTransactionIdIsValid(lxid))
            break;
    }

    return lxid;
}

void SharedInvalBackendInit ( bool  sendOnly  ) 

Definition at line 264 of file sinvaladt.c.

References Assert, PGPROC::backendId, CleanupInvalidationState(), DEBUG4, elog, ereport, errcode(), errmsg(), FATAL, ProcState::hasMessages, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxBackends, SISeg::maxMsgNum, MyBackendId, MyProc, MyProcPid, nextLocalTransactionId, ProcState::nextLXID, ProcState::nextMsgNum, NULL, on_shmem_exit(), PointerGetDatum, ProcState::proc, ProcState::procPid, SISeg::procState, ProcState::resetState, ProcState::sendOnly, ProcState::signaled, and SInvalWriteLock.

Referenced by InitPostgres(), and InitRecoveryTransactionEnvironment().

{
    SISeg      *seg = shmInvalBuffer;
    ProcState  *slot = NULL;
    int         i;

    /*
     * Exclusive SInvalWriteLock: readers may proceed concurrently, but
     * writers may not, because SIInsertDataEntries uses lastBackend to decide
     * which entries get their hasMessages flag set.
     */
    LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

    /* Prefer the lowest-numbered inactive entry below lastBackend */
    for (i = 0; slot == NULL && i < seg->lastBackend; i++)
    {
        if (seg->procState[i].procPid == 0)
            slot = &seg->procState[i];
    }

    if (slot == NULL)
    {
        if (seg->lastBackend >= seg->maxBackends)
        {
            /*
             * No entry left at all: MaxBackends exceeded.  Drop the lock and
             * report this as an ordinary "too many connections" failure.
             */
            MyBackendId = InvalidBackendId;
            LWLockRelease(SInvalWriteLock);
            ereport(FATAL,
                    (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
                     errmsg("sorry, too many clients already")));
        }
        else
        {
            /* Grow the in-use portion of the array by one entry */
            slot = &seg->procState[seg->lastBackend];
            Assert(slot->procPid == 0);
            seg->lastBackend++;
        }
    }

    /* Backend IDs are array positions counted from 1 */
    MyBackendId = (slot - &seg->procState[0]) + 1;

    /* Publish the ID through our PGPROC as well */
    MyProc->backendId = MyBackendId;

    /* Resume local transaction IDs where this entry's counter was left */
    nextLocalTransactionId = slot->nextLXID;

    /* Claim the entry; everything already in the queue counts as read */
    slot->procPid = MyProcPid;
    slot->proc = MyProc;
    slot->nextMsgNum = seg->maxMsgNum;
    slot->resetState = false;
    slot->signaled = false;
    slot->hasMessages = false;
    slot->sendOnly = sendOnly;

    LWLockRelease(SInvalWriteLock);

    /* Arrange for the entry to be marked inactive again at exit */
    on_shmem_exit(CleanupInvalidationState, PointerGetDatum(seg));

    elog(DEBUG4, "my backend ID is %d", MyBackendId);
}

void SICleanupQueue ( bool  callerHasWriteLock,
int  minFree 
)

Definition at line 625 of file sinvaladt.c.

References CLEANUP_MIN, CLEANUP_QUANTUM, DEBUG4, elog, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, SISeg::minMsgNum, MSGNUMWRAPAROUND, ProcState::nextMsgNum, SISeg::nextThreshold, ProcState::procPid, PROCSIG_CATCHUP_INTERRUPT, SISeg::procState, ProcState::resetState, ProcState::sendOnly, SendProcSignal(), ProcState::signaled, signaled, SInvalReadLock, and SInvalWriteLock.

Referenced by ReceiveSharedInvalidMessages(), and SIInsertDataEntries().

{
    /*
     * Housekeeping pass over the shared invalidation queue.
     *
     * In one pass this recomputes minMsgNum, puts backends that are too far
     * behind into reset state, rebases the message counters if they have
     * grown large, recomputes nextThreshold, and sends a catchup signal to at
     * most one lagging backend.
     *
     * callerHasWriteLock: true if the caller already holds SInvalWriteLock.
     * In that case the lock is held again on return, but note that on the
     * signaling path below it is released and re-acquired, so the caller must
     * not assume the queue state is unchanged across this call.
     *
     * minFree: number of queue slots that must be free afterwards; any
     * backend standing in the way of that is forced into reset state.
     */
    SISeg      *segP = shmInvalBuffer;
    int         min,
                minsig,
                lowbound,
                numMsgs,
                i;
    ProcState  *needSig = NULL;

    /* Lock out all writers and readers */
    if (!callerHasWriteLock)
        LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
    LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);

    /*
     * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
     * furthest-back backend that needs signaling (if any), and reset any
     * backends that are too far back.  Note that because we ignore sendOnly
     * backends here it is possible for them to keep sending messages without
     * a problem even when they are the only active backend.
     *
     * min starts at maxMsgNum, which is the result if nobody is behind.
     * minsig is the signaling cutoff: only backends more than SIG_THRESHOLD
     * messages behind are candidates.  lowbound is the reset cutoff: every
     * backend that survives the loop has nextMsgNum >= lowbound, so
     * afterwards maxMsgNum - minMsgNum <= MAXNUMMESSAGES - minFree.
     */
    min = segP->maxMsgNum;
    minsig = min - SIG_THRESHOLD;
    lowbound = min - MAXNUMMESSAGES + minFree;

    for (i = 0; i < segP->lastBackend; i++)
    {
        ProcState  *stateP = &segP->procState[i];
        int         n = stateP->nextMsgNum;

        /* Ignore if inactive, already in reset state, or a sendOnly backend */
        if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
            continue;

        /*
         * If we must free some space and this backend is preventing it, force
         * him into reset state and then ignore until he catches up.
         */
        if (n < lowbound)
        {
            stateP->resetState = true;
            /* no point in signaling him ... */
            continue;
        }

        /* Track the global minimum nextMsgNum */
        if (n < min)
            min = n;

        /*
         * Also see who's furthest back of the unsignaled backends.  minsig
         * is lowered each time, so needSig ends up as the single worst one.
         */
        if (n < minsig && !stateP->signaled)
        {
            minsig = n;
            needSig = stateP;
        }
    }
    segP->minMsgNum = min;

    /*
     * When minMsgNum gets really large, decrement all message counters so as
     * to forestall overflow of the counters.  This happens seldom enough that
     * folding it into the previous loop would be a loser.
     *
     * Every counter is reduced by the same amount, so differences between
     * them are unchanged; and MSGNUMWRAPAROUND is a multiple of
     * MAXNUMMESSAGES, so "counter % MAXNUMMESSAGES" buffer positions are
     * unchanged too.
     */
    if (min >= MSGNUMWRAPAROUND)
    {
        segP->minMsgNum -= MSGNUMWRAPAROUND;
        segP->maxMsgNum -= MSGNUMWRAPAROUND;
        for (i = 0; i < segP->lastBackend; i++)
        {
            /* we don't bother skipping inactive entries here */
            segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
        }
    }

    /*
     * Determine how many messages are still in the queue, and set the
     * threshold at which we should repeat SICleanupQueue().  Above
     * CLEANUP_MIN the threshold is the next multiple of CLEANUP_QUANTUM
     * strictly greater than the current queue length.
     */
    numMsgs = segP->maxMsgNum - segP->minMsgNum;
    if (numMsgs < CLEANUP_MIN)
        segP->nextThreshold = CLEANUP_MIN;
    else
        segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;

    /*
     * Lastly, signal anyone who needs a catchup interrupt.  Since
     * SendProcSignal() might not be fast, we don't want to hold locks while
     * executing it.
     */
    if (needSig)
    {
        /* copy what we need out of shared memory while still locked */
        pid_t       his_pid = needSig->procPid;
        BackendId   his_backendId = (needSig - &segP->procState[0]) + 1;

        needSig->signaled = true;
        /* both locks are dropped here, even if the caller owned the write lock */
        LWLockRelease(SInvalReadLock);
        LWLockRelease(SInvalWriteLock);
        elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
        SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
        /* restore the caller's write lock before returning */
        if (callerHasWriteLock)
            LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
    }
    else
    {
        LWLockRelease(SInvalReadLock);
        /* keep the write lock if it belongs to the caller */
        if (!callerHasWriteLock)
            LWLockRelease(SInvalWriteLock);
    }
}

int SIGetDataEntries ( SharedInvalidationMessage *  data,
int  datasize 
)

Definition at line 516 of file sinvaladt.c.

References SISeg::buffer, ProcState::hasMessages, LW_SHARED, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, SISeg::msgnumLock, MyBackendId, ProcState::nextMsgNum, SISeg::procState, ProcState::resetState, ProcState::signaled, SInvalReadLock, SpinLockAcquire, and SpinLockRelease.

Referenced by ReceiveSharedInvalidMessages().

{
    /*
     * Fetch pending invalidation messages for the calling backend.
     *
     * data:     caller-supplied array that receives the messages.
     * datasize: capacity of data[], in messages.
     *
     * Returns 0 if there is nothing to read, -1 if this backend had been put
     * into reset state (its read position is then advanced to the current
     * end of the queue), and otherwise the number of messages copied into
     * data[], which is at most datasize.
     */
    SISeg      *segP;
    ProcState  *stateP;
    int         max;
    int         n;

    segP = shmInvalBuffer;
    stateP = &segP->procState[MyBackendId - 1];

    /*
     * Before starting to take locks, do a quick, unlocked test to see whether
     * there can possibly be anything to read.  On a multiprocessor system,
     * it's possible that this load could migrate backwards and occur before
     * we actually enter this function, so we might miss a sinval message that
     * was just added by some other processor.  But they can't migrate
     * backwards over a preceding lock acquisition, so it should be OK.  If we
     * haven't acquired a lock preventing against further relevant
     * invalidations, any such occurrence is not much different than if the
     * invalidation had arrived slightly later in the first place.
     */
    if (!stateP->hasMessages)
        return 0;

    LWLockAcquire(SInvalReadLock, LW_SHARED);

    /*
     * We must clear hasMessages before determining how many messages we're
     * going to read.  That way, if new messages arrive after we have
     * determined how many we're reading, the flag will be set again by
     * SIInsertDataEntries and we'll notice those messages part-way through.
     *
     * Note that, if we don't end up reading all of the messages, we had
     * better be certain to set this flag again before exiting!
     */
    stateP->hasMessages = false;

    /* Fetch current value of maxMsgNum using spinlock */
    {
        /* use volatile pointer to prevent code rearrangement */
        volatile SISeg *vsegP = segP;

        SpinLockAcquire(&vsegP->msgnumLock);
        max = vsegP->maxMsgNum;
        SpinLockRelease(&vsegP->msgnumLock);
    }

    if (stateP->resetState)
    {
        /*
         * Force reset.  We can say we have dealt with any messages added
         * since the reset, as well; and that means we should clear the
         * signaled flag, too.
         */
        stateP->nextMsgNum = max;
        stateP->resetState = false;
        stateP->signaled = false;
        LWLockRelease(SInvalReadLock);
        return -1;
    }

    /*
     * Retrieve messages and advance backend's counter, until data array is
     * full or there are no more messages.
     *
     * There may be other backends that haven't read the message(s), so we
     * cannot delete them here.  SICleanupQueue() will eventually remove them
     * from the queue.
     */
    n = 0;
    while (n < datasize && stateP->nextMsgNum < max)
    {
        /* message numbers map onto the circular buffer modulo its size */
        data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
        stateP->nextMsgNum++;
    }

    /*
     * If we have caught up completely, reset our "signaled" flag so that
     * we'll get another signal if we fall behind again.
     *
     * If we haven't caught up completely, set the hasMessages flag again so
     * that we see the remaining messages next time.
     */
    if (stateP->nextMsgNum >= max)
        stateP->signaled = false;
    else
        stateP->hasMessages = true;

    LWLockRelease(SInvalReadLock);
    return n;
}

void SIInsertDataEntries ( const SharedInvalidationMessage *  data,
int  n 
)

Definition at line 408 of file sinvaladt.c.

References SISeg::buffer, ProcState::hasMessages, i, SISeg::lastBackend, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), SISeg::maxMsgNum, MAXNUMMESSAGES, Min, SISeg::minMsgNum, SISeg::msgnumLock, SISeg::nextThreshold, SISeg::procState, SICleanupQueue(), SInvalWriteLock, SpinLockAcquire, SpinLockRelease, and WRITE_QUANTUM.

Referenced by SendSharedInvalidMessages().

{
    /*
     * Append n invalidation messages from data[] to the shared queue.
     *
     * data: the messages to add; read sequentially from the first element.
     * n:    number of messages in data[]; nothing is done if n <= 0.
     *
     * Room is always made for the messages: if the queue is full,
     * SICleanupQueue is called with minFree set to the size of the current
     * group, which resets whichever backends are preventing space from being
     * freed.
     */
    SISeg      *segP = shmInvalBuffer;

    /*
     * N can be arbitrarily large.  We divide the work into groups of no more
     * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
     * an unreasonably long time.  (This is not so much because we care about
     * letting in other writers, as that some just-caught-up backend might be
     * trying to do SICleanupQueue to pass on its signal, and we don't want it
     * to have to wait a long time.)  Also, we need to consider calling
     * SICleanupQueue every so often.
     */
    while (n > 0)
    {
        int         nthistime = Min(n, WRITE_QUANTUM);
        int         numMsgs;
        int         max;
        int         i;

        n -= nthistime;

        /* the write lock is taken and released once per group */
        LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

        /*
         * If the buffer is full, we *must* acquire some space.  Clean the
         * queue and reset anyone who is preventing space from being freed.
         * Otherwise, clean the queue only when it's exceeded the next
         * fullness threshold.  We have to loop and recheck the buffer state
         * after any call of SICleanupQueue, because SICleanupQueue can
         * release and re-acquire SInvalWriteLock while sending a signal.
         */
        for (;;)
        {
            numMsgs = segP->maxMsgNum - segP->minMsgNum;
            if (numMsgs + nthistime > MAXNUMMESSAGES ||
                numMsgs >= segP->nextThreshold)
                SICleanupQueue(true, nthistime);
            else
                break;
        }

        /*
         * Insert new message(s) into proper slot of circular buffer.  The
         * local copy of maxMsgNum is advanced here; the shared value is only
         * updated afterwards, under the spinlock.
         */
        max = segP->maxMsgNum;
        while (nthistime-- > 0)
        {
            segP->buffer[max % MAXNUMMESSAGES] = *data++;
            max++;
        }

        /* Update current value of maxMsgNum using spinlock */
        {
            /* use volatile pointer to prevent code rearrangement */
            volatile SISeg *vsegP = segP;

            SpinLockAcquire(&vsegP->msgnumLock);
            vsegP->maxMsgNum = max;
            SpinLockRelease(&vsegP->msgnumLock);
        }

        /*
         * Now that the maxMsgNum change is globally visible, we give everyone
         * a swift kick to make sure they read the newly added messages.
         * Releasing SInvalWriteLock will enforce a full memory barrier, so
         * these (unlocked) changes will be committed to memory before we exit
         * the function.
         *
         * The flag is set on every entry up to lastBackend, without checking
         * whether the entry is currently in use.
         */
        for (i = 0; i < segP->lastBackend; i++)
        {
            ProcState  *stateP = &segP->procState[i];

            stateP->hasMessages = true;
        }

        LWLockRelease(SInvalWriteLock);
    }
}

Size SInvalShmemSize ( void   ) 

Definition at line 206 of file sinvaladt.c.

References add_size(), MaxBackends, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores().

{
    /*
     * Space needed for the shared invalidation segment: the fixed part of
     * SISeg up to the start of procState[], followed by one ProcState entry
     * for each of MaxBackends backends.  add_size/mul_size do the arithmetic.
     */
    return add_size(offsetof(SISeg, procState),
                    mul_size(sizeof(ProcState), MaxBackends));
}


Variable Documentation

SISeg* shmInvalBuffer [static]

Definition at line 194 of file sinvaladt.c.