Header And Logo

PostgreSQL
| The world's most advanced open source database.

Defines | Functions | Variables

syncrep.h File Reference

#include "access/xlogdefs.h"
#include "utils/guc.h"
Include dependency graph for syncrep.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Defines

/*
 * Nonzero when WAL senders are configured and synchronous_commit is above
 * local flush, i.e. the user has requested synchronous replication.
 * SyncRepWaitForLSN() uses it for its fast-exit check.
 */
#define SyncRepRequested()   (max_wal_senders > 0 && synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH)
/*
 * Values stored in SyncRepWaitMode (see assign_synchronous_commit).
 * WRITE and FLUSH are also used as indexes into WalSndCtl->lsn[] and
 * WalSndCtl->SyncRepQueue[].
 * NOTE(review): the -1 replacement list is unparenthesized; (-1) would be
 * the more defensive macro form.
 */
#define SYNC_REP_NO_WAIT   -1
#define SYNC_REP_WAIT_WRITE   0
#define SYNC_REP_WAIT_FLUSH   1
/* Number of real wait modes (NO_WAIT excluded); bound for the arrays above. */
#define NUM_SYNC_REP_WAIT_MODE   2
/* Values stored in PGPROC->syncRepState by waiters and walsenders. */
#define SYNC_REP_NOT_WAITING   0
#define SYNC_REP_WAITING   1
#define SYNC_REP_WAIT_COMPLETE   2

Functions

void SyncRepWaitForLSN (XLogRecPtr XactCommitLSN)
void SyncRepCleanupAtProcExit (void)
void SyncRepInitConfig (void)
void SyncRepReleaseWaiters (void)
void SyncRepUpdateSyncStandbysDefined (void)
int SyncRepWakeQueue (bool all, int mode)
bool check_synchronous_standby_names (char **newval, void **extra, GucSource source)
void assign_synchronous_commit (int newval, void *extra)

Variables

char * SyncRepStandbyNames

Define Documentation

#define NUM_SYNC_REP_WAIT_MODE   2

Definition at line 27 of file syncrep.h.

Referenced by SyncRepQueueInsert(), and SyncRepWakeQueue().

#define SYNC_REP_NO_WAIT   -1

Definition at line 23 of file syncrep.h.

#define SYNC_REP_NOT_WAITING   0

Definition at line 30 of file syncrep.h.

Referenced by SyncRepWaitForLSN().

#define SYNC_REP_WAIT_COMPLETE   2

Definition at line 32 of file syncrep.h.

Referenced by SyncRepWaitForLSN().

#define SYNC_REP_WAIT_FLUSH   1

Definition at line 25 of file syncrep.h.

Referenced by SyncRepReleaseWaiters().

#define SYNC_REP_WAIT_WRITE   0

Definition at line 24 of file syncrep.h.

Referenced by SyncRepReleaseWaiters().

#define SYNC_REP_WAITING   1

Definition at line 31 of file syncrep.h.

Referenced by SyncRepWaitForLSN().

#define SyncRepRequested()   (max_wal_senders > 0 && synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH)

Definition at line 19 of file syncrep.h.

Referenced by SyncRepWaitForLSN().


Function Documentation

void assign_synchronous_commit ( int  newval,
void *  extra 
)

Definition at line 697 of file syncrep.c.

References SYNCHRONOUS_COMMIT_REMOTE_FLUSH, SYNCHRONOUS_COMMIT_REMOTE_WRITE, and SyncRepWaitMode.

{
    /*
     * Map the new synchronous_commit value onto the wait queue that
     * committing backends should use.  Remote write and remote flush each
     * get their own queue; every other value means don't wait at all.
     */
    if (newval == SYNCHRONOUS_COMMIT_REMOTE_WRITE)
        SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
    else if (newval == SYNCHRONOUS_COMMIT_REMOTE_FLUSH)
        SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
    else
        SyncRepWaitMode = SYNC_REP_NO_WAIT;
}

bool check_synchronous_standby_names ( char **  newval,
void **  extra,
GucSource  source 
)

Definition at line 664 of file syncrep.c.

References GUC_check_errdetail, list_free(), pfree(), pstrdup(), and SplitIdentifierString().

