Header And Logo

PostgreSQL
| The world's most advanced open source database.

Defines | Functions | Variables

syncrep.c File Reference

#include "postgres.h"
#include <unistd.h>
#include "access/xact.h"
#include "miscadmin.h"
#include "replication/syncrep.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/ps_status.h"
Include dependency graph for syncrep.c:

Go to the source code of this file.

Defines

#define SyncStandbysDefined()   (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')

Functions

static void SyncRepQueueInsert (int mode)
static void SyncRepCancelWait (void)
static int SyncRepGetStandbyPriority (void)
void SyncRepWaitForLSN (XLogRecPtr XactCommitLSN)
void SyncRepCleanupAtProcExit (void)
void SyncRepInitConfig (void)
void SyncRepReleaseWaiters (void)
int SyncRepWakeQueue (bool all, int mode)
void SyncRepUpdateSyncStandbysDefined (void)
bool check_synchronous_standby_names (char **newval, void **extra, GucSource source)
void assign_synchronous_commit (int newval, void *extra)

Variables

char * SyncRepStandbyNames
static bool announce_next_takeover = true
static int SyncRepWaitMode = SYNC_REP_NO_WAIT

Define Documentation

#define SyncStandbysDefined (  )     (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')

Definition at line 62 of file syncrep.c.

Referenced by SyncRepUpdateSyncStandbysDefined(), and SyncRepWaitForLSN().


Function Documentation

void assign_synchronous_commit ( int  newval,
void *  extra 
)

Definition at line 697 of file syncrep.c.

References SYNCHRONOUS_COMMIT_REMOTE_FLUSH, SYNCHRONOUS_COMMIT_REMOTE_WRITE, and SyncRepWaitMode.

{
    /*
     * Map the new synchronous_commit value onto the wait mode that
     * SyncRepWaitForLSN() reads.  Only the two remote settings select a
     * queue to wait on; any other value means "do not wait".
     */
    if (newval == SYNCHRONOUS_COMMIT_REMOTE_WRITE)
        SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
    else if (newval == SYNCHRONOUS_COMMIT_REMOTE_FLUSH)
        SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
    else
        SyncRepWaitMode = SYNC_REP_NO_WAIT;
}

bool check_synchronous_standby_names ( char **  newval,
void **  extra,
GucSource  source 
)

Definition at line 664 of file syncrep.c.

References GUC_check_errdetail, list_free(), pfree(), pstrdup(), and SplitIdentifierString().

{
    char       *names_copy;
    List       *parsed_names;
    bool        syntax_ok;

    /* The splitter needs a string it may modify, so give it a private copy */
    names_copy = pstrdup(*newval);

    /* Break the comma-separated value into a list of identifiers */
    syntax_ok = SplitIdentifierString(names_copy, ',', &parsed_names);
    if (!syntax_ok)
    {
        /* malformed list: tell the GUC machinery why we are rejecting it */
        GUC_check_errdetail("List syntax is invalid.");
    }

    /*
     * Further validation of the individual standby names would belong here.
     *
     * WALSender priority must not be set from this hook: it runs in the
     * postmaster at startup rather than in a WALSender, so application_name
     * does not hold the right value yet.
     */

    /* The copy and the list are scratch data whether or not parsing worked */
    pfree(names_copy);
    list_free(parsed_names);

    return syntax_ok;
}

static void SyncRepCancelWait ( void   )  [static]
void SyncRepCleanupAtProcExit ( void   ) 
static int SyncRepGetStandbyPriority ( void   )  [static]

Definition at line 470 of file syncrep.c.

References am_cascading_walsender, application_name, lfirst, list_free(), pfree(), pg_strcasecmp(), pstrdup(), SplitIdentifierString(), and SyncRepStandbyNames.

Referenced by SyncRepInitConfig().

{
    char       *names_copy;
    List       *standby_list;
    ListCell   *cell;
    int         position = 0;
    int         result = 0;     /* 0 means "not a potential sync standby" */

    /*
     * Synchronous cascade replication is not allowed, so a cascading
     * walsender always reports priority zero.
     */
    if (am_cascading_walsender)
        return 0;

    /* The splitter needs a string it may modify, so give it a private copy */
    names_copy = pstrdup(SyncRepStandbyNames);

    /*
     * Break the setting into identifiers.  A syntax error leaves result at
     * zero; the GUC machinery will already have complained about it, so
     * nothing is reported here.
     */
    if (SplitIdentifierString(names_copy, ',', &standby_list))
    {
        /*
         * The priority is the 1-based position of the first entry that
         * matches our application_name (case-insensitively) or is "*".
         */
        foreach(cell, standby_list)
        {
            char       *candidate = (char *) lfirst(cell);

            position++;

            if (pg_strcasecmp(candidate, application_name) != 0 &&
                pg_strcasecmp(candidate, "*") != 0)
                continue;

            result = position;
            break;
        }
    }

    pfree(names_copy);
    list_free(standby_list);

    return result;
}

void SyncRepInitConfig ( void   ) 

Definition at line 339 of file syncrep.c.

References application_name, DEBUG1, ereport, errmsg(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyWalSnd, WalSnd::sync_standby_priority, SyncRepGetStandbyPriority(), and SyncRepLock.

Referenced by StartReplication(), and WalSndLoop().

{
    /*
     * Work out whether this walsender serves a potential sync standby, so
     * the answer is on hand when replies from the standby are handled.
     */
    int         new_priority = SyncRepGetStandbyPriority();

    /* Nothing to publish when the stored priority is already up to date */
    if (MyWalSnd->sync_standby_priority == new_priority)
        return;

    /* Update the shared slot under SyncRepLock, then log the change */
    LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
    MyWalSnd->sync_standby_priority = new_priority;
    LWLockRelease(SyncRepLock);

    ereport(DEBUG1,
            (errmsg("standby \"%s\" now has synchronous standby priority %u",
                    application_name, new_priority)));
}

static void SyncRepQueueInsert ( int  mode  )  [static]

Definition at line 275 of file syncrep.c.

References Assert, MyProc, NUM_SYNC_REP_WAIT_MODE, offsetof, SHMQueueInsertAfter(), SHMQueuePrev(), PGPROC::syncRepLinks, WalSndCtlData::SyncRepQueue, PGPROC::waitLSN, and WalSndCtl.

Referenced by SyncRepWaitForLSN().

{
    PGPROC     *cursor;

    Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);

    /*
     * Walk the queue with SHMQueuePrev(), starting from the queue header,
     * and stop at the first entry whose waitLSN is strictly below ours.
     * Inserting directly after that entry keeps the queue ordered by LSN.
     * The walk ends with cursor == NULL when no such entry exists.
     */
    for (cursor = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
                                          &(WalSndCtl->SyncRepQueue[mode]),
                                          offsetof(PGPROC, syncRepLinks));
         cursor != NULL;
         cursor = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
                                          &(cursor->syncRepLinks),
                                          offsetof(PGPROC, syncRepLinks)))
    {
        if (cursor->waitLSN < MyProc->waitLSN)
            break;
    }

    if (cursor == NULL)
    {
        /* no entry with a smaller LSN: link in right after the queue header */
        SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
    }
    else
    {
        /* link in right after the entry we stopped at */
        SHMQueueInsertAfter(&(cursor->syncRepLinks), &(MyProc->syncRepLinks));
    }
}

void SyncRepReleaseWaiters ( void   ) 

Definition at line 367 of file syncrep.c.

References announce_next_takeover, application_name, Assert, DEBUG3, elog, ereport, errmsg(), WalSnd::flush, i, LOG, WalSndCtlData::lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_wal_senders, MyWalSnd, WalSnd::pid, WalSnd::state, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, WalSnd::sync_standby_priority, SyncRepLock, SyncRepWakeQueue(), WalSndCtl, WalSndCtlData::walsnds, WALSNDSTATE_STREAMING, WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by ProcessStandbyReplyMessage().

{
    /*
     * Overview: decide whether this walsender currently serves the
     * highest-priority synchronous standby and, if so, advance the shared
     * write/flush LSNs and wake the backends queued up to those positions.
     * The shared LSNs and both queues are only touched while SyncRepLock is
     * held exclusively.
     */
    volatile WalSndCtlData *walsndctl = WalSndCtl;
    volatile WalSnd *syncWalSnd = NULL;
    int         numwrite = 0;
    int         numflush = 0;
    int         priority = 0;
    int         i;

    /*
     * If this WALSender is serving a standby that is not on the list of
     * potential standbys then we have nothing to do. If we are still starting
     * up, still running base backup or the current flush position is still
     * invalid, then leave quickly also.
     */
    if (MyWalSnd->sync_standby_priority == 0 ||
        MyWalSnd->state < WALSNDSTATE_STREAMING ||
        XLogRecPtrIsInvalid(MyWalSnd->flush))
        return;

    /*
     * We're a potential sync standby. Release waiters if we are the highest
     * priority standby. If there are multiple standbys with same priorities
     * then we use the first mentioned standby. If you change this, also
     * change pg_stat_get_wal_senders().
     */
    LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);

    /*
     * Pick the streaming walsender with the smallest positive priority and a
     * valid flush position.  The comparison below is strict, so among equal
     * priorities the entry with the lowest array index is kept.
     */
    for (i = 0; i < max_wal_senders; i++)
    {
        /* use volatile pointer to prevent code rearrangement */
        volatile WalSnd *walsnd = &walsndctl->walsnds[i];

        if (walsnd->pid != 0 &&
            walsnd->state == WALSNDSTATE_STREAMING &&
            walsnd->sync_standby_priority > 0 &&
            (priority == 0 ||
             priority > walsnd->sync_standby_priority) &&
            !XLogRecPtrIsInvalid(walsnd->flush))
        {
            priority = walsnd->sync_standby_priority;
            syncWalSnd = walsnd;
        }
    }

    /*
     * We should have found ourselves at least.
     */
    Assert(syncWalSnd);

    /*
     * If we aren't managing the highest priority standby then just leave.
     * Re-arm the announcement flag so that a later takeover by this
     * walsender is logged again.
     */
    if (syncWalSnd != MyWalSnd)
    {
        LWLockRelease(SyncRepLock);
        announce_next_takeover = true;
        return;
    }

    /*
     * Set the lsn first so that when we wake backends they will release up to
     * this location.  Each shared LSN only ever moves forward here, and its
     * queue is only scanned when the LSN actually advanced.
     */
    if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write)
    {
        walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
        numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
    }
    if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush)
    {
        walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
        numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
    }

    LWLockRelease(SyncRepLock);

    /* Each LSN is printed as its high and low 32-bit halves in hex */
    elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
         numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
         numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);

    /*
     * If we are managing the highest priority standby, though we weren't
     * prior to this, then announce we are now the sync standby.
     */
    if (announce_next_takeover)
    {
        announce_next_takeover = false;
        ereport(LOG,
                (errmsg("standby \"%s\" is now the synchronous standby with priority %u",
                        application_name, MyWalSnd->sync_standby_priority)));
    }
}

void SyncRepUpdateSyncStandbysDefined ( void   ) 

Definition at line 588 of file syncrep.c.

References i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), WalSndCtlData::sync_standbys_defined, SyncRepLock, SyncRepWakeQueue(), SyncStandbysDefined, and WalSndCtl.

Referenced by UpdateSharedMemoryConfig().

{
    bool        now_defined = SyncStandbysDefined();
    int         mode;

    /* Shared memory already agrees with the local setting: nothing to do */
    if (now_defined == WalSndCtl->sync_standbys_defined)
        return;

    LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);

    /*
     * When synchronous_standby_names has been reset to empty, backends that
     * are still waiting would wait in vain: the user no longer wants
     * synchronous replication.  Release every waiter on every queue.
     */
    if (!now_defined)
    {
        for (mode = 0; mode < NUM_SYNC_REP_WAIT_MODE; mode++)
            SyncRepWakeQueue(true, mode);
    }

    /*
     * Publish the new state while still holding the lock, so that backends
     * may only join a queue while synchronous standbys are defined.  Without
     * this interlock there is a race: we could wake all current waiters and
     * then a backend that has not yet reloaded its configuration could go to
     * sleep on the queue and never be woken.
     */
    WalSndCtl->sync_standbys_defined = now_defined;

    LWLockRelease(SyncRepLock);
}

void SyncRepWaitForLSN ( XLogRecPtr  XactCommitLSN  ) 

Definition at line 94 of file syncrep.c.

References Assert, ereport, errcode(), errdetail(), errmsg(), get_ps_display(), LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyProc, NULL, palloc(), pfree(), PostmasterIsAlive(), ProcDiePending, PGPROC::procLatch, QueryCancelPending, ResetLatch(), set_ps_display(), SHMQueueIsDetached(), SYNC_REP_NOT_WAITING, SYNC_REP_WAIT_COMPLETE, SYNC_REP_WAITING, WalSndCtlData::sync_standbys_defined, SyncRepCancelWait(), PGPROC::syncRepLinks, SyncRepLock, SyncRepQueueInsert(), SyncRepRequested, PGPROC::syncRepState, SyncRepWaitMode, SyncStandbysDefined, update_process_title, WaitLatch(), PGPROC::waitLSN, WalSndCtl, WARNING, whereToSendOutput, WL_LATCH_SET, and WL_POSTMASTER_DEATH.

Referenced by EndPrepare(), RecordTransactionAbortPrepared(), RecordTransactionCommit(), and RecordTransactionCommitPrepared().

{
    /*
     * Overview: block the calling backend until the wait queue selected by
     * SyncRepWaitMode reports XactCommitLSN as confirmed, or until the wait
     * is abandoned because of a pending die request, a pending query cancel,
     * or postmaster death.  Returns immediately when synchronous replication
     * is not requested or no standby names are configured.
     */
    char       *new_status = NULL;
    const char *old_status;     /* only assigned when update_process_title */
    int         mode = SyncRepWaitMode;

    /*
     * Fast exit if user has not requested sync replication, or there are no
     * sync replication standby names defined. Note that those standbys don't
     * need to be connected.
     */
    if (!SyncRepRequested() || !SyncStandbysDefined())
        return;

    Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
    Assert(WalSndCtl != NULL);

    LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
    Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);

    /*
     * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
     * set.  See SyncRepUpdateSyncStandbysDefined.
     *
     * Also check that the standby hasn't already replied. This is an
     * unlikely race condition, but we'll be fetching that cache line anyway,
     * so it's likely to be a low-cost check.
     */
    if (!WalSndCtl->sync_standbys_defined ||
        XactCommitLSN <= WalSndCtl->lsn[mode])
    {
        LWLockRelease(SyncRepLock);
        return;
    }

    /*
     * Set our waitLSN so WALSender will know when to wake us, and add
     * ourselves to the queue.
     */
    MyProc->waitLSN = XactCommitLSN;
    MyProc->syncRepState = SYNC_REP_WAITING;
    SyncRepQueueInsert(mode);
    Assert(SyncRepQueueIsOrderedByLSN(mode));
    LWLockRelease(SyncRepLock);

    /* Alter ps display to show waiting for sync rep. */
    if (update_process_title)
    {
        int         len;

        old_status = get_ps_display(&len);

        /*
         * The suffix written below is at most 30 characters (13 of fixed
         * text plus two hex fields of up to 8 digits and a slash), so 32
         * bytes of headroom plus the terminator are sufficient.
         */
        new_status = (char *) palloc(len + 32 + 1);
        memcpy(new_status, old_status, len);
        sprintf(new_status + len, " waiting for %X/%X",
                (uint32) (XactCommitLSN >> 32), (uint32) XactCommitLSN);
        set_ps_display(new_status, false);

        /*
         * Cut the buffer back to the original title so it can be used to
         * restore the ps display once the wait is over.
         */
        new_status[len] = '\0'; /* truncate off " waiting ..." */
    }

    /*
     * Wait for specified LSN to be confirmed.
     *
     * Each proc has its own wait latch, so we perform a normal latch
     * check/wait loop here.
     */
    for (;;)
    {
        int         syncRepState;

        /* Must reset the latch before testing state. */
        ResetLatch(&MyProc->procLatch);

        /*
         * Try checking the state without the lock first.  There's no
         * guarantee that we'll read the most up-to-date value, so if it looks
         * like we're still waiting, recheck while holding the lock.  But if
         * it looks like we're done, we must really be done, because once
         * walsender changes the state to SYNC_REP_WAIT_COMPLETE, it will
         * never update it again, so we can't be seeing a stale value in that
         * case.
         *
         * Note: on machines with weak memory ordering, the acquisition of the
         * lock is essential to avoid race conditions: we cannot be sure the
         * sender's state update has reached main memory until we acquire the
         * lock.  We could get rid of this dance if SetLatch/ResetLatch
         * contained memory barriers.
         */
        syncRepState = MyProc->syncRepState;
        if (syncRepState == SYNC_REP_WAITING)
        {
            LWLockAcquire(SyncRepLock, LW_SHARED);
            syncRepState = MyProc->syncRepState;
            LWLockRelease(SyncRepLock);
        }
        if (syncRepState == SYNC_REP_WAIT_COMPLETE)
            break;

        /*
         * If a wait for synchronous replication is pending, we can neither
         * acknowledge the commit nor raise ERROR or FATAL.  The latter would
         * lead the client to believe that the transaction aborted, which
         * is not true: it's already committed locally. The former is no good
         * either: the client has requested synchronous replication, and is
         * entitled to assume that an acknowledged commit is also replicated,
         * which might not be true. So in this case we issue a WARNING (which
         * some clients may be able to interpret) and shut off further output.
         * We do NOT reset ProcDiePending, so that the process will die after
         * the commit is cleaned up.
         */
        if (ProcDiePending)
        {
            ereport(WARNING,
                    (errcode(ERRCODE_ADMIN_SHUTDOWN),
                     errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
                     errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
            whereToSendOutput = DestNone;
            SyncRepCancelWait();
            break;
        }

        /*
         * It's unclear what to do if a query cancel interrupt arrives.  We
         * can't actually abort at this point, but ignoring the interrupt
         * altogether is not helpful, so we just terminate the wait with a
         * suitable warning.  Unlike the die case, the pending flag is
         * cleared here.
         */
        if (QueryCancelPending)
        {
            QueryCancelPending = false;
            ereport(WARNING,
                    (errmsg("canceling wait for synchronous replication due to user request"),
                     errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
            SyncRepCancelWait();
            break;
        }

        /*
         * If the postmaster dies, we'll probably never get an
         * acknowledgement, because all the wal sender processes will exit. So
         * just bail out.  This path sets ProcDiePending and shuts off client
         * output without emitting a warning.
         */
        if (!PostmasterIsAlive())
        {
            ProcDiePending = true;
            whereToSendOutput = DestNone;
            SyncRepCancelWait();
            break;
        }

        /*
         * Wait on latch.  Any condition that should wake us up will set the
         * latch, so no need for timeout.
         */
        WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1);
    }

    /*
     * WalSender has checked our LSN and has removed us from queue. Clean up
     * state and leave.  It's OK to reset these shared memory fields without
     * holding SyncRepLock, because any walsenders will ignore us anyway when
     * we're not on the queue.
     */
    Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
    MyProc->syncRepState = SYNC_REP_NOT_WAITING;
    MyProc->waitLSN = 0;

    /* new_status is non-NULL only if the ps display was altered above */
    if (new_status)
    {
        /* Reset ps display */
        set_ps_display(new_status, false);
        pfree(new_status);
    }
}

int SyncRepWakeQueue ( bool  all,
int  mode 
)

Definition at line 527 of file syncrep.c.

References Assert, WalSndCtlData::lsn, NUM_SYNC_REP_WAIT_MODE, offsetof, PGPROC::procLatch, SetLatch(), SHMQueueDelete(), SHMQueueNext(), PGPROC::syncRepLinks, WalSndCtlData::SyncRepQueue, PGPROC::syncRepState, PGPROC::waitLSN, and WalSndCtl.

Referenced by SyncRepReleaseWaiters(), and SyncRepUpdateSyncStandbysDefined().

{
    volatile WalSndCtlData *walsndctl = WalSndCtl;
    PGPROC     *proc = NULL;
    int         numprocs = 0;   /* number of waiters released so far */

    Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
    Assert(SyncRepQueueIsOrderedByLSN(mode));

    proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
                                   &(WalSndCtl->SyncRepQueue[mode]),
                                   offsetof(PGPROC, syncRepLinks));

    while (proc != NULL)
    {
        PGPROC     *waiter = proc;

        /*
         * The queue is assumed to be ordered by LSN, so unless every waiter
         * is to be released, the first one still beyond the confirmed LSN
         * ends the scan.
         */
        if (!all && walsndctl->lsn[mode] < waiter->waitLSN)
            break;

        /*
         * Fetch the successor before unlinking the waiter.  The waiter
         * pointer stays valid; proc may become NULL here.
         */
        proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
                                       &(waiter->syncRepLinks),
                                       offsetof(PGPROC, syncRepLinks));

        /*
         * Order matters: mark the wait complete (see SyncRepWaitForLSN() for
         * the states), take the waiter off the queue, and only then set its
         * latch to wake it.
         */
        waiter->syncRepState = SYNC_REP_WAIT_COMPLETE;
        SHMQueueDelete(&(waiter->syncRepLinks));
        SetLatch(&(waiter->procLatch));

        numprocs++;
    }

    return numprocs;
}


Variable Documentation

bool announce_next_takeover = true [static]

Definition at line 65 of file syncrep.c.

Referenced by SyncRepReleaseWaiters().

char * SyncRepStandbyNames

Definition at line 60 of file syncrep.c.

Referenced by SyncRepGetStandbyPriority().

int SyncRepWaitMode = SYNC_REP_NO_WAIT [static]

Definition at line 67 of file syncrep.c.

Referenced by assign_synchronous_commit(), and SyncRepWaitForLSN().