00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044 #include "postgres.h"
00045
00046 #include <unistd.h>
00047
00048 #include "access/xact.h"
00049 #include "miscadmin.h"
00050 #include "replication/syncrep.h"
00051 #include "replication/walsender.h"
00052 #include "replication/walsender_private.h"
00053 #include "storage/pmsignal.h"
00054 #include "storage/proc.h"
00055 #include "tcop/tcopprot.h"
00056 #include "utils/builtins.h"
00057 #include "utils/ps_status.h"
00058
00059
00060 char *SyncRepStandbyNames;
00061
00062 #define SyncStandbysDefined() \
00063 (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
00064
00065 static bool announce_next_takeover = true;
00066
00067 static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
00068
00069 static void SyncRepQueueInsert(int mode);
00070 static void SyncRepCancelWait(void);
00071
00072 static int SyncRepGetStandbyPriority(void);
00073
00074 #ifdef USE_ASSERT_CHECKING
00075 static bool SyncRepQueueIsOrderedByLSN(int mode);
00076 #endif
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093 void
00094 SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
00095 {
00096 char *new_status = NULL;
00097 const char *old_status;
00098 int mode = SyncRepWaitMode;
00099
00100
00101
00102
00103
00104
00105 if (!SyncRepRequested() || !SyncStandbysDefined())
00106 return;
00107
00108 Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
00109 Assert(WalSndCtl != NULL);
00110
00111 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
00112 Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122 if (!WalSndCtl->sync_standbys_defined ||
00123 XactCommitLSN <= WalSndCtl->lsn[mode])
00124 {
00125 LWLockRelease(SyncRepLock);
00126 return;
00127 }
00128
00129
00130
00131
00132
00133 MyProc->waitLSN = XactCommitLSN;
00134 MyProc->syncRepState = SYNC_REP_WAITING;
00135 SyncRepQueueInsert(mode);
00136 Assert(SyncRepQueueIsOrderedByLSN(mode));
00137 LWLockRelease(SyncRepLock);
00138
00139
00140 if (update_process_title)
00141 {
00142 int len;
00143
00144 old_status = get_ps_display(&len);
00145 new_status = (char *) palloc(len + 32 + 1);
00146 memcpy(new_status, old_status, len);
00147 sprintf(new_status + len, " waiting for %X/%X",
00148 (uint32) (XactCommitLSN >> 32), (uint32) XactCommitLSN);
00149 set_ps_display(new_status, false);
00150 new_status[len] = '\0';
00151 }
00152
00153
00154
00155
00156
00157
00158
00159 for (;;)
00160 {
00161 int syncRepState;
00162
00163
00164 ResetLatch(&MyProc->procLatch);
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174
00175
00176
00177
00178
00179
00180
00181 syncRepState = MyProc->syncRepState;
00182 if (syncRepState == SYNC_REP_WAITING)
00183 {
00184 LWLockAcquire(SyncRepLock, LW_SHARED);
00185 syncRepState = MyProc->syncRepState;
00186 LWLockRelease(SyncRepLock);
00187 }
00188 if (syncRepState == SYNC_REP_WAIT_COMPLETE)
00189 break;
00190
00191
00192
00193
00194
00195
00196
00197
00198
00199
00200
00201
00202
00203 if (ProcDiePending)
00204 {
00205 ereport(WARNING,
00206 (errcode(ERRCODE_ADMIN_SHUTDOWN),
00207 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
00208 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
00209 whereToSendOutput = DestNone;
00210 SyncRepCancelWait();
00211 break;
00212 }
00213
00214
00215
00216
00217
00218
00219
00220 if (QueryCancelPending)
00221 {
00222 QueryCancelPending = false;
00223 ereport(WARNING,
00224 (errmsg("canceling wait for synchronous replication due to user request"),
00225 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
00226 SyncRepCancelWait();
00227 break;
00228 }
00229
00230
00231
00232
00233
00234
00235 if (!PostmasterIsAlive())
00236 {
00237 ProcDiePending = true;
00238 whereToSendOutput = DestNone;
00239 SyncRepCancelWait();
00240 break;
00241 }
00242
00243
00244
00245
00246
00247 WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1);
00248 }
00249
00250
00251
00252
00253
00254
00255
00256 Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
00257 MyProc->syncRepState = SYNC_REP_NOT_WAITING;
00258 MyProc->waitLSN = 0;
00259
00260 if (new_status)
00261 {
00262
00263 set_ps_display(new_status, false);
00264 pfree(new_status);
00265 }
00266 }
00267
00268
00269
00270
00271
00272
00273
00274 static void
00275 SyncRepQueueInsert(int mode)
00276 {
00277 PGPROC *proc;
00278
00279 Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
00280 proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
00281 &(WalSndCtl->SyncRepQueue[mode]),
00282 offsetof(PGPROC, syncRepLinks));
00283
00284 while (proc)
00285 {
00286
00287
00288
00289
00290 if (proc->waitLSN < MyProc->waitLSN)
00291 break;
00292
00293 proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
00294 &(proc->syncRepLinks),
00295 offsetof(PGPROC, syncRepLinks));
00296 }
00297
00298 if (proc)
00299 SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
00300 else
00301 SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
00302 }
00303
00304
00305
00306
00307 static void
00308 SyncRepCancelWait(void)
00309 {
00310 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
00311 if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
00312 SHMQueueDelete(&(MyProc->syncRepLinks));
00313 MyProc->syncRepState = SYNC_REP_NOT_WAITING;
00314 LWLockRelease(SyncRepLock);
00315 }
00316
00317 void
00318 SyncRepCleanupAtProcExit(void)
00319 {
00320 if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
00321 {
00322 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
00323 SHMQueueDelete(&(MyProc->syncRepLinks));
00324 LWLockRelease(SyncRepLock);
00325 }
00326 }
00327
00328
00329
00330
00331
00332
00333
00334
00335
00336
00337
00338 void
00339 SyncRepInitConfig(void)
00340 {
00341 int priority;
00342
00343
00344
00345
00346
00347 priority = SyncRepGetStandbyPriority();
00348 if (MyWalSnd->sync_standby_priority != priority)
00349 {
00350 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
00351 MyWalSnd->sync_standby_priority = priority;
00352 LWLockRelease(SyncRepLock);
00353 ereport(DEBUG1,
00354 (errmsg("standby \"%s\" now has synchronous standby priority %u",
00355 application_name, priority)));
00356 }
00357 }
00358
00359
00360
00361
00362
00363
00364
00365
00366 void
00367 SyncRepReleaseWaiters(void)
00368 {
00369 volatile WalSndCtlData *walsndctl = WalSndCtl;
00370 volatile WalSnd *syncWalSnd = NULL;
00371 int numwrite = 0;
00372 int numflush = 0;
00373 int priority = 0;
00374 int i;
00375
00376
00377
00378
00379
00380
00381
00382 if (MyWalSnd->sync_standby_priority == 0 ||
00383 MyWalSnd->state < WALSNDSTATE_STREAMING ||
00384 XLogRecPtrIsInvalid(MyWalSnd->flush))
00385 return;
00386
00387
00388
00389
00390
00391
00392
00393 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
00394
00395 for (i = 0; i < max_wal_senders; i++)
00396 {
00397
00398 volatile WalSnd *walsnd = &walsndctl->walsnds[i];
00399
00400 if (walsnd->pid != 0 &&
00401 walsnd->state == WALSNDSTATE_STREAMING &&
00402 walsnd->sync_standby_priority > 0 &&
00403 (priority == 0 ||
00404 priority > walsnd->sync_standby_priority) &&
00405 !XLogRecPtrIsInvalid(walsnd->flush))
00406 {
00407 priority = walsnd->sync_standby_priority;
00408 syncWalSnd = walsnd;
00409 }
00410 }
00411
00412
00413
00414
00415 Assert(syncWalSnd);
00416
00417
00418
00419
00420 if (syncWalSnd != MyWalSnd)
00421 {
00422 LWLockRelease(SyncRepLock);
00423 announce_next_takeover = true;
00424 return;
00425 }
00426
00427
00428
00429
00430
00431 if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write)
00432 {
00433 walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
00434 numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
00435 }
00436 if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush)
00437 {
00438 walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
00439 numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
00440 }
00441
00442 LWLockRelease(SyncRepLock);
00443
00444 elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
00445 numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
00446 numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
00447
00448
00449
00450
00451
00452 if (announce_next_takeover)
00453 {
00454 announce_next_takeover = false;
00455 ereport(LOG,
00456 (errmsg("standby \"%s\" is now the synchronous standby with priority %u",
00457 application_name, MyWalSnd->sync_standby_priority)));
00458 }
00459 }
00460
00461
00462
00463
00464
00465
00466
00467
00468
00469 static int
00470 SyncRepGetStandbyPriority(void)
00471 {
00472 char *rawstring;
00473 List *elemlist;
00474 ListCell *l;
00475 int priority = 0;
00476 bool found = false;
00477
00478
00479
00480
00481
00482 if (am_cascading_walsender)
00483 return 0;
00484
00485
00486 rawstring = pstrdup(SyncRepStandbyNames);
00487
00488
00489 if (!SplitIdentifierString(rawstring, ',', &elemlist))
00490 {
00491
00492 pfree(rawstring);
00493 list_free(elemlist);
00494
00495 return 0;
00496 }
00497
00498 foreach(l, elemlist)
00499 {
00500 char *standby_name = (char *) lfirst(l);
00501
00502 priority++;
00503
00504 if (pg_strcasecmp(standby_name, application_name) == 0 ||
00505 pg_strcasecmp(standby_name, "*") == 0)
00506 {
00507 found = true;
00508 break;
00509 }
00510 }
00511
00512 pfree(rawstring);
00513 list_free(elemlist);
00514
00515 return (found ? priority : 0);
00516 }
00517
00518
00519
00520
00521
00522
00523
00524
00525
00526 int
00527 SyncRepWakeQueue(bool all, int mode)
00528 {
00529 volatile WalSndCtlData *walsndctl = WalSndCtl;
00530 PGPROC *proc = NULL;
00531 PGPROC *thisproc = NULL;
00532 int numprocs = 0;
00533
00534 Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
00535 Assert(SyncRepQueueIsOrderedByLSN(mode));
00536
00537 proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
00538 &(WalSndCtl->SyncRepQueue[mode]),
00539 offsetof(PGPROC, syncRepLinks));
00540
00541 while (proc)
00542 {
00543
00544
00545
00546 if (!all && walsndctl->lsn[mode] < proc->waitLSN)
00547 return numprocs;
00548
00549
00550
00551
00552
00553 thisproc = proc;
00554 proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
00555 &(proc->syncRepLinks),
00556 offsetof(PGPROC, syncRepLinks));
00557
00558
00559
00560
00561
00562 thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;
00563
00564
00565
00566
00567 SHMQueueDelete(&(thisproc->syncRepLinks));
00568
00569
00570
00571
00572 SetLatch(&(thisproc->procLatch));
00573
00574 numprocs++;
00575 }
00576
00577 return numprocs;
00578 }
00579
00580
00581
00582
00583
00584
00585
00586
00587 void
00588 SyncRepUpdateSyncStandbysDefined(void)
00589 {
00590 bool sync_standbys_defined = SyncStandbysDefined();
00591
00592 if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
00593 {
00594 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
00595
00596
00597
00598
00599
00600
00601 if (!sync_standbys_defined)
00602 {
00603 int i;
00604
00605 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
00606 SyncRepWakeQueue(true, i);
00607 }
00608
00609
00610
00611
00612
00613
00614
00615
00616 WalSndCtl->sync_standbys_defined = sync_standbys_defined;
00617
00618 LWLockRelease(SyncRepLock);
00619 }
00620 }
00621
#ifdef USE_ASSERT_CHECKING
/*
 * SyncRepQueueIsOrderedByLSN
 *
 * Assertion helper: returns true iff the waitLSN values in
 * WalSndCtl->SyncRepQueue[mode] are strictly increasing from head to tail
 * (so equal LSNs, and a waitLSN of 0, count as violations).
 */