{
    char       *copy;
    List       *names;
    bool        valid;

    /* SplitIdentifierString scribbles on its input, so work on a copy. */
    copy = pstrdup(*newval);

    /* The value must parse as a comma-separated list of identifiers. */
    valid = SplitIdentifierString(copy, ',', &names);
    if (!valid)
        GUC_check_errdetail("List syntax is invalid.");

    /*
     * Any additional validation of standby names should go here.
     *
     * Don't attempt to set WALSender priority because this is executed by
     * postmaster at startup, not WALSender, so the application_name is not
     * yet correctly set.
     */

    /* Release the scratch copy and the parsed list on both paths. */
    pfree(copy);
    list_free(names);

    return valid;
}

void SyncRepCleanupAtProcExit ( void   ) 
void SyncRepInitConfig ( void   ) 

Definition at line 339 of file syncrep.c.

References application_name, DEBUG1, ereport, errmsg(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyWalSnd, WalSnd::sync_standby_priority, SyncRepGetStandbyPriority(), and SyncRepLock.

Referenced by StartReplication(), and WalSndLoop().

{
    int         newPriority = SyncRepGetStandbyPriority();

    /*
     * Work out whether we are a potential sync standby, and cache the
     * answer in our WalSnd slot because it is consulted when handling
     * replies from the standby.  Nothing to do if the cached value is
     * already current.
     */
    if (MyWalSnd->sync_standby_priority == newPriority)
        return;

    /* Other walsenders read this field under SyncRepLock. */
    LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
    MyWalSnd->sync_standby_priority = newPriority;
    LWLockRelease(SyncRepLock);

    ereport(DEBUG1,
            (errmsg("standby \"%s\" now has synchronous standby priority %u",
                    application_name, newPriority)));
}

void SyncRepReleaseWaiters ( void   ) 

Definition at line 367 of file syncrep.c.

References announce_next_takeover, application_name, Assert, DEBUG3, elog, ereport, errmsg(), WalSnd::flush, i, LOG, WalSndCtlData::lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_wal_senders, MyWalSnd, WalSnd::pid, WalSnd::state, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, WalSnd::sync_standby_priority, SyncRepLock, SyncRepWakeQueue(), WalSndCtl, WalSndCtlData::walsnds, WALSNDSTATE_STREAMING, WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by ProcessStandbyReplyMessage().

{
    /*
     * Release backends waiting for synchronous replication, but only if this
     * walsender is the one currently acting as the synchronous standby.  That
     * standby is the streaming walsender with the numerically lowest nonzero
     * sync_standby_priority and a valid flush position; on ties, the first
     * one found in WalSndCtl->walsnds[] wins.
     *
     * When we are that standby, the shared WalSndCtl->lsn[] entries for the
     * write and flush modes are advanced to our reported positions, and the
     * matching queues are woken.  When we are not, announce_next_takeover is
     * set so that a LOG message is emitted once we do take over.
     */
    volatile WalSndCtlData *walsndctl = WalSndCtl;
    volatile WalSnd *syncWalSnd = NULL;
    int         numwrite = 0;
    int         numflush = 0;
    int         priority = 0;
    int         i;

    /*
     * If this WALSender is serving a standby that is not on the list of
     * potential standbys then we have nothing to do. If we are still starting
     * up, still running base backup or the current flush position is still
     * invalid, then leave quickly also.
     */
    if (MyWalSnd->sync_standby_priority == 0 ||
        MyWalSnd->state < WALSNDSTATE_STREAMING ||
        XLogRecPtrIsInvalid(MyWalSnd->flush))
        return;

    /*
     * We're a potential sync standby. Release waiters if we are the highest
     * priority standby. If there are multiple standbys with same priorities
     * then we use the first mentioned standby. If you change this, also
     * change pg_stat_get_wal_senders().
     */
    LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);

    for (i = 0; i < max_wal_senders; i++)
    {
        /* use volatile pointer to prevent code rearrangement */
        volatile WalSnd *walsnd = &walsndctl->walsnds[i];

        /*
         * Candidate must be live, streaming, a potential sync standby and
         * have a valid flush position.  A lower priority number means higher
         * priority; strict ">" keeps the earliest slot on ties.
         */
        if (walsnd->pid != 0 &&
            walsnd->state == WALSNDSTATE_STREAMING &&
            walsnd->sync_standby_priority > 0 &&
            (priority == 0 ||
             priority > walsnd->sync_standby_priority) &&
            !XLogRecPtrIsInvalid(walsnd->flush))
        {
            priority = walsnd->sync_standby_priority;
            syncWalSnd = walsnd;
        }
    }

    /*
     * We should have found ourselves at least.
     */
    Assert(syncWalSnd);

    /*
     * If we aren't managing the highest priority standby then just leave.
     */
    if (syncWalSnd != MyWalSnd)
    {
        LWLockRelease(SyncRepLock);
        /* Remember to log if we later become the sync standby. */
        announce_next_takeover = true;
        return;
    }

    /*
     * Set the lsn first so that when we wake backends they will release up to
     * this location.  The shared positions are only ever moved forward.
     */
    if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write)
    {
        walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
        numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
    }
    if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush)
    {
        walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
        numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
    }

    LWLockRelease(SyncRepLock);

    /* XLogRecPtr is printed as two 32-bit halves (high/low). */
    elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
         numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
         numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);

    /*
     * If we are managing the highest priority standby, though we weren't
     * prior to this, then announce we are now the sync standby.
     */
    if (announce_next_takeover)
    {
        announce_next_takeover = false;
        ereport(LOG,
                (errmsg("standby \"%s\" is now the synchronous standby with priority %u",
                        application_name, MyWalSnd->sync_standby_priority)));
    }
}

