#include "access/xlogdefs.h"
#include "utils/guc.h"

Go to the source code of this file.
Defines | |
| #define | SyncRepRequested() (max_wal_senders > 0 && synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH) |
| #define | SYNC_REP_NO_WAIT -1 |
| #define | SYNC_REP_WAIT_WRITE 0 |
| #define | SYNC_REP_WAIT_FLUSH 1 |
| #define | NUM_SYNC_REP_WAIT_MODE 2 |
| #define | SYNC_REP_NOT_WAITING 0 |
| #define | SYNC_REP_WAITING 1 |
| #define | SYNC_REP_WAIT_COMPLETE 2 |
Functions | |
| void | SyncRepWaitForLSN (XLogRecPtr XactCommitLSN) |
| void | SyncRepCleanupAtProcExit (void) |
| void | SyncRepInitConfig (void) |
| void | SyncRepReleaseWaiters (void) |
| void | SyncRepUpdateSyncStandbysDefined (void) |
| int | SyncRepWakeQueue (bool all, int mode) |
| bool | check_synchronous_standby_names (char **newval, void **extra, GucSource source) |
| void | assign_synchronous_commit (int newval, void *extra) |
Variables | |
| char * | SyncRepStandbyNames |
| #define NUM_SYNC_REP_WAIT_MODE 2 |
Definition at line 27 of file syncrep.h.
Referenced by SyncRepQueueInsert(), and SyncRepWakeQueue().
| #define SYNC_REP_NOT_WAITING 0 |
Definition at line 30 of file syncrep.h.
Referenced by SyncRepWaitForLSN().
| #define SYNC_REP_WAIT_COMPLETE 2 |
Definition at line 32 of file syncrep.h.
Referenced by SyncRepWaitForLSN().
| #define SYNC_REP_WAIT_FLUSH 1 |
Definition at line 25 of file syncrep.h.
Referenced by SyncRepReleaseWaiters().
| #define SYNC_REP_WAIT_WRITE 0 |
Definition at line 24 of file syncrep.h.
Referenced by SyncRepReleaseWaiters().
| #define SYNC_REP_WAITING 1 |
Definition at line 31 of file syncrep.h.
Referenced by SyncRepWaitForLSN().
| #define SyncRepRequested | ( | ) | (max_wal_senders > 0 && synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH) |
Definition at line 19 of file syncrep.h.
Referenced by SyncRepWaitForLSN().
| void assign_synchronous_commit | ( | int newval, void *extra | ) |
Definition at line 697 of file syncrep.c.
References SYNCHRONOUS_COMMIT_REMOTE_FLUSH, SYNCHRONOUS_COMMIT_REMOTE_WRITE, and SyncRepWaitMode.
{
    /*
     * Translate the new synchronous_commit value into the wait mode stored
     * in SyncRepWaitMode: remote-write waits for the write position,
     * remote-flush waits for the flush position, and every other setting
     * means no wait at all.  The "extra" argument is not used.
     */
    if (newval == SYNCHRONOUS_COMMIT_REMOTE_WRITE)
        SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
    else if (newval == SYNCHRONOUS_COMMIT_REMOTE_FLUSH)
        SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
    else
        SyncRepWaitMode = SYNC_REP_NO_WAIT;
}
| bool check_synchronous_standby_names | ( | char **newval, void **extra, GucSource source | ) |
Definition at line 664 of file syncrep.c.
References GUC_check_errdetail, list_free(), pfree(), pstrdup(), and SplitIdentifierString().
{
    /*
     * Check that *newval parses as a comma-separated identifier list.
     * Returns true when the syntax is valid; otherwise reports a detail
     * message through GUC_check_errdetail and returns false.
     */
    char       *namecopy;
    List       *names;
    bool        syntax_ok;

    /* SplitIdentifierString is handed a scratch copy it may modify. */
    namecopy = pstrdup(*newval);

    syntax_ok = SplitIdentifierString(namecopy, ',', &names);
    if (!syntax_ok)
    {
        /* The list could not be parsed. */
        GUC_check_errdetail("List syntax is invalid.");
    }

    /*
     * Further validation of the standby names would belong here.
     *
     * WALSender priority is deliberately not set at this point: this check
     * runs in the postmaster at startup rather than in a WALSender, so
     * application_name does not yet hold its final value.
     */

    /* Both outcomes release the scratch copy and the parsed list. */
    pfree(namecopy);
    list_free(names);

    return syntax_ok;
}
| void SyncRepCleanupAtProcExit | ( | void | ) |
Definition at line 318 of file syncrep.c.
References LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProc, SHMQueueDelete(), SHMQueueIsDetached(), PGPROC::syncRepLinks, and SyncRepLock.
Referenced by ProcKill().
{
    /*
     * If this process is still linked into a sync rep queue, unlink it
     * while holding SyncRepLock exclusively.  A process that is already
     * detached needs no work and never takes the lock.
     */
    if (SHMQueueIsDetached(&(MyProc->syncRepLinks)))
        return;

    LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
    SHMQueueDelete(&(MyProc->syncRepLinks));
    LWLockRelease(SyncRepLock);
}
| void SyncRepInitConfig | ( | void | ) |
Definition at line 339 of file syncrep.c.
References application_name, DEBUG1, ereport, errmsg(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyWalSnd, WalSnd::sync_standby_priority, SyncRepGetStandbyPriority(), and SyncRepLock.
Referenced by StartReplication(), and WalSndLoop().
{
    /*
     * Work out whether we are a potential sync standby, and keep the answer
     * in MyWalSnd so it is available when handling replies from the standby.
     */
    int         new_priority = SyncRepGetStandbyPriority();

    /* The stored priority is already current: nothing to update or log. */
    if (MyWalSnd->sync_standby_priority == new_priority)
        return;

    /* Publish the new priority under the exclusive lock. */
    LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
    MyWalSnd->sync_standby_priority = new_priority;
    LWLockRelease(SyncRepLock);

    /* Report the change only after the lock has been dropped. */
    ereport(DEBUG1,
            (errmsg("standby \"%s\" now has synchronous standby priority %u",
                    application_name, new_priority)));
}
| void SyncRepReleaseWaiters | ( | void | ) |
Definition at line 367 of file syncrep.c.
References announce_next_takeover, application_name, Assert, DEBUG3, elog, ereport, errmsg(), WalSnd::flush, i, LOG, WalSndCtlData::lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_wal_senders, MyWalSnd, WalSnd::pid, WalSnd::state, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, WalSnd::sync_standby_priority, SyncRepLock, SyncRepWakeQueue(), WalSndCtl, WalSndCtlData::walsnds, WALSNDSTATE_STREAMING, WalSnd::write, and XLogRecPtrIsInvalid.
Referenced by ProcessStandbyReplyMessage().
{
/*
* Body of SyncRepReleaseWaiters(): if this walsender is the one selected
* as the highest-priority streaming sync standby, advance the shared
* write/flush LSNs to MyWalSnd's positions and wake the queued waiters
* that those positions satisfy. Otherwise return without waking anyone.
*/
volatile WalSndCtlData *walsndctl = WalSndCtl;
/* Best candidate found by the scan below; NULL until one qualifies. */
volatile WalSnd *syncWalSnd = NULL;
/* Counts returned by SyncRepWakeQueue(); only used in the DEBUG3 message. */
int numwrite = 0;
int numflush = 0;
/* Priority of the current candidate; 0 means no candidate chosen yet. */
int priority = 0;
int i;
/*
* If this WALSender is serving a standby that is not on the list of
* potential standbys then we have nothing to do. If we are still starting
* up, still running base backup or the current flush position is still
* invalid, then leave quickly also.
*/
if (MyWalSnd->sync_standby_priority == 0 ||
MyWalSnd->state < WALSNDSTATE_STREAMING ||
XLogRecPtrIsInvalid(MyWalSnd->flush))
return;
/*
* We're a potential sync standby. Release waiters if we are the highest
* priority standby. If there are multiple standbys with same priorities
* then we use the first mentioned standby. If you change this, also
* change pg_stat_get_wal_senders().
*/
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
for (i = 0; i < max_wal_senders; i++)
{
/* Use volatile pointer to prevent code rearrangement. */
volatile WalSnd *walsnd = &walsndctl->walsnds[i];
/*
* A slot qualifies when it is in use (pid != 0), streaming, has a
* nonzero priority and a valid flush position. The strict ">" keeps
* the earlier slot when two slots share the same priority value.
*/
if (walsnd->pid != 0 &&
walsnd->state == WALSNDSTATE_STREAMING &&
walsnd->sync_standby_priority > 0 &&
(priority == 0 ||
priority > walsnd->sync_standby_priority) &&
!XLogRecPtrIsInvalid(walsnd->flush))
{
priority = walsnd->sync_standby_priority;
syncWalSnd = walsnd;
}
}
/*
* We should have found ourselves at least.
*/
Assert(syncWalSnd);
/*
* If we aren't managing the highest priority standby then just leave.
*/
if (syncWalSnd != MyWalSnd)
{
LWLockRelease(SyncRepLock);
/*
* Remember that we were not the selected standby, so that a later call
* which does select us emits the LOG message at the end of this function.
*/
announce_next_takeover = true;
return;
}
/*
* Set the lsn first so that when we wake backends they will release up to
* this location.
*/
/* Each shared LSN only ever moves forward here; waiters are woken only on an advance. */
if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write)
{
walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
}
if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush)
{
walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
}
LWLockRelease(SyncRepLock);
/* Logging happens after the lock is released; write/flush are re-read from MyWalSnd here. */
elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
/*
* If we are managing the highest priority standby, though we weren't
* prior to this, then announce we are now the sync standby.
*/
if (announce_next_takeover)
{
announce_next_takeover = false;
ereport(LOG,
(errmsg("standby \"%s\" is now the synchronous standby with priority %u",
application_name, MyWalSnd->sync_standby_priority)));
}
}
| void SyncRepUpdateSyncStandbysDefined | ( | void | ) |
Definition at line 588 of file syncrep.c.
References i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), WalSndCtlData::sync_standbys_defined, SyncRepLock, SyncRepWakeQueue(), SyncStandbysDefined, and WalSndCtl.
Referenced by UpdateSharedMemoryConfig().
{
    /*
     * Bring WalSndCtl->sync_standbys_defined in line with the current
     * configuration, as reported by SyncStandbysDefined().
     */
    bool        defined_now = SyncStandbysDefined();
    int         mode;

    /* The shared flag already matches; leave without taking the lock. */
    if (defined_now == WalSndCtl->sync_standbys_defined)
        return;

    LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);

    /*
     * When synchronous_standby_names has been reset to empty there is no
     * point in backends continuing to wait: the user no longer wants
     * synchronous replication, so wake every waiter in every queue.
     */
    if (!defined_now)
    {
        for (mode = 0; mode < NUM_SYNC_REP_WAIT_MODE; mode++)
            SyncRepWakeQueue(true, mode);
    }

    /*
     * Backends may only join the queue while synchronous standbys are
     * defined.  This interlock closes a race: after all current waiters
     * have been woken, a backend that has not yet reloaded its config
     * could otherwise go to sleep on the queue and never be woken.
     */
    WalSndCtl->sync_standbys_defined = defined_now;

    LWLockRelease(SyncRepLock);
}
| void SyncRepWaitForLSN | ( | XLogRecPtr | XactCommitLSN | ) |
Definition at line 94 of file syncrep.c.
References Assert, ereport, errcode(), errdetail(), errmsg(), get_ps_display(), LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyProc, NULL, palloc(), pfree(), PostmasterIsAlive(), ProcDiePending, PGPROC::procLatch, QueryCancelPending, ResetLatch(), set_ps_display(), SHMQueueIsDetached(), SYNC_REP_NOT_WAITING, SYNC_REP_WAIT_COMPLETE, SYNC_REP_WAITING, WalSndCtlData::sync_standbys_defined, SyncRepCancelWait(), PGPROC::syncRepLinks, SyncRepLock, SyncRepQueueInsert(), SyncRepRequested, PGPROC::syncRepState, SyncRepWaitMode, SyncStandbysDefined, update_process_title, WaitLatch(), PGPROC::waitLSN, WalSndCtl, WARNING, whereToSendOutput, WL_LATCH_SET, and WL_POSTMASTER_DEATH.
Referenced by EndPrepare(), RecordTransactionAbortPrepared(), RecordTransactionCommit(), and RecordTransactionCommitPrepared().
{
/*
* Body of SyncRepWaitForLSN(XactCommitLSN): queue this backend on the sync
* rep queue for the current SyncRepWaitMode and sleep on its own latch
* until its syncRepState becomes SYNC_REP_WAIT_COMPLETE, or until the wait
* is cancelled (ProcDiePending, QueryCancelPending, or postmaster death).
* Returns immediately when no wait is required.
*/
/* Non-NULL only if the ps display was altered and must be restored on exit. */
char *new_status = NULL;
const char *old_status;
/* Snapshot the wait mode once; it indexes both the lsn[] array and the queue. */
int mode = SyncRepWaitMode;
/*
* Fast exit if user has not requested sync replication, or there are no
* sync replication standby names defined. Note that those standbys don't
* need to be connected.
*/
if (!SyncRepRequested() || !SyncStandbysDefined())
return;
Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
Assert(WalSndCtl != NULL);
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
/*
* We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
* set. See SyncRepUpdateSyncStandbysDefined.
*
* Also check that the standby hasn't already replied. Unlikely race
* condition but we'll be fetching that cache line anyway so it's likely to
* be a low cost check.
*/
if (!WalSndCtl->sync_standbys_defined ||
XactCommitLSN <= WalSndCtl->lsn[mode])
{
LWLockRelease(SyncRepLock);
return;
}
/*
* Set our waitLSN so WALSender will know when to wake us, and add
* ourselves to the queue.
*/
MyProc->waitLSN = XactCommitLSN;
MyProc->syncRepState = SYNC_REP_WAITING;
SyncRepQueueInsert(mode);
Assert(SyncRepQueueIsOrderedByLSN(mode));
LWLockRelease(SyncRepLock);
/* Alter ps display to show waiting for sync rep. */
if (update_process_title)
{
int len;
old_status = get_ps_display(&len);
/*
* 32 extra bytes are ample for the suffix: " waiting for " is 13
* characters and each %X of a uint32 is at most 8, so the suffix is at
* most 30 characters plus the terminating NUL.
*/
new_status = (char *) palloc(len + 32 + 1);
memcpy(new_status, old_status, len);
sprintf(new_status + len, " waiting for %X/%X",
(uint32) (XactCommitLSN >> 32), (uint32) XactCommitLSN);
set_ps_display(new_status, false);
new_status[len] = '\0'; /* truncate off " waiting ..." */
}
/*
* Wait for specified LSN to be confirmed.
*
* Each proc has its own wait latch, so we perform a normal latch
* check/wait loop here.
*/
for (;;)
{
int syncRepState;
/* Must reset the latch before testing state. */
ResetLatch(&MyProc->procLatch);
/*
* Try checking the state without the lock first. There's no
* guarantee that we'll read the most up-to-date value, so if it looks
* like we're still waiting, recheck while holding the lock. But if
* it looks like we're done, we must really be done, because once
* walsender changes the state to SYNC_REP_WAIT_COMPLETE, it will
* never update it again, so we can't be seeing a stale value in that
* case.
*
* Note: on machines with weak memory ordering, the acquisition of the
* lock is essential to avoid race conditions: we cannot be sure the
* sender's state update has reached main memory until we acquire the
* lock. We could get rid of this dance if SetLatch/ResetLatch
* contained memory barriers.
*/
syncRepState = MyProc->syncRepState;
if (syncRepState == SYNC_REP_WAITING)
{
LWLockAcquire(SyncRepLock, LW_SHARED);
syncRepState = MyProc->syncRepState;
LWLockRelease(SyncRepLock);
}
if (syncRepState == SYNC_REP_WAIT_COMPLETE)
break;
/*
* If a wait for synchronous replication is pending, we can neither
* acknowledge the commit nor raise ERROR or FATAL. The latter would
* lead the client to believe that the transaction aborted, which
* is not true: it's already committed locally. The former is no good
* either: the client has requested synchronous replication, and is
* entitled to assume that an acknowledged commit is also replicated,
* which might not be true. So in this case we issue a WARNING (which
* some clients may be able to interpret) and shut off further output.
* We do NOT reset ProcDiePending, so that the process will die after
* the commit is cleaned up.
*/
if (ProcDiePending)
{
ereport(WARNING,
(errcode(ERRCODE_ADMIN_SHUTDOWN),
errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
whereToSendOutput = DestNone;
SyncRepCancelWait();
break;
}
/*
* It's unclear what to do if a query cancel interrupt arrives. We
* can't actually abort at this point, but ignoring the interrupt
* altogether is not helpful, so we just terminate the wait with a
* suitable warning.
*/
if (QueryCancelPending)
{
/* Unlike ProcDiePending above, the cancel flag is consumed here. */
QueryCancelPending = false;
ereport(WARNING,
(errmsg("canceling wait for synchronous replication due to user request"),
errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
SyncRepCancelWait();
break;
}
/*
* If the postmaster dies, we'll probably never get an
* acknowledgement, because all the wal sender processes will exit. So
* just bail out.
*/
if (!PostmasterIsAlive())
{
/* Same exit path as the ProcDiePending case, but without a WARNING. */
ProcDiePending = true;
whereToSendOutput = DestNone;
SyncRepCancelWait();
break;
}
/*
* Wait on latch. Any condition that should wake us up will set the
* latch, so no need for timeout.
*/
WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1);
}
/*
* WalSender has checked our LSN and has removed us from queue. Clean up
* state and leave. It's OK to reset these shared memory fields without
* holding SyncRepLock, because any walsenders will ignore us anyway when
* we're not on the queue.
*/
Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
MyProc->syncRepState = SYNC_REP_NOT_WAITING;
MyProc->waitLSN = 0;
if (new_status)
{
/*
* Reset ps display. new_status was truncated back to the original title
* right after it was first displayed, so it now holds the old text.
*/
set_ps_display(new_status, false);
pfree(new_status);
}
}
| int SyncRepWakeQueue | ( | bool all, int mode | ) |
Definition at line 527 of file syncrep.c.
References Assert, WalSndCtlData::lsn, NUM_SYNC_REP_WAIT_MODE, offsetof, PGPROC::procLatch, SetLatch(), SHMQueueDelete(), SHMQueueNext(), PGPROC::syncRepLinks, WalSndCtlData::SyncRepQueue, PGPROC::syncRepState, PGPROC::waitLSN, and WalSndCtl.
Referenced by SyncRepReleaseWaiters(), and SyncRepUpdateSyncStandbysDefined().
{
    /*
     * Walk the wait queue for "mode" from its head, releasing waiters.
     * With "all" set every queued proc is released; otherwise the walk
     * stops at the first proc whose waitLSN lies beyond the shared LSN for
     * this mode.  Returns the number of procs released.
     */
    volatile WalSndCtlData *walsndctl = WalSndCtl;
    PGPROC     *next;
    int         nwoken = 0;

    Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
    Assert(SyncRepQueueIsOrderedByLSN(mode));

    next = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
                                   &(WalSndCtl->SyncRepQueue[mode]),
                                   offsetof(PGPROC, syncRepLinks));

    while (next != NULL)
    {
        PGPROC     *cur = next;

        /*
         * The queue is assumed to be ordered by LSN, so the first waiter
         * that is not yet satisfied ends the walk.
         */
        if (!all && walsndctl->lsn[mode] < cur->waitLSN)
            break;

        /*
         * Fetch the successor before unlinking cur; the successor may be
         * NULL when cur was the last entry.
         */
        next = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
                                       &(cur->syncRepLinks),
                                       offsetof(PGPROC, syncRepLinks));

        /*
         * Mark the wait complete (see SyncRepWaitForLSN() for the meaning
         * of the states), take cur off the queue, and only then set its
         * latch so it wakes with state and queue membership both settled.
         */
        cur->syncRepState = SYNC_REP_WAIT_COMPLETE;
        SHMQueueDelete(&(cur->syncRepLinks));
        SetLatch(&(cur->procLatch));

        nwoken++;
    }

    return nwoken;
}
| char* SyncRepStandbyNames |
Definition at line 60 of file syncrep.c.
Referenced by SyncRepGetStandbyPriority().
1.7.1