Header And Logo

PostgreSQL
| The world's most advanced open source database.

syncrep.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * syncrep.c
00004  *
00005  * Synchronous replication is new as of PostgreSQL 9.1.
00006  *
00007  * If requested, transaction commits wait until their commit LSN is
00008  * acknowledged by the sync standby.
00009  *
00010  * This module contains the code for waiting and release of backends.
00011  * All code in this module executes on the primary. The core streaming
00012  * replication transport remains within WALreceiver/WALsender modules.
00013  *
00014  * The essence of this design is that it isolates all logic about
00015  * waiting/releasing onto the primary. The primary defines which standbys
00016  * it wishes to wait for. The standby is completely unaware of the
00017  * durability requirements of transactions on the primary, reducing the
00018  * complexity of the code and streamlining both standby operations and
00019  * network bandwidth because there is no requirement to ship
00020  * per-transaction state information.
00021  *
00022  * Replication is either synchronous or not synchronous (async). If it is
00023  * async, we just fastpath out of here. If it is sync, then we wait for
00024  * the write or flush location on the standby before releasing the waiting backend.
00025  * Further complexity in that interaction is expected in later releases.
00026  *
00027  * The best performing way to manage the waiting backends is to have a
00028  * single ordered queue of waiting backends, so that we can avoid
00029  * searching the through all waiters each time we receive a reply.
00030  *
00031  * In 9.1 we support only a single synchronous standby, chosen from a
00032  * priority list of synchronous_standby_names. Before it can become the
00033  * synchronous standby it must have caught up with the primary; that may
00034  * take some time. Once caught up, the current highest priority standby
00035  * will release waiters from the queue.
00036  *
00037  * Portions Copyright (c) 2010-2013, PostgreSQL Global Development Group
00038  *
00039  * IDENTIFICATION
00040  *    src/backend/replication/syncrep.c
00041  *
00042  *-------------------------------------------------------------------------
00043  */
00044 #include "postgres.h"
00045 
00046 #include <unistd.h>
00047 
00048 #include "access/xact.h"
00049 #include "miscadmin.h"
00050 #include "replication/syncrep.h"
00051 #include "replication/walsender.h"
00052 #include "replication/walsender_private.h"
00053 #include "storage/pmsignal.h"
00054 #include "storage/proc.h"
00055 #include "tcop/tcopprot.h"
00056 #include "utils/builtins.h"
00057 #include "utils/ps_status.h"
00058 
00059 /* User-settable parameters for sync rep */
00060 char       *SyncRepStandbyNames;
00061 
00062 #define SyncStandbysDefined() \
00063     (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
00064 
00065 static bool announce_next_takeover = true;
00066 
00067 static int  SyncRepWaitMode = SYNC_REP_NO_WAIT;
00068 
00069 static void SyncRepQueueInsert(int mode);
00070 static void SyncRepCancelWait(void);
00071 
00072 static int  SyncRepGetStandbyPriority(void);
00073 
00074 #ifdef USE_ASSERT_CHECKING
00075 static bool SyncRepQueueIsOrderedByLSN(int mode);
00076 #endif
00077 
00078 /*
00079  * ===========================================================
00080  * Synchronous Replication functions for normal user backends
00081  * ===========================================================
00082  */
00083 
00084 /*
00085  * Wait for synchronous replication, if requested by user.
00086  *
00087  * Initially backends start in state SYNC_REP_NOT_WAITING and then
00088  * change that state to SYNC_REP_WAITING before adding ourselves
00089  * to the wait queue. During SyncRepWakeQueue() a WALSender changes
00090  * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
00091  * This backend then resets its state to SYNC_REP_NOT_WAITING.
00092  */
00093 void
00094 SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
00095 {
00096     char       *new_status = NULL;
00097     const char *old_status;
00098     int         mode = SyncRepWaitMode;
00099 
00100     /*
00101      * Fast exit if user has not requested sync replication, or there are no
00102      * sync replication standby names defined. Note that those standbys don't
00103      * need to be connected.
00104      */
00105     if (!SyncRepRequested() || !SyncStandbysDefined())
00106         return;
00107 
00108     Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
00109     Assert(WalSndCtl != NULL);
00110 
00111     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
00112     Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
00113 
00114     /*
00115      * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
00116      * set.  See SyncRepUpdateSyncStandbysDefined.
00117      *
00118      * Also check that the standby hasn't already replied. Unlikely race
00119      * condition but we'll be fetching that cache line anyway so its likely to
00120      * be a low cost check.
00121      */
00122     if (!WalSndCtl->sync_standbys_defined ||
00123         XactCommitLSN <= WalSndCtl->lsn[mode])
00124     {
00125         LWLockRelease(SyncRepLock);
00126         return;
00127     }
00128 
00129     /*
00130      * Set our waitLSN so WALSender will know when to wake us, and add
00131      * ourselves to the queue.
00132      */
00133     MyProc->waitLSN = XactCommitLSN;
00134     MyProc->syncRepState = SYNC_REP_WAITING;
00135     SyncRepQueueInsert(mode);
00136     Assert(SyncRepQueueIsOrderedByLSN(mode));
00137     LWLockRelease(SyncRepLock);
00138 
00139     /* Alter ps display to show waiting for sync rep. */
00140     if (update_process_title)
00141     {
00142         int         len;
00143 
00144         old_status = get_ps_display(&len);
00145         new_status = (char *) palloc(len + 32 + 1);
00146         memcpy(new_status, old_status, len);
00147         sprintf(new_status + len, " waiting for %X/%X",
00148                 (uint32) (XactCommitLSN >> 32), (uint32) XactCommitLSN);
00149         set_ps_display(new_status, false);
00150         new_status[len] = '\0'; /* truncate off " waiting ..." */
00151     }
00152 
00153     /*
00154      * Wait for specified LSN to be confirmed.
00155      *
00156      * Each proc has its own wait latch, so we perform a normal latch
00157      * check/wait loop here.
00158      */
00159     for (;;)
00160     {
00161         int         syncRepState;
00162 
00163         /* Must reset the latch before testing state. */
00164         ResetLatch(&MyProc->procLatch);
00165 
00166         /*
00167          * Try checking the state without the lock first.  There's no
00168          * guarantee that we'll read the most up-to-date value, so if it looks
00169          * like we're still waiting, recheck while holding the lock.  But if
00170          * it looks like we're done, we must really be done, because once
00171          * walsender changes the state to SYNC_REP_WAIT_COMPLETE, it will
00172          * never update it again, so we can't be seeing a stale value in that
00173          * case.
00174          *
00175          * Note: on machines with weak memory ordering, the acquisition of the
00176          * lock is essential to avoid race conditions: we cannot be sure the
00177          * sender's state update has reached main memory until we acquire the
00178          * lock.  We could get rid of this dance if SetLatch/ResetLatch
00179          * contained memory barriers.
00180          */
00181         syncRepState = MyProc->syncRepState;
00182         if (syncRepState == SYNC_REP_WAITING)
00183         {
00184             LWLockAcquire(SyncRepLock, LW_SHARED);
00185             syncRepState = MyProc->syncRepState;
00186             LWLockRelease(SyncRepLock);
00187         }
00188         if (syncRepState == SYNC_REP_WAIT_COMPLETE)
00189             break;
00190 
00191         /*
00192          * If a wait for synchronous replication is pending, we can neither
00193          * acknowledge the commit nor raise ERROR or FATAL.  The latter would
00194          * lead the client to believe that that the transaction aborted, which
00195          * is not true: it's already committed locally. The former is no good
00196          * either: the client has requested synchronous replication, and is
00197          * entitled to assume that an acknowledged commit is also replicated,
00198          * which might not be true. So in this case we issue a WARNING (which
00199          * some clients may be able to interpret) and shut off further output.
00200          * We do NOT reset ProcDiePending, so that the process will die after
00201          * the commit is cleaned up.
00202          */
00203         if (ProcDiePending)
00204         {
00205             ereport(WARNING,
00206                     (errcode(ERRCODE_ADMIN_SHUTDOWN),
00207                      errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
00208                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
00209             whereToSendOutput = DestNone;
00210             SyncRepCancelWait();
00211             break;
00212         }
00213 
00214         /*
00215          * It's unclear what to do if a query cancel interrupt arrives.  We
00216          * can't actually abort at this point, but ignoring the interrupt
00217          * altogether is not helpful, so we just terminate the wait with a
00218          * suitable warning.
00219          */
00220         if (QueryCancelPending)
00221         {
00222             QueryCancelPending = false;
00223             ereport(WARNING,
00224                     (errmsg("canceling wait for synchronous replication due to user request"),
00225                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
00226             SyncRepCancelWait();
00227             break;
00228         }
00229 
00230         /*
00231          * If the postmaster dies, we'll probably never get an
00232          * acknowledgement, because all the wal sender processes will exit. So
00233          * just bail out.
00234          */
00235         if (!PostmasterIsAlive())
00236         {
00237             ProcDiePending = true;
00238             whereToSendOutput = DestNone;
00239             SyncRepCancelWait();
00240             break;
00241         }
00242 
00243         /*
00244          * Wait on latch.  Any condition that should wake us up will set the
00245          * latch, so no need for timeout.
00246          */
00247         WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1);
00248     }
00249 
00250     /*
00251      * WalSender has checked our LSN and has removed us from queue. Clean up
00252      * state and leave.  It's OK to reset these shared memory fields without
00253      * holding SyncRepLock, because any walsenders will ignore us anyway when
00254      * we're not on the queue.
00255      */
00256     Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
00257     MyProc->syncRepState = SYNC_REP_NOT_WAITING;
00258     MyProc->waitLSN = 0;
00259 
00260     if (new_status)
00261     {
00262         /* Reset ps display */
00263         set_ps_display(new_status, false);
00264         pfree(new_status);
00265     }
00266 }
00267 
00268 /*
00269  * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
00270  *
00271  * Usually we will go at tail of queue, though it's possible that we arrive
00272  * here out of order, so start at tail and work back to insertion point.
00273  */
00274 static void
00275 SyncRepQueueInsert(int mode)
00276 {
00277     PGPROC     *proc;
00278 
00279     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
00280     proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
00281                                    &(WalSndCtl->SyncRepQueue[mode]),
00282                                    offsetof(PGPROC, syncRepLinks));
00283 
00284     while (proc)
00285     {
00286         /*
00287          * Stop at the queue element that we should after to ensure the queue
00288          * is ordered by LSN.
00289          */
00290         if (proc->waitLSN < MyProc->waitLSN)
00291             break;
00292 
00293         proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
00294                                        &(proc->syncRepLinks),
00295                                        offsetof(PGPROC, syncRepLinks));
00296     }
00297 
00298     if (proc)
00299         SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
00300     else
00301         SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
00302 }
00303 
00304 /*
00305  * Acquire SyncRepLock and cancel any wait currently in progress.
00306  */
00307 static void
00308 SyncRepCancelWait(void)
00309 {
00310     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
00311     if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
00312         SHMQueueDelete(&(MyProc->syncRepLinks));
00313     MyProc->syncRepState = SYNC_REP_NOT_WAITING;
00314     LWLockRelease(SyncRepLock);
00315 }
00316 
00317 void
00318 SyncRepCleanupAtProcExit(void)
00319 {
00320     if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
00321     {
00322         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
00323         SHMQueueDelete(&(MyProc->syncRepLinks));
00324         LWLockRelease(SyncRepLock);
00325     }
00326 }
00327 
00328 /*
00329  * ===========================================================
00330  * Synchronous Replication functions for wal sender processes
00331  * ===========================================================
00332  */
00333 
00334 /*
00335  * Take any action required to initialise sync rep state from config
00336  * data. Called at WALSender startup and after each SIGHUP.
00337  */
00338 void
00339 SyncRepInitConfig(void)
00340 {
00341     int         priority;
00342 
00343     /*
00344      * Determine if we are a potential sync standby and remember the result
00345      * for handling replies from standby.
00346      */
00347     priority = SyncRepGetStandbyPriority();
00348     if (MyWalSnd->sync_standby_priority != priority)
00349     {
00350         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
00351         MyWalSnd->sync_standby_priority = priority;
00352         LWLockRelease(SyncRepLock);
00353         ereport(DEBUG1,
00354             (errmsg("standby \"%s\" now has synchronous standby priority %u",
00355                     application_name, priority)));
00356     }
00357 }
00358 
00359 /*
00360  * Update the LSNs on each queue based upon our latest state. This
00361  * implements a simple policy of first-valid-standby-releases-waiter.
00362  *
00363  * Other policies are possible, which would change what we do here and what
00364  * perhaps also which information we store as well.
00365  */
00366 void
00367 SyncRepReleaseWaiters(void)
00368 {
00369     volatile WalSndCtlData *walsndctl = WalSndCtl;
00370     volatile WalSnd *syncWalSnd = NULL;
00371     int         numwrite = 0;
00372     int         numflush = 0;
00373     int         priority = 0;
00374     int         i;
00375 
00376     /*
00377      * If this WALSender is serving a standby that is not on the list of
00378      * potential standbys then we have nothing to do. If we are still starting
00379      * up, still running base backup or the current flush position is still
00380      * invalid, then leave quickly also.
00381      */
00382     if (MyWalSnd->sync_standby_priority == 0 ||
00383         MyWalSnd->state < WALSNDSTATE_STREAMING ||
00384         XLogRecPtrIsInvalid(MyWalSnd->flush))
00385         return;
00386 
00387     /*
00388      * We're a potential sync standby. Release waiters if we are the highest
00389      * priority standby. If there are multiple standbys with same priorities
00390      * then we use the first mentioned standby. If you change this, also
00391      * change pg_stat_get_wal_senders().
00392      */
00393     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
00394 
00395     for (i = 0; i < max_wal_senders; i++)
00396     {
00397         /* use volatile pointer to prevent code rearrangement */
00398         volatile WalSnd *walsnd = &walsndctl->walsnds[i];
00399 
00400         if (walsnd->pid != 0 &&
00401             walsnd->state == WALSNDSTATE_STREAMING &&
00402             walsnd->sync_standby_priority > 0 &&
00403             (priority == 0 ||
00404              priority > walsnd->sync_standby_priority) &&
00405             !XLogRecPtrIsInvalid(walsnd->flush))
00406         {
00407             priority = walsnd->sync_standby_priority;
00408             syncWalSnd = walsnd;
00409         }
00410     }
00411 
00412     /*
00413      * We should have found ourselves at least.
00414      */
00415     Assert(syncWalSnd);
00416 
00417     /*
00418      * If we aren't managing the highest priority standby then just leave.
00419      */
00420     if (syncWalSnd != MyWalSnd)
00421     {
00422         LWLockRelease(SyncRepLock);
00423         announce_next_takeover = true;
00424         return;
00425     }
00426 
00427     /*
00428      * Set the lsn first so that when we wake backends they will release up to
00429      * this location.
00430      */
00431     if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write)
00432     {
00433         walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
00434         numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
00435     }
00436     if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush)
00437     {
00438         walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
00439         numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
00440     }
00441 
00442     LWLockRelease(SyncRepLock);
00443 
00444     elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
00445          numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
00446          numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
00447 
00448     /*
00449      * If we are managing the highest priority standby, though we weren't
00450      * prior to this, then announce we are now the sync standby.
00451      */
00452     if (announce_next_takeover)
00453     {
00454         announce_next_takeover = false;
00455         ereport(LOG,
00456                 (errmsg("standby \"%s\" is now the synchronous standby with priority %u",
00457                         application_name, MyWalSnd->sync_standby_priority)));
00458     }
00459 }
00460 
00461 /*
00462  * Check if we are in the list of sync standbys, and if so, determine
00463  * priority sequence. Return priority if set, or zero to indicate that
00464  * we are not a potential sync standby.
00465  *
00466  * Compare the parameter SyncRepStandbyNames against the application_name
00467  * for this WALSender, or allow any name if we find a wildcard "*".
00468  */
00469 static int
00470 SyncRepGetStandbyPriority(void)
00471 {
00472     char       *rawstring;
00473     List       *elemlist;
00474     ListCell   *l;
00475     int         priority = 0;
00476     bool        found = false;
00477 
00478     /*
00479      * Since synchronous cascade replication is not allowed, we always set the
00480      * priority of cascading walsender to zero.
00481      */
00482     if (am_cascading_walsender)
00483         return 0;
00484 
00485     /* Need a modifiable copy of string */
00486     rawstring = pstrdup(SyncRepStandbyNames);
00487 
00488     /* Parse string into list of identifiers */
00489     if (!SplitIdentifierString(rawstring, ',', &elemlist))
00490     {
00491         /* syntax error in list */
00492         pfree(rawstring);
00493         list_free(elemlist);
00494         /* GUC machinery will have already complained - no need to do again */
00495         return 0;
00496     }
00497 
00498     foreach(l, elemlist)
00499     {
00500         char       *standby_name = (char *) lfirst(l);
00501 
00502         priority++;
00503 
00504         if (pg_strcasecmp(standby_name, application_name) == 0 ||
00505             pg_strcasecmp(standby_name, "*") == 0)
00506         {
00507             found = true;
00508             break;
00509         }
00510     }
00511 
00512     pfree(rawstring);
00513     list_free(elemlist);
00514 
00515     return (found ? priority : 0);
00516 }
00517 
00518 /*
00519  * Walk the specified queue from head.  Set the state of any backends that
00520  * need to be woken, remove them from the queue, and then wake them.
00521  * Pass all = true to wake whole queue; otherwise, just wake up to
00522  * the walsender's LSN.
00523  *
00524  * Must hold SyncRepLock.
00525  */
00526 int
00527 SyncRepWakeQueue(bool all, int mode)
00528 {
00529     volatile WalSndCtlData *walsndctl = WalSndCtl;
00530     PGPROC     *proc = NULL;
00531     PGPROC     *thisproc = NULL;
00532     int         numprocs = 0;
00533 
00534     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
00535     Assert(SyncRepQueueIsOrderedByLSN(mode));
00536 
00537     proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
00538                                    &(WalSndCtl->SyncRepQueue[mode]),
00539                                    offsetof(PGPROC, syncRepLinks));
00540 
00541     while (proc)
00542     {
00543         /*
00544          * Assume the queue is ordered by LSN
00545          */
00546         if (!all && walsndctl->lsn[mode] < proc->waitLSN)
00547             return numprocs;
00548 
00549         /*
00550          * Move to next proc, so we can delete thisproc from the queue.
00551          * thisproc is valid, proc may be NULL after this.
00552          */
00553         thisproc = proc;
00554         proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
00555                                        &(proc->syncRepLinks),
00556                                        offsetof(PGPROC, syncRepLinks));
00557 
00558         /*
00559          * Set state to complete; see SyncRepWaitForLSN() for discussion of
00560          * the various states.
00561          */
00562         thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;
00563 
00564         /*
00565          * Remove thisproc from queue.
00566          */
00567         SHMQueueDelete(&(thisproc->syncRepLinks));
00568 
00569         /*
00570          * Wake only when we have set state and removed from queue.
00571          */
00572         SetLatch(&(thisproc->procLatch));
00573 
00574         numprocs++;
00575     }
00576 
00577     return numprocs;
00578 }
00579 
00580 /*
00581  * The checkpointer calls this as needed to update the shared
00582  * sync_standbys_defined flag, so that backends don't remain permanently wedged
00583  * if synchronous_standby_names is unset.  It's safe to check the current value
00584  * without the lock, because it's only ever updated by one process.  But we
00585  * must take the lock to change it.
00586  */
00587 void
00588 SyncRepUpdateSyncStandbysDefined(void)
00589 {
00590     bool        sync_standbys_defined = SyncStandbysDefined();
00591 
00592     if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
00593     {
00594         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
00595 
00596         /*
00597          * If synchronous_standby_names has been reset to empty, it's futile
00598          * for backends to continue to waiting.  Since the user no longer
00599          * wants synchronous replication, we'd better wake them up.
00600          */
00601         if (!sync_standbys_defined)
00602         {
00603             int         i;
00604 
00605             for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
00606                 SyncRepWakeQueue(true, i);
00607         }
00608 
00609         /*
00610          * Only allow people to join the queue when there are synchronous
00611          * standbys defined.  Without this interlock, there's a race
00612          * condition: we might wake up all the current waiters; then, some
00613          * backend that hasn't yet reloaded its config might go to sleep on
00614          * the queue (and never wake up).  This prevents that.
00615          */
00616         WalSndCtl->sync_standbys_defined = sync_standbys_defined;
00617 
00618         LWLockRelease(SyncRepLock);
00619     }
00620 }
00621 
00622 #ifdef USE_ASSERT_CHECKING
00623 static bool
00624 SyncRepQueueIsOrderedByLSN(int mode)
00625 {
00626     PGPROC     *proc = NULL;
00627     XLogRecPtr  lastLSN;
00628 
00629     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
00630 
00631     lastLSN = 0;
00632 
00633     proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
00634                                    &(WalSndCtl->SyncRepQueue[mode]),
00635                                    offsetof(PGPROC, syncRepLinks));
00636 
00637     while (proc)
00638     {
00639         /*
00640          * Check the queue is ordered by LSN and that multiple procs don't
00641          * have matching LSNs
00642          */
00643         if (proc->waitLSN <= lastLSN)
00644             return false;
00645 
00646         lastLSN = proc->waitLSN;
00647 
00648         proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
00649                                        &(proc->syncRepLinks),
00650                                        offsetof(PGPROC, syncRepLinks));
00651     }
00652 
00653     return true;
00654 }
00655 #endif
00656 
00657 /*
00658  * ===========================================================
00659  * Synchronous Replication functions executed by any process
00660  * ===========================================================
00661  */
00662 
00663 bool
00664 check_synchronous_standby_names(char **newval, void **extra, GucSource source)
00665 {
00666     char       *rawstring;
00667     List       *elemlist;
00668 
00669     /* Need a modifiable copy of string */
00670     rawstring = pstrdup(*newval);
00671 
00672     /* Parse string into list of identifiers */
00673     if (!SplitIdentifierString(rawstring, ',', &elemlist))
00674     {
00675         /* syntax error in list */
00676         GUC_check_errdetail("List syntax is invalid.");
00677         pfree(rawstring);
00678         list_free(elemlist);
00679         return false;
00680     }
00681 
00682     /*
00683      * Any additional validation of standby names should go here.
00684      *
00685      * Don't attempt to set WALSender priority because this is executed by
00686      * postmaster at startup, not WALSender, so the application_name is not
00687      * yet correctly set.
00688      */
00689 
00690     pfree(rawstring);
00691     list_free(elemlist);
00692 
00693     return true;
00694 }
00695 
00696 void
00697 assign_synchronous_commit(int newval, void *extra)
00698 {
00699     switch (newval)
00700     {
00701         case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
00702             SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
00703             break;
00704         case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
00705             SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
00706             break;
00707         default:
00708             SyncRepWaitMode = SYNC_REP_NO_WAIT;
00709             break;
00710     }
00711 }