void SyncRepUpdateSyncStandbysDefined ( void   ) 

Definition at line 588 of file syncrep.c.

References i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), WalSndCtlData::sync_standbys_defined, SyncRepLock, SyncRepWakeQueue(), SyncStandbysDefined, and WalSndCtl.

Referenced by UpdateSharedMemoryConfig().

{
    bool        defined_now = SyncStandbysDefined();
    int         mode;

    /* Nothing to do if shared memory already agrees with our setting. */
    if (defined_now == WalSndCtl->sync_standbys_defined)
        return;

    LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);

    /*
     * If synchronous_standby_names has been reset to empty, it's futile for
     * backends to keep waiting.  The user no longer wants synchronous
     * replication, so wake every waiter in every queue.
     */
    if (!defined_now)
    {
        for (mode = 0; mode < NUM_SYNC_REP_WAIT_MODE; mode++)
            SyncRepWakeQueue(true, mode);
    }

    /*
     * Publish the new setting while still holding the lock, so that backends
     * only join a queue while synchronous standbys are defined.  Without this
     * interlock there's a race condition.  We might wake all current waiters,
     * and then a backend that hasn't yet reloaded its config could go to
     * sleep on the queue and never wake up.
     */
    WalSndCtl->sync_standbys_defined = defined_now;

    LWLockRelease(SyncRepLock);
}

void SyncRepWaitForLSN ( XLogRecPtr  XactCommitLSN  ) 

Definition at line 94 of file syncrep.c.

References Assert, ereport, errcode(), errdetail(), errmsg(), get_ps_display(), LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyProc, NULL, palloc(), pfree(), PostmasterIsAlive(), ProcDiePending, PGPROC::procLatch, QueryCancelPending, ResetLatch(), set_ps_display(), SHMQueueIsDetached(), SYNC_REP_NOT_WAITING, SYNC_REP_WAIT_COMPLETE, SYNC_REP_WAITING, WalSndCtlData::sync_standbys_defined, SyncRepCancelWait(), PGPROC::syncRepLinks, SyncRepLock, SyncRepQueueInsert(), SyncRepRequested, PGPROC::syncRepState, SyncRepWaitMode, SyncStandbysDefined, update_process_title, WaitLatch(), PGPROC::waitLSN, WalSndCtl, WARNING, whereToSendOutput, WL_LATCH_SET, and WL_POSTMASTER_DEATH.

Referenced by EndPrepare(), RecordTransactionAbortPrepared(), RecordTransactionCommit(), and RecordTransactionCommitPrepared().

{
    /*
     * Block the committing backend until the synchronous standby has
     * confirmed XactCommitLSN for the wait mode selected by
     * synchronous_commit (SyncRepWaitMode).
     *
     * Returns immediately in any of these cases:
     *  - sync replication is not requested;
     *  - no sync standby names are defined;
     *  - the shared sync_standbys_defined flag is off;
     *  - the LSN is already confirmed.
     *
     * The wait can also end early in two other ways.  A pending die or
     * cancel interrupt ends it with a WARNING.  Postmaster death ends it
     * without one.  In both cases the transaction is already committed
     * locally but may not have been replicated.
     *
     * NOTE(review): WalSndCtl->lsn[mode] below relies on SyncRepRequested()
     * implying SyncRepWaitMode != SYNC_REP_NO_WAIT (-1); confirm against the
     * synchronous_commit enum ordering.
     */
    char       *new_status = NULL;
    const char *old_status;
    int         mode = SyncRepWaitMode;

    /*
     * Fast exit if user has not requested sync replication, or there are no
     * sync replication standby names defined. Note that those standbys don't
     * need to be connected.
     */
    if (!SyncRepRequested() || !SyncStandbysDefined())
        return;

    Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
    Assert(WalSndCtl != NULL);

    LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
    Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);

    /*
     * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
     * set.  See SyncRepUpdateSyncStandbysDefined.
     *
     * Also check that the standby hasn't already replied. Unlikely race
     * condition but we'll be fetching that cache line anyway so it's likely
     * to be a low cost check.
     */
    if (!WalSndCtl->sync_standbys_defined ||
        XactCommitLSN <= WalSndCtl->lsn[mode])
    {
        LWLockRelease(SyncRepLock);
        return;
    }

    /*
     * Set our waitLSN so WALSender will know when to wake us, and add
     * ourselves to the queue.
     */
    MyProc->waitLSN = XactCommitLSN;
    MyProc->syncRepState = SYNC_REP_WAITING;
    SyncRepQueueInsert(mode);
    Assert(SyncRepQueueIsOrderedByLSN(mode));
    LWLockRelease(SyncRepLock);

    /* Alter ps display to show waiting for sync rep. */
    if (update_process_title)
    {
        int         len;

        old_status = get_ps_display(&len);
        /*
         * Room for the old title (len bytes) plus " waiting for %X/%X".
         * That suffix is at most 13 + 8 + 1 + 8 = 30 characters, so 32
         * extra bytes plus the NUL is enough.
         */
        new_status = (char *) palloc(len + 32 + 1);
        memcpy(new_status, old_status, len);
        sprintf(new_status + len, " waiting for %X/%X",
                (uint32) (XactCommitLSN >> 32), (uint32) XactCommitLSN);
        set_ps_display(new_status, false);
        new_status[len] = '\0'; /* truncate off " waiting ..." */
    }

    /*
     * Wait for specified LSN to be confirmed.
     *
     * Each proc has its own wait latch, so we perform a normal latch
     * check/wait loop here.
     */
    for (;;)
    {
        int         syncRepState;

        /* Must reset the latch before testing state. */
        ResetLatch(&MyProc->procLatch);

        /*
         * Try checking the state without the lock first.  There's no
         * guarantee that we'll read the most up-to-date value, so if it looks
         * like we're still waiting, recheck while holding the lock.  But if
         * it looks like we're done, we must really be done, because once
         * walsender changes the state to SYNC_REP_WAIT_COMPLETE, it will
         * never update it again, so we can't be seeing a stale value in that
         * case.
         *
         * Note: on machines with weak memory ordering, the acquisition of the
         * lock is essential to avoid race conditions: we cannot be sure the
         * sender's state update has reached main memory until we acquire the
         * lock.  We could get rid of this dance if SetLatch/ResetLatch
         * contained memory barriers.
         */
        syncRepState = MyProc->syncRepState;
        if (syncRepState == SYNC_REP_WAITING)
        {
            LWLockAcquire(SyncRepLock, LW_SHARED);
            syncRepState = MyProc->syncRepState;
            LWLockRelease(SyncRepLock);
        }
        if (syncRepState == SYNC_REP_WAIT_COMPLETE)
            break;

        /*
         * If a wait for synchronous replication is pending, we can neither
         * acknowledge the commit nor raise ERROR or FATAL.  The latter would
         * lead the client to believe that the transaction aborted, which
         * is not true: it's already committed locally. The former is no good
         * either: the client has requested synchronous replication, and is
         * entitled to assume that an acknowledged commit is also replicated,
         * which might not be true. So in this case we issue a WARNING (which
         * some clients may be able to interpret) and shut off further output.
         * We do NOT reset ProcDiePending, so that the process will die after
         * the commit is cleaned up.
         */
        if (ProcDiePending)
        {
            ereport(WARNING,
                    (errcode(ERRCODE_ADMIN_SHUTDOWN),
                     errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
                     errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
            whereToSendOutput = DestNone;
            SyncRepCancelWait();
            break;
        }

        /*
         * It's unclear what to do if a query cancel interrupt arrives.  We
         * can't actually abort at this point, but ignoring the interrupt
         * altogether is not helpful, so we just terminate the wait with a
         * suitable warning.
         */
        if (QueryCancelPending)
        {
            QueryCancelPending = false;
            ereport(WARNING,
                    (errmsg("canceling wait for synchronous replication due to user request"),
                     errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
            SyncRepCancelWait();
            break;
        }

        /*
         * If the postmaster dies, we'll probably never get an
         * acknowledgement, because all the wal sender processes will exit. So
         * just bail out.  ProcDiePending is set so this backend exits too.
         */
        if (!PostmasterIsAlive())
        {
            ProcDiePending = true;
            whereToSendOutput = DestNone;
            SyncRepCancelWait();
            break;
        }

        /*
         * Wait on latch.  Any condition that should wake us up will set the
         * latch, so no need for timeout.
         */
        WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1);
    }

    /*
     * WalSender has checked our LSN and has removed us from queue. Clean up
     * state and leave.  It's OK to reset these shared memory fields without
     * holding SyncRepLock, because any walsenders will ignore us anyway when
     * we're not on the queue.
     */
    Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
    MyProc->syncRepState = SYNC_REP_NOT_WAITING;
    MyProc->waitLSN = 0;

    if (new_status)
    {
        /* Reset ps display (new_status was truncated back to the old title) */
        set_ps_display(new_status, false);
        pfree(new_status);
    }
}

int SyncRepWakeQueue ( bool  all,
int  mode 
)

Definition at line 527 of file syncrep.c.

References Assert, WalSndCtlData::lsn, NUM_SYNC_REP_WAIT_MODE, offsetof, PGPROC::procLatch, SetLatch(), SHMQueueDelete(), SHMQueueNext(), PGPROC::syncRepLinks, WalSndCtlData::SyncRepQueue, PGPROC::syncRepState, PGPROC::waitLSN, and WalSndCtl.

Referenced by SyncRepReleaseWaiters(), and SyncRepUpdateSyncStandbysDefined().

{
    /*
     * Walk the wait queue for `mode` from its head and release waiters.
     * Each released waiter has its state set to SYNC_REP_WAIT_COMPLETE, is
     * unlinked from the queue, and has its latch set.
     *
     * If `all` is true, every waiter is released.  Otherwise the walk stops
     * at the first waiter whose waitLSN is beyond walsndctl->lsn[mode].  That
     * is only correct because the queue is kept ordered by LSN (asserted
     * below).
     *
     * Returns the number of waiters released.  Both callers visible here
     * (SyncRepReleaseWaiters, SyncRepUpdateSyncStandbysDefined) call it
     * while holding SyncRepLock in exclusive mode.
     */
    volatile WalSndCtlData *walsndctl = WalSndCtl;
    PGPROC     *proc = NULL;
    PGPROC     *thisproc = NULL;
    int         numprocs = 0;

    Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
    Assert(SyncRepQueueIsOrderedByLSN(mode));

    /* First waiter in the queue, or NULL if the queue is empty. */
    proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
                                   &(WalSndCtl->SyncRepQueue[mode]),
                                   offsetof(PGPROC, syncRepLinks));

    while (proc)
    {
        /*
         * Assume the queue is ordered by LSN
         */
        if (!all && walsndctl->lsn[mode] < proc->waitLSN)
            return numprocs;

        /*
         * Move to next proc, so we can delete thisproc from the queue.
         * thisproc is valid, proc may be NULL after this.
         */
        thisproc = proc;
        proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
                                       &(proc->syncRepLinks),
                                       offsetof(PGPROC, syncRepLinks));

        /*
         * Set state to complete; see SyncRepWaitForLSN() for discussion of
         * the various states.
         */
        thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;

        /*
         * Remove thisproc from queue.
         */
        SHMQueueDelete(&(thisproc->syncRepLinks));

        /*
         * Wake only when we have set state and removed from queue.
         */
        SetLatch(&(thisproc->procLatch));

        numprocs++;
    }

    return numprocs;
}


Variable Documentation

char* SyncRepStandbyNames

Definition at line 60 of file syncrep.c.

Referenced by SyncRepGetStandbyPriority().