static bool
SyncRepQueueIsOrderedByLSN(int mode)
{
	PGPROC	   *cur;
	XLogRecPtr	prevLSN = 0;

	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);

	for (cur = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
									   &(WalSndCtl->SyncRepQueue[mode]),
									   offsetof(PGPROC, syncRepLinks));
		 cur != NULL;
		 cur = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
									   &(cur->syncRepLinks),
									   offsetof(PGPROC, syncRepLinks)))
	{
		/* Each entry must lie strictly beyond the one before it. */
		if (cur->waitLSN <= prevLSN)
			return false;

		prevLSN = cur->waitLSN;
	}

	return true;
}
#endif
00656
00657
00658
00659
00660
00661
00662
00663 bool
00664 check_synchronous_standby_names(char **newval, void **extra, GucSource source)
00665 {
00666 char *rawstring;
00667 List *elemlist;
00668
00669
00670 rawstring = pstrdup(*newval);
00671
00672
00673 if (!SplitIdentifierString(rawstring, ',', &elemlist))
00674 {
00675
00676 GUC_check_errdetail("List syntax is invalid.");
00677 pfree(rawstring);
00678 list_free(elemlist);
00679 return false;
00680 }
00681
00682
00683
00684
00685
00686
00687
00688
00689
00690 pfree(rawstring);
00691 list_free(elemlist);
00692
00693 return true;
00694 }
00695
00696 void
00697 assign_synchronous_commit(int newval, void *extra)
00698 {
00699 switch (newval)
00700 {
00701 case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
00702 SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
00703 break;
00704 case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
00705 SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
00706 break;
00707 default:
00708 SyncRepWaitMode = SYNC_REP_NO_WAIT;
00709 break;
00710 }
00711 }