#include "postgres.h"
#include <unistd.h>
#include "access/xact.h"
#include "miscadmin.h"
#include "replication/syncrep.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/ps_status.h"
Go to the source code of this file.
Defines | |
#define | SyncStandbysDefined() (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0') |
Functions | |
static void | SyncRepQueueInsert (int mode) |
static void | SyncRepCancelWait (void) |
static int | SyncRepGetStandbyPriority (void) |
void | SyncRepWaitForLSN (XLogRecPtr XactCommitLSN) |
void | SyncRepCleanupAtProcExit (void) |
void | SyncRepInitConfig (void) |
void | SyncRepReleaseWaiters (void) |
int | SyncRepWakeQueue (bool all, int mode) |
void | SyncRepUpdateSyncStandbysDefined (void) |
bool | check_synchronous_standby_names (char **newval, void **extra, GucSource source) |
void | assign_synchronous_commit (int newval, void *extra) |
Variables | |
char * | SyncRepStandbyNames |
static bool | announce_next_takeover = true |
static int | SyncRepWaitMode = SYNC_REP_NO_WAIT |
#define SyncStandbysDefined | ( | ) | (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0') |
Definition at line 62 of file syncrep.c.
Referenced by SyncRepUpdateSyncStandbysDefined(), and SyncRepWaitForLSN().
void assign_synchronous_commit | ( | int | newval, | |
void * | extra | |||
) |
Definition at line 697 of file syncrep.c.
References SYNCHRONOUS_COMMIT_REMOTE_FLUSH, SYNCHRONOUS_COMMIT_REMOTE_WRITE, and SyncRepWaitMode.
{
	/*
	 * GUC assign hook for synchronous_commit: map the new setting onto the
	 * sync-rep wait mode used by SyncRepWaitForLSN().  Only the remote_write
	 * and remote_flush levels select a wait queue; every other value means
	 * no synchronous-replication wait.  "extra" is not used.
	 */
	if (newval == SYNCHRONOUS_COMMIT_REMOTE_WRITE)
		SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
	else if (newval == SYNCHRONOUS_COMMIT_REMOTE_FLUSH)
		SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
	else
		SyncRepWaitMode = SYNC_REP_NO_WAIT;
}
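bool check_synchronous_standby_names | ( | char ** | newval, | |
void ** | extra, | |||
GucSource | source | |||
) |
Definition at line 664 of file syncrep.c.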
References GUC_check_errdetail, list_free(), pfree(), pstrdup(), and SplitIdentifierString().
{
	/*
	 * GUC check hook for synchronous_standby_names: accept the value only if
	 * it parses as a comma-separated list of identifiers.
	 */
	char	   *names_copy;
	List	   *name_list;
	bool		parsed_ok;

	/* SplitIdentifierString() needs a modifiable copy of the string. */
	names_copy = pstrdup(*newval);
	parsed_ok = SplitIdentifierString(names_copy, ',', &name_list);

	if (!parsed_ok)
	{
		/* syntax error in list */
		GUC_check_errdetail("List syntax is invalid.");
	}

	/*
	 * Any additional validation of standby names belongs here.
	 *
	 * WALSender priority is deliberately not set here: this hook is also
	 * executed by the postmaster at startup, not by a WALSender, so
	 * application_name is not yet correctly set.
	 */

	pfree(names_copy);
	list_free(name_list);
	return parsed_ok;
}
static void SyncRepCancelWait | ( | void | ) | [static] |
Definition at line 308 of file syncrep.c.
References LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProc, SHMQueueDelete(), SHMQueueIsDetached(), PGPROC::syncRepLinks, SyncRepLock, and PGPROC::syncRepState.
Referenced by SyncRepWaitForLSN().
{
	/*
	 * Abandon a pending sync-rep wait: unlink this backend from the wait
	 * queue unless a walsender has already removed it, and reset its state
	 * to SYNC_REP_NOT_WAITING.  Both steps are done while holding
	 * SyncRepLock exclusively.
	 */
	SHM_QUEUE  *my_links = &(MyProc->syncRepLinks);

	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);

	if (!SHMQueueIsDetached(my_links))
		SHMQueueDelete(my_links);
	MyProc->syncRepState = SYNC_REP_NOT_WAITING;

	LWLockRelease(SyncRepLock);
}
void SyncRepCleanupAtProcExit | ( | void | ) |
Definition at line 318 of file syncrep.c.
References LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProc, SHMQueueDelete(), SHMQueueIsDetached(), PGPROC::syncRepLinks, and SyncRepLock.
Referenced by ProcKill().
{
	/*
	 * Process-exit cleanup (called from ProcKill()): make sure this backend
	 * is no longer linked into a sync-rep wait queue.
	 *
	 * The first, unlocked test is only a fast path so that the common case
	 * of not being queued does not touch SyncRepLock.  It is not
	 * authoritative: between that test and acquiring the lock, a walsender
	 * running SyncRepWakeQueue() may already have unlinked us.  Deleting an
	 * element that is already detached is unsafe (NOTE(review): SHMQueueDelete
	 * unlinks via the element's own prev/next links, which are cleared on
	 * removal -- confirm against shmqueue.c), so re-check while holding the
	 * lock, exactly as SyncRepCancelWait() does.
	 */
	if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
	{
		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);

		/* We may have just been removed by a walsender; recheck. */
		if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
			SHMQueueDelete(&(MyProc->syncRepLinks));

		LWLockRelease(SyncRepLock);
	}
}
static int SyncRepGetStandbyPriority | ( | void | ) | [static] |
Definition at line 470 of file syncrep.c.
References am_cascading_walsender, application_name, lfirst, list_free(), pfree(), pg_strcasecmp(), pstrdup(), SplitIdentifierString(), and SyncRepStandbyNames.
Referenced by SyncRepInitConfig().
{
	/*
	 * Return this walsender's synchronous standby priority: the 1-based
	 * position of the first synchronous_standby_names entry that matches
	 * application_name (compared with pg_strcasecmp) or is "*", or 0 if
	 * there is no match, the list does not parse, or we are a cascading
	 * walsender.
	 */
	char	   *names_copy;
	List	   *name_list;
	ListCell   *cell;
	int			position = 0;
	int			result = 0;

	/*
	 * Synchronous cascade replication is not allowed, so a cascading
	 * walsender always gets priority zero.
	 */
	if (am_cascading_walsender)
		return 0;

	/* SplitIdentifierString() needs a modifiable copy of the string. */
	names_copy = pstrdup(SyncRepStandbyNames);

	if (!SplitIdentifierString(names_copy, ',', &name_list))
	{
		/*
		 * Syntax error in list.  The GUC machinery will already have
		 * complained, so there is no need to report it again.
		 */
		pfree(names_copy);
		list_free(name_list);
		return 0;
	}

	foreach(cell, name_list)
	{
		const char *candidate = (const char *) lfirst(cell);

		position++;
		if (pg_strcasecmp(candidate, application_name) == 0 ||
			pg_strcasecmp(candidate, "*") == 0)
		{
			result = position;
			break;
		}
	}

	pfree(names_copy);
	list_free(name_list);
	return result;
}
void SyncRepInitConfig | ( | void | ) |
Definition at line 339 of file syncrep.c.
References application_name, DEBUG1, ereport, errmsg(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyWalSnd, WalSnd::sync_standby_priority, SyncRepGetStandbyPriority(), and SyncRepLock.
Referenced by StartReplication(), and WalSndLoop().
{
	/*
	 * Recompute whether this walsender serves a potential sync standby and
	 * publish the result in MyWalSnd->sync_standby_priority, which is used
	 * when handling replies from the standby.
	 */
	int			new_priority = SyncRepGetStandbyPriority();

	/* Unchanged priority: nothing to publish. */
	if (MyWalSnd->sync_standby_priority == new_priority)
		return;

	/* The shared priority field is updated under SyncRepLock. */
	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
	MyWalSnd->sync_standby_priority = new_priority;
	LWLockRelease(SyncRepLock);

	ereport(DEBUG1,
			(errmsg("standby \"%s\" now has synchronous standby priority %u",
					application_name, new_priority)));
}
static void SyncRepQueueInsert | ( | int | mode | ) | [static] |
Definition at line 275 of file syncrep.c.
References Assert, MyProc, NUM_SYNC_REP_WAIT_MODE, offsetof, SHMQueueInsertAfter(), SHMQueuePrev(), PGPROC::syncRepLinks, WalSndCtlData::SyncRepQueue, PGPROC::waitLSN, and WalSndCtl.
Referenced by SyncRepWaitForLSN().
{
	/*
	 * Link MyProc into the wait queue for "mode", keeping the queue ordered
	 * by waitLSN.  Walk backwards from the tail and insert after the last
	 * waiter whose waitLSN is strictly smaller than ours; that places us
	 * ahead of any existing waiters with an equal waitLSN.  If no such
	 * waiter exists, insert directly after the queue head.
	 */
	SHM_QUEUE  *queue_head;
	SHM_QUEUE  *anchor;
	PGPROC	   *cursor;

	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);

	queue_head = &(WalSndCtl->SyncRepQueue[mode]);
	anchor = queue_head;

	for (cursor = (PGPROC *) SHMQueuePrev(queue_head, queue_head,
										  offsetof(PGPROC, syncRepLinks));
		 cursor != NULL;
		 cursor = (PGPROC *) SHMQueuePrev(queue_head, &(cursor->syncRepLinks),
										  offsetof(PGPROC, syncRepLinks)))
	{
		if (cursor->waitLSN < MyProc->waitLSN)
		{
			anchor = &(cursor->syncRepLinks);
			break;
		}
	}

	SHMQueueInsertAfter(anchor, &(MyProc->syncRepLinks));
}
void SyncRepReleaseWaiters | ( | void | ) |
Definition at line 367 of file syncrep.c.
References announce_next_takeover, application_name, Assert, DEBUG3, elog, ereport, errmsg(), WalSnd::flush, i, LOG, WalSndCtlData::lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_wal_senders, MyWalSnd, WalSnd::pid, WalSnd::state, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, WalSnd::sync_standby_priority, SyncRepLock, SyncRepWakeQueue(), WalSndCtl, WalSndCtlData::walsnds, WALSNDSTATE_STREAMING, WalSnd::write, and XLogRecPtrIsInvalid.
Referenced by ProcessStandbyReplyMessage().
{
	/*
	 * Called by a walsender when processing a standby reply
	 * (ProcessStandbyReplyMessage).  If this walsender serves the
	 * highest-priority eligible synchronous standby, advance the shared
	 * write/flush release LSNs to this standby's reported positions and wake
	 * the backends whose waits are now satisfied.
	 */
	volatile WalSndCtlData *walsndctl = WalSndCtl;
	volatile WalSnd *syncWalSnd = NULL;
	int			numwrite = 0;
	int			numflush = 0;
	int			priority = 0;
	int			i;

	/*
	 * If this WALSender is serving a standby that is not on the list of
	 * potential standbys then we have nothing to do. If we are still starting
	 * up, still running base backup or the current flush position is still
	 * invalid, then leave quickly also.
	 */
	if (MyWalSnd->sync_standby_priority == 0 ||
		MyWalSnd->state < WALSNDSTATE_STREAMING ||
		XLogRecPtrIsInvalid(MyWalSnd->flush))
		return;

	/*
	 * We're a potential sync standby. Release waiters if we are the highest
	 * priority standby. If there are multiple standbys with same priorities
	 * then we use the first mentioned standby. If you change this, also
	 * change pg_stat_get_wal_senders().
	 */
	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);

	for (i = 0; i < max_wal_senders; i++)
	{
		/* use volatile pointer to prevent code rearrangement */
		volatile WalSnd *walsnd = &walsndctl->walsnds[i];

		/*
		 * A candidate has a nonzero pid, is streaming, has a nonzero
		 * priority and has a valid flush position.  A smaller priority value
		 * means a higher priority; because of the strict '>' test, ties go to
		 * the lowest-numbered walsnds[] slot.
		 */
		if (walsnd->pid != 0 &&
			walsnd->state == WALSNDSTATE_STREAMING &&
			walsnd->sync_standby_priority > 0 &&
			(priority == 0 ||
			 priority > walsnd->sync_standby_priority) &&
			!XLogRecPtrIsInvalid(walsnd->flush))
		{
			priority = walsnd->sync_standby_priority;
			syncWalSnd = walsnd;
		}
	}

	/*
	 * We should have found ourselves at least.
	 */
	Assert(syncWalSnd);

	/*
	 * If we aren't managing the highest priority standby then just leave.
	 * Remember to announce ourselves if we later become the sync standby.
	 */
	if (syncWalSnd != MyWalSnd)
	{
		LWLockRelease(SyncRepLock);
		announce_next_takeover = true;
		return;
	}

	/*
	 * Set the lsn first so that when we wake backends they will release up to
	 * this location.  The shared LSNs only ever move forward here.
	 */
	if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write)
	{
		walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
		numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
	}
	if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush)
	{
		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
	}

	LWLockRelease(SyncRepLock);

	/* XLogRecPtr values are printed as two 32-bit halves, high/low. */
	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
		 numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
		 numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);

	/*
	 * If we are managing the highest priority standby, though we weren't
	 * prior to this, then announce we are now the sync standby.
	 */
	if (announce_next_takeover)
	{
		announce_next_takeover = false;
		/*
		 * NOTE(review): sync_standby_priority is an int printed with %u; the
		 * value is positive here (checked above), so the output is correct.
		 */
		ereport(LOG,
				(errmsg("standby \"%s\" is now the synchronous standby with priority %u",
						application_name, MyWalSnd->sync_standby_priority)));
	}
}
void SyncRepUpdateSyncStandbysDefined | ( | void | ) |
Definition at line 588 of file syncrep.c.
References i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), WalSndCtlData::sync_standbys_defined, SyncRepLock, SyncRepWakeQueue(), SyncStandbysDefined, and WalSndCtl.
Referenced by UpdateSharedMemoryConfig().
{
	/*
	 * Propagate this process's view of synchronous_standby_names into the
	 * shared WalSndCtl->sync_standbys_defined flag, waking every waiter if
	 * synchronous standbys are no longer defined.
	 */
	bool		now_defined = SyncStandbysDefined();
	int			wait_mode;

	/* Shared flag already agrees with the GUC: nothing to do. */
	if (now_defined == WalSndCtl->sync_standbys_defined)
		return;

	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);

	/*
	 * If synchronous_standby_names has been reset to empty, backends have no
	 * reason to keep waiting: the user no longer wants synchronous
	 * replication, so release everyone on every queue.
	 */
	if (!now_defined)
	{
		for (wait_mode = 0; wait_mode < NUM_SYNC_REP_WAIT_MODE; wait_mode++)
			SyncRepWakeQueue(true, wait_mode);
	}

	/*
	 * Only allow backends to join the queue while synchronous standbys are
	 * defined.  Without this interlock there is a race: we might wake all
	 * current waiters, and then a backend that has not yet reloaded its
	 * config could go to sleep on the queue and never wake up.  Updating the
	 * flag under the same lock prevents that.
	 */
	WalSndCtl->sync_standbys_defined = now_defined;

	LWLockRelease(SyncRepLock);
}
void SyncRepWaitForLSN | ( | XLogRecPtr | XactCommitLSN | ) |
Definition at line 94 of file syncrep.c.
References Assert, ereport, errcode(), errdetail(), errmsg(), get_ps_display(), LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyProc, NULL, palloc(), pfree(), PostmasterIsAlive(), ProcDiePending, PGPROC::procLatch, QueryCancelPending, ResetLatch(), set_ps_display(), SHMQueueIsDetached(), SYNC_REP_NOT_WAITING, SYNC_REP_WAIT_COMPLETE, SYNC_REP_WAITING, WalSndCtlData::sync_standbys_defined, SyncRepCancelWait(), PGPROC::syncRepLinks, SyncRepLock, SyncRepQueueInsert(), SyncRepRequested, PGPROC::syncRepState, SyncRepWaitMode, SyncStandbysDefined, update_process_title, WaitLatch(), PGPROC::waitLSN, WalSndCtl, WARNING, whereToSendOutput, WL_LATCH_SET, and WL_POSTMASTER_DEATH.
Referenced by EndPrepare(), RecordTransactionAbortPrepared(), RecordTransactionCommit(), and RecordTransactionCommitPrepared().
{
	/*
	 * Block the current backend until a synchronous standby has confirmed
	 * XactCommitLSN for the current SyncRepWaitMode (write or flush), or
	 * until the wait is abandoned because of a pending ProcDie, a query
	 * cancel, or postmaster death.  Called by RecordTransactionCommit(),
	 * RecordTransactionCommitPrepared(), RecordTransactionAbortPrepared()
	 * and EndPrepare().
	 */
	char	   *new_status = NULL;
	const char *old_status;
	int			mode = SyncRepWaitMode;

	/*
	 * Fast exit if user has not requested sync replication, or there are no
	 * sync replication standby names defined. Note that those standbys don't
	 * need to be connected.
	 */
	if (!SyncRepRequested() || !SyncStandbysDefined())
		return;

	Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
	Assert(WalSndCtl != NULL);

	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
	Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);

	/*
	 * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
	 * set. See SyncRepUpdateSyncStandbysDefined.
	 *
	 * Also check that the standby hasn't already replied. It's an unlikely
	 * race condition, but we'll be fetching that cache line anyway, so it's
	 * likely to be a low-cost check.
	 */
	if (!WalSndCtl->sync_standbys_defined ||
		XactCommitLSN <= WalSndCtl->lsn[mode])
	{
		LWLockRelease(SyncRepLock);
		return;
	}

	/*
	 * Set our waitLSN so WALSender will know when to wake us, and add
	 * ourselves to the queue.
	 */
	MyProc->waitLSN = XactCommitLSN;
	MyProc->syncRepState = SYNC_REP_WAITING;
	SyncRepQueueInsert(mode);
	Assert(SyncRepQueueIsOrderedByLSN(mode));
	LWLockRelease(SyncRepLock);

	/* Alter ps display to show waiting for sync rep. */
	if (update_process_title)
	{
		int			len;

		old_status = get_ps_display(&len);
		/*
		 * 32 extra bytes suffice for the suffix: " waiting for " is 13
		 * characters and each %X of a uint32 is at most 8 hex digits, plus
		 * the '/' separator (30 total); +1 for the terminating NUL.
		 */
		new_status = (char *) palloc(len + 32 + 1);
		memcpy(new_status, old_status, len);
		sprintf(new_status + len, " waiting for %X/%X",
				(uint32) (XactCommitLSN >> 32), (uint32) XactCommitLSN);
		set_ps_display(new_status, false);
		/*
		 * Keep only the original prefix in new_status, so it can be used to
		 * restore the display once the wait is over.
		 */
		new_status[len] = '\0'; /* truncate off " waiting ..." */
	}

	/*
	 * Wait for specified LSN to be confirmed.
	 *
	 * Each proc has its own wait latch, so we perform a normal latch
	 * check/wait loop here.
	 */
	for (;;)
	{
		int			syncRepState;

		/* Must reset the latch before testing state. */
		ResetLatch(&MyProc->procLatch);

		/*
		 * Try checking the state without the lock first.  There's no
		 * guarantee that we'll read the most up-to-date value, so if it looks
		 * like we're still waiting, recheck while holding the lock.  But if
		 * it looks like we're done, we must really be done, because once
		 * walsender changes the state to SYNC_REP_WAIT_COMPLETE, it will
		 * never update it again, so we can't be seeing a stale value in that
		 * case.
		 *
		 * Note: on machines with weak memory ordering, the acquisition of the
		 * lock is essential to avoid race conditions: we cannot be sure the
		 * sender's state update has reached main memory until we acquire the
		 * lock.  We could get rid of this dance if SetLatch/ResetLatch
		 * contained memory barriers.
		 */
		syncRepState = MyProc->syncRepState;
		if (syncRepState == SYNC_REP_WAITING)
		{
			LWLockAcquire(SyncRepLock, LW_SHARED);
			syncRepState = MyProc->syncRepState;
			LWLockRelease(SyncRepLock);
		}
		if (syncRepState == SYNC_REP_WAIT_COMPLETE)
			break;

		/*
		 * If a wait for synchronous replication is pending, we can neither
		 * acknowledge the commit nor raise ERROR or FATAL.  The latter would
		 * lead the client to believe that the transaction aborted, which
		 * is not true: it's already committed locally. The former is no good
		 * either: the client has requested synchronous replication, and is
		 * entitled to assume that an acknowledged commit is also replicated,
		 * which might not be true. So in this case we issue a WARNING (which
		 * some clients may be able to interpret) and shut off further output.
		 * We do NOT reset ProcDiePending, so that the process will die after
		 * the commit is cleaned up.
		 */
		if (ProcDiePending)
		{
			ereport(WARNING,
					(errcode(ERRCODE_ADMIN_SHUTDOWN),
					 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
			whereToSendOutput = DestNone;
			SyncRepCancelWait();
			break;
		}

		/*
		 * It's unclear what to do if a query cancel interrupt arrives.  We
		 * can't actually abort at this point, but ignoring the interrupt
		 * altogether is not helpful, so we just terminate the wait with a
		 * suitable warning.
		 */
		if (QueryCancelPending)
		{
			QueryCancelPending = false;
			ereport(WARNING,
					(errmsg("canceling wait for synchronous replication due to user request"),
					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
			SyncRepCancelWait();
			break;
		}

		/*
		 * If the postmaster dies, we'll probably never get an
		 * acknowledgement, because all the wal sender processes will exit.  So
		 * just bail out.  Setting ProcDiePending makes this backend exit
		 * afterwards, as in the ProcDiePending case above.
		 */
		if (!PostmasterIsAlive())
		{
			ProcDiePending = true;
			whereToSendOutput = DestNone;
			SyncRepCancelWait();
			break;
		}

		/*
		 * Wait on latch.  Any condition that should wake us up will set the
		 * latch, so no need for timeout.
		 */
		WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1);
	}

	/*
	 * We are off the queue at this point: either a WalSender checked our LSN
	 * and removed us (SYNC_REP_WAIT_COMPLETE), or we abandoned the wait via
	 * SyncRepCancelWait().  Clean up state and leave.  It's OK to reset these
	 * shared memory fields without holding SyncRepLock, because any
	 * walsenders will ignore us anyway when we're not on the queue.
	 */
	Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
	MyProc->waitLSN = 0;

	if (new_status)
	{
		/* Reset ps display */
		set_ps_display(new_status, false);
		pfree(new_status);
	}
}
int SyncRepWakeQueue | ( | bool | all, | |
int | mode | |||
) |
Definition at line 527 of file syncrep.c.
References Assert, WalSndCtlData::lsn, NUM_SYNC_REP_WAIT_MODE, offsetof, PGPROC::procLatch, SetLatch(), SHMQueueDelete(), SHMQueueNext(), PGPROC::syncRepLinks, WalSndCtlData::SyncRepQueue, PGPROC::syncRepState, PGPROC::waitLSN, and WalSndCtl.
Referenced by SyncRepReleaseWaiters(), and SyncRepUpdateSyncStandbysDefined().
{
	/*
	 * Release waiters on the queue for "mode".  If "all" is false, only
	 * waiters whose waitLSN is <= walsndctl->lsn[mode] are released;
	 * otherwise every waiter is.  Returns the number of backends released.
	 * Both callers in this file hold SyncRepLock exclusively.
	 */
	volatile WalSndCtlData *walsndctl = WalSndCtl;
	PGPROC	   *next_proc;
	int			released = 0;

	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
	Assert(SyncRepQueueIsOrderedByLSN(mode));

	next_proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
										&(WalSndCtl->SyncRepQueue[mode]),
										offsetof(PGPROC, syncRepLinks));

	while (next_proc != NULL)
	{
		PGPROC	   *waiter = next_proc;

		/*
		 * The queue is ordered by waitLSN, so unless we are releasing
		 * everyone, the first waiter beyond the released LSN ends the scan.
		 */
		if (!all && walsndctl->lsn[mode] < waiter->waitLSN)
			break;

		/*
		 * Fetch the successor before unlinking the current waiter; it is
		 * NULL when the current waiter is the last entry.
		 */
		next_proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
											&(waiter->syncRepLinks),
											offsetof(PGPROC, syncRepLinks));

		/*
		 * Mark the wait complete; see SyncRepWaitForLSN() for how the waiter
		 * interprets the states.
		 */
		waiter->syncRepState = SYNC_REP_WAIT_COMPLETE;

		/* Unlink the waiter from the queue. */
		SHMQueueDelete(&(waiter->syncRepLinks));

		/* Wake it only after its state is set and it has been dequeued. */
		SetLatch(&(waiter->procLatch));

		released++;
	}

	return released;
}
bool announce_next_takeover = true [static] |
Definition at line 65 of file syncrep.c.
Referenced by SyncRepReleaseWaiters().
char* SyncRepStandbyNames |
Definition at line 60 of file syncrep.c.
Referenced by SyncRepGetStandbyPriority().
int SyncRepWaitMode = SYNC_REP_NO_WAIT [static] |
Definition at line 67 of file syncrep.c.
Referenced by assign_synchronous_commit(), and SyncRepWaitForLSN().