00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 #include "postgres.h"
00041
00042 #include <signal.h>
00043 #include <unistd.h>
00044
00045 #include "access/timeline.h"
00046 #include "access/transam.h"
00047 #include "access/xlog_internal.h"
00048 #include "catalog/pg_type.h"
00049 #include "funcapi.h"
00050 #include "libpq/libpq.h"
00051 #include "libpq/pqformat.h"
00052 #include "miscadmin.h"
00053 #include "nodes/replnodes.h"
00054 #include "replication/basebackup.h"
00055 #include "replication/syncrep.h"
00056 #include "replication/walreceiver.h"
00057 #include "replication/walsender.h"
00058 #include "replication/walsender_private.h"
00059 #include "storage/fd.h"
00060 #include "storage/ipc.h"
00061 #include "storage/pmsignal.h"
00062 #include "storage/proc.h"
00063 #include "storage/procarray.h"
00064 #include "tcop/tcopprot.h"
00065 #include "utils/builtins.h"
00066 #include "utils/guc.h"
00067 #include "utils/memutils.h"
00068 #include "utils/ps_status.h"
00069 #include "utils/resowner.h"
00070 #include "utils/timeout.h"
00071 #include "utils/timestamp.h"
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
00083
00084
00085 WalSndCtlData *WalSndCtl = NULL;
00086
00087
00088 WalSnd *MyWalSnd = NULL;
00089
00090
00091 bool am_walsender = false;
00092 bool am_cascading_walsender = false;
00093
00094
00095
00096 int max_wal_senders = 0;
00097 int wal_sender_timeout = 60 * 1000;
00098
00099
00100
00101
00102 bool wake_wal_senders = false;
00103
00104
00105
00106
00107
00108 static int sendFile = -1;
00109 static XLogSegNo sendSegNo = 0;
00110 static uint32 sendOff = 0;
00111
00112
00113 static TimeLineID curFileTimeLine = 0;
00114
00115
00116
00117
00118
00119
00120
00121 static TimeLineID sendTimeLine = 0;
00122 static TimeLineID sendTimeLineNextTLI = 0;
00123 static bool sendTimeLineIsHistoric = false;
00124 static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
00125
00126
00127
00128
00129
00130 static XLogRecPtr sentPtr = 0;
00131
00132
00133 static StringInfoData output_message;
00134 static StringInfoData reply_message;
00135 static StringInfoData tmpbuf;
00136
00137
00138
00139
00140 static TimestampTz last_reply_timestamp;
00141
00142 static bool ping_sent = false;
00143
00144
00145
00146
00147
00148
00149
00150 static bool streamingDoneSending;
00151 static bool streamingDoneReceiving;
00152
00153
00154 static volatile sig_atomic_t got_SIGHUP = false;
00155 static volatile sig_atomic_t walsender_ready_to_stop = false;
00156
00157
00158
00159
00160
00161
00162
00163 static volatile sig_atomic_t replication_active = false;
00164
00165
00166 static void WalSndSigHupHandler(SIGNAL_ARGS);
00167 static void WalSndXLogSendHandler(SIGNAL_ARGS);
00168 static void WalSndLastCycleHandler(SIGNAL_ARGS);
00169
00170
00171 static void WalSndLoop(void);
00172 static void InitWalSenderSlot(void);
00173 static void WalSndKill(int code, Datum arg);
00174 static void XLogSend(bool *caughtup);
00175 static XLogRecPtr GetStandbyFlushRecPtr(void);
00176 static void IdentifySystem(void);
00177 static void StartReplication(StartReplicationCmd *cmd);
00178 static void ProcessStandbyMessage(void);
00179 static void ProcessStandbyReplyMessage(void);
00180 static void ProcessStandbyHSFeedbackMessage(void);
00181 static void ProcessRepliesIfAny(void);
00182 static void WalSndKeepalive(bool requestReply);
00183
00184
00185
00186 void
00187 InitWalSender(void)
00188 {
00189 am_cascading_walsender = RecoveryInProgress();
00190
00191
00192 InitWalSenderSlot();
00193
00194
00195 CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
00196
00197
00198
00199
00200
00201
00202
00203
00204 MarkPostmasterChildWalSender();
00205 SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
00206 }
00207
00208
00209
00210
00211
00212
00213
00214
00215 void
00216 WalSndErrorCleanup()
00217 {
00218 if (sendFile >= 0)
00219 {
00220 close(sendFile);
00221 sendFile = -1;
00222 }
00223
00224 replication_active = false;
00225 if (walsender_ready_to_stop)
00226 proc_exit(0);
00227
00228
00229 WalSndSetState(WALSNDSTATE_STARTUP);
00230 }
00231
00232
00233
00234
00235 static void
00236 IdentifySystem(void)
00237 {
00238 StringInfoData buf;
00239 char sysid[32];
00240 char tli[11];
00241 char xpos[MAXFNAMELEN];
00242 XLogRecPtr logptr;
00243
00244
00245
00246
00247
00248
00249 snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
00250 GetSystemIdentifier());
00251
00252 am_cascading_walsender = RecoveryInProgress();
00253 if (am_cascading_walsender)
00254 {
00255
00256 logptr = GetStandbyFlushRecPtr();
00257 }
00258 else
00259 logptr = GetInsertRecPtr();
00260
00261 snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
00262
00263 snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
00264
00265
00266 pq_beginmessage(&buf, 'T');
00267 pq_sendint(&buf, 3, 2);
00268
00269
00270 pq_sendstring(&buf, "systemid");
00271 pq_sendint(&buf, 0, 4);
00272 pq_sendint(&buf, 0, 2);
00273 pq_sendint(&buf, TEXTOID, 4);
00274 pq_sendint(&buf, -1, 2);
00275 pq_sendint(&buf, 0, 4);
00276 pq_sendint(&buf, 0, 2);
00277
00278
00279 pq_sendstring(&buf, "timeline");
00280 pq_sendint(&buf, 0, 4);
00281 pq_sendint(&buf, 0, 2);
00282 pq_sendint(&buf, INT4OID, 4);
00283 pq_sendint(&buf, 4, 2);
00284 pq_sendint(&buf, 0, 4);
00285 pq_sendint(&buf, 0, 2);
00286
00287
00288 pq_sendstring(&buf, "xlogpos");
00289 pq_sendint(&buf, 0, 4);
00290 pq_sendint(&buf, 0, 2);
00291 pq_sendint(&buf, TEXTOID, 4);
00292 pq_sendint(&buf, -1, 2);
00293 pq_sendint(&buf, 0, 4);
00294 pq_sendint(&buf, 0, 2);
00295 pq_endmessage(&buf);
00296
00297
00298 pq_beginmessage(&buf, 'D');
00299 pq_sendint(&buf, 3, 2);
00300 pq_sendint(&buf, strlen(sysid), 4);
00301 pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
00302 pq_sendint(&buf, strlen(tli), 4);
00303 pq_sendbytes(&buf, (char *) tli, strlen(tli));
00304 pq_sendint(&buf, strlen(xpos), 4);
00305 pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
00306
00307 pq_endmessage(&buf);
00308 }
00309
00310
00311
00312
00313
00314 static void
00315 SendTimeLineHistory(TimeLineHistoryCmd *cmd)
00316 {
00317 StringInfoData buf;
00318 char histfname[MAXFNAMELEN];
00319 char path[MAXPGPATH];
00320 int fd;
00321 off_t histfilelen;
00322 off_t bytesleft;
00323
00324
00325
00326
00327
00328
00329 TLHistoryFileName(histfname, cmd->timeline);
00330 TLHistoryFilePath(path, cmd->timeline);
00331
00332
00333 pq_beginmessage(&buf, 'T');
00334 pq_sendint(&buf, 2, 2);
00335
00336
00337 pq_sendstring(&buf, "filename");
00338 pq_sendint(&buf, 0, 4);
00339 pq_sendint(&buf, 0, 2);
00340 pq_sendint(&buf, TEXTOID, 4);
00341 pq_sendint(&buf, -1, 2);
00342 pq_sendint(&buf, 0, 4);
00343 pq_sendint(&buf, 0, 2);
00344
00345
00346 pq_sendstring(&buf, "content");
00347 pq_sendint(&buf, 0, 4);
00348 pq_sendint(&buf, 0, 2);
00349 pq_sendint(&buf, BYTEAOID, 4);
00350 pq_sendint(&buf, -1, 2);
00351 pq_sendint(&buf, 0, 4);
00352 pq_sendint(&buf, 0, 2);
00353 pq_endmessage(&buf);
00354
00355
00356 pq_beginmessage(&buf, 'D');
00357 pq_sendint(&buf, 2, 2);
00358 pq_sendint(&buf, strlen(histfname), 4);
00359 pq_sendbytes(&buf, histfname, strlen(histfname));
00360
00361 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
00362 if (fd < 0)
00363 ereport(ERROR,
00364 (errcode_for_file_access(),
00365 errmsg("could not open file \"%s\": %m", path)));
00366
00367
00368 histfilelen = lseek(fd, 0, SEEK_END);
00369 if (histfilelen < 0)
00370 ereport(ERROR,
00371 (errcode_for_file_access(),
00372 errmsg("could not seek to end of file \"%s\": %m", path)));
00373 if (lseek(fd, 0, SEEK_SET) != 0)
00374 ereport(ERROR,
00375 (errcode_for_file_access(),
00376 errmsg("could not seek to beginning of file \"%s\": %m", path)));
00377
00378 pq_sendint(&buf, histfilelen, 4);
00379
00380 bytesleft = histfilelen;
00381 while (bytesleft > 0)
00382 {
00383 char rbuf[BLCKSZ];
00384 int nread;
00385
00386 nread = read(fd, rbuf, sizeof(rbuf));
00387 if (nread <= 0)
00388 ereport(ERROR,
00389 (errcode_for_file_access(),
00390 errmsg("could not read file \"%s\": %m",
00391 path)));
00392 pq_sendbytes(&buf, rbuf, nread);
00393 bytesleft -= nread;
00394 }
00395 CloseTransientFile(fd);
00396
00397 pq_endmessage(&buf);
00398 }
00399
00400
00401
00402
00403
00404
00405
00406 static void
00407 StartReplication(StartReplicationCmd *cmd)
00408 {
00409 StringInfoData buf;
00410 XLogRecPtr FlushPtr;
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426 if (am_cascading_walsender)
00427 {
00428
00429 FlushPtr = GetStandbyFlushRecPtr();
00430 }
00431 else
00432 FlushPtr = GetFlushRecPtr();
00433
00434 if (cmd->timeline != 0)
00435 {
00436 XLogRecPtr switchpoint;
00437
00438 sendTimeLine = cmd->timeline;
00439 if (sendTimeLine == ThisTimeLineID)
00440 {
00441 sendTimeLineIsHistoric = false;
00442 sendTimeLineValidUpto = InvalidXLogRecPtr;
00443 }
00444 else
00445 {
00446 List *timeLineHistory;
00447
00448 sendTimeLineIsHistoric = true;
00449
00450
00451
00452
00453
00454 timeLineHistory = readTimeLineHistory(ThisTimeLineID);
00455 switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
00456 &sendTimeLineNextTLI);
00457 list_free_deep(timeLineHistory);
00458
00459
00460
00461
00462
00463
00464
00465
00466
00467
00468
00469
00470
00471
00472
00473
00474
00475
00476
00477 if (!XLogRecPtrIsInvalid(switchpoint) &&
00478 switchpoint < cmd->startpoint)
00479 {
00480 ereport(ERROR,
00481 (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
00482 (uint32) (cmd->startpoint >> 32),
00483 (uint32) (cmd->startpoint),
00484 cmd->timeline),
00485 errdetail("This server's history forked from timeline %u at %X/%X",
00486 cmd->timeline,
00487 (uint32) (switchpoint >> 32),
00488 (uint32) (switchpoint))));
00489 }
00490 sendTimeLineValidUpto = switchpoint;
00491 }
00492 }
00493 else
00494 {
00495 sendTimeLine = ThisTimeLineID;
00496 sendTimeLineValidUpto = InvalidXLogRecPtr;
00497 sendTimeLineIsHistoric = false;
00498 }
00499
00500 streamingDoneSending = streamingDoneReceiving = false;
00501
00502
00503 if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
00504 {
00505
00506
00507
00508
00509
00510
00511
00512
00513 WalSndSetState(WALSNDSTATE_CATCHUP);
00514
00515
00516 pq_beginmessage(&buf, 'W');
00517 pq_sendbyte(&buf, 0);
00518 pq_sendint(&buf, 0, 2);
00519 pq_endmessage(&buf);
00520 pq_flush();
00521
00522
00523
00524
00525
00526 if (FlushPtr < cmd->startpoint)
00527 {
00528 ereport(ERROR,
00529 (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
00530 (uint32) (cmd->startpoint >> 32),
00531 (uint32) (cmd->startpoint),
00532 (uint32) (FlushPtr >> 32),
00533 (uint32) (FlushPtr))));
00534 }
00535
00536
00537 sentPtr = cmd->startpoint;
00538
00539
00540 {
00541
00542 volatile WalSnd *walsnd = MyWalSnd;
00543
00544 SpinLockAcquire(&walsnd->mutex);
00545 walsnd->sentPtr = sentPtr;
00546 SpinLockRelease(&walsnd->mutex);
00547 }
00548
00549 SyncRepInitConfig();
00550
00551
00552 replication_active = true;
00553
00554 WalSndLoop();
00555
00556 replication_active = false;
00557 if (walsender_ready_to_stop)
00558 proc_exit(0);
00559 WalSndSetState(WALSNDSTATE_STARTUP);
00560
00561 Assert(streamingDoneSending && streamingDoneReceiving);
00562 }
00563
00564
00565
00566
00567
00568 if (sendTimeLineIsHistoric)
00569 {
00570 char str[11];
00571 snprintf(str, sizeof(str), "%u", sendTimeLineNextTLI);
00572
00573 pq_beginmessage(&buf, 'T');
00574 pq_sendint(&buf, 1, 2);
00575
00576
00577 pq_sendstring(&buf, "next_tli");
00578 pq_sendint(&buf, 0, 4);
00579 pq_sendint(&buf, 0, 2);
00580
00581
00582
00583
00584 pq_sendint(&buf, INT8OID, 4);
00585 pq_sendint(&buf, -1, 2);
00586 pq_sendint(&buf, 0, 4);
00587 pq_sendint(&buf, 0, 2);
00588 pq_endmessage(&buf);
00589
00590
00591 pq_beginmessage(&buf, 'D');
00592 pq_sendint(&buf, 1, 2);
00593 pq_sendint(&buf, strlen(str), 4);
00594 pq_sendbytes(&buf, str, strlen(str));
00595 pq_endmessage(&buf);
00596 }
00597
00598
00599 pq_puttextmessage('C', "START_STREAMING");
00600 }
00601
00602
00603
00604
00605 void
00606 exec_replication_command(const char *cmd_string)
00607 {
00608 int parse_rc;
00609 Node *cmd_node;
00610 MemoryContext cmd_context;
00611 MemoryContext old_context;
00612
00613 elog(DEBUG1, "received replication command: %s", cmd_string);
00614
00615 CHECK_FOR_INTERRUPTS();
00616
00617 cmd_context = AllocSetContextCreate(CurrentMemoryContext,
00618 "Replication command context",
00619 ALLOCSET_DEFAULT_MINSIZE,
00620 ALLOCSET_DEFAULT_INITSIZE,
00621 ALLOCSET_DEFAULT_MAXSIZE);
00622 old_context = MemoryContextSwitchTo(cmd_context);
00623
00624 replication_scanner_init(cmd_string);
00625 parse_rc = replication_yyparse();
00626 if (parse_rc != 0)
00627 ereport(ERROR,
00628 (errcode(ERRCODE_SYNTAX_ERROR),
00629 (errmsg_internal("replication command parser returned %d",
00630 parse_rc))));
00631
00632 cmd_node = replication_parse_result;
00633
00634 switch (cmd_node->type)
00635 {
00636 case T_IdentifySystemCmd:
00637 IdentifySystem();
00638 break;
00639
00640 case T_StartReplicationCmd:
00641 StartReplication((StartReplicationCmd *) cmd_node);
00642 break;
00643
00644 case T_BaseBackupCmd:
00645 SendBaseBackup((BaseBackupCmd *) cmd_node);
00646 break;
00647
00648 case T_TimeLineHistoryCmd:
00649 SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
00650 break;
00651
00652 default:
00653 elog(ERROR, "unrecognized replication command node tag: %u",
00654 cmd_node->type);
00655 }
00656
00657
00658 MemoryContextSwitchTo(old_context);
00659 MemoryContextDelete(cmd_context);
00660
00661
00662 EndCommand("SELECT", DestRemote);
00663 }
00664
00665
00666
00667
00668
00669 static void
00670 ProcessRepliesIfAny(void)
00671 {
00672 unsigned char firstchar;
00673 int r;
00674 bool received = false;
00675
00676 for (;;)
00677 {
00678 r = pq_getbyte_if_available(&firstchar);
00679 if (r < 0)
00680 {
00681
00682 ereport(COMMERROR,
00683 (errcode(ERRCODE_PROTOCOL_VIOLATION),
00684 errmsg("unexpected EOF on standby connection")));
00685 proc_exit(0);
00686 }
00687 if (r == 0)
00688 {
00689
00690 break;
00691 }
00692
00693
00694
00695
00696
00697
00698
00699
00700 if (streamingDoneReceiving && firstchar != 'X')
00701 ereport(FATAL,
00702 (errcode(ERRCODE_PROTOCOL_VIOLATION),
00703 errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
00704 firstchar)));
00705
00706
00707 switch (firstchar)
00708 {
00709
00710
00711
00712 case 'd':
00713 ProcessStandbyMessage();
00714 received = true;
00715 break;
00716
00717
00718
00719
00720
00721 case 'c':
00722 if (!streamingDoneSending)
00723 {
00724 pq_putmessage_noblock('c', NULL, 0);
00725 streamingDoneSending = true;
00726 }
00727
00728
00729 resetStringInfo(&reply_message);
00730 if (pq_getmessage(&reply_message, 0))
00731 {
00732 ereport(COMMERROR,
00733 (errcode(ERRCODE_PROTOCOL_VIOLATION),
00734 errmsg("unexpected EOF on standby connection")));
00735 proc_exit(0);
00736 }
00737
00738 streamingDoneReceiving = true;
00739 received = true;
00740 break;
00741
00742
00743
00744
00745 case 'X':
00746 proc_exit(0);
00747
00748 default:
00749 ereport(FATAL,
00750 (errcode(ERRCODE_PROTOCOL_VIOLATION),
00751 errmsg("invalid standby message type \"%c\"",
00752 firstchar)));
00753 }
00754 }
00755
00756
00757
00758
00759 if (received)
00760 {
00761 last_reply_timestamp = GetCurrentTimestamp();
00762 ping_sent = false;
00763 }
00764 }
00765
00766
00767
00768
00769 static void
00770 ProcessStandbyMessage(void)
00771 {
00772 char msgtype;
00773
00774 resetStringInfo(&reply_message);
00775
00776
00777
00778
00779 if (pq_getmessage(&reply_message, 0))
00780 {
00781 ereport(COMMERROR,
00782 (errcode(ERRCODE_PROTOCOL_VIOLATION),
00783 errmsg("unexpected EOF on standby connection")));
00784 proc_exit(0);
00785 }
00786
00787
00788
00789
00790 msgtype = pq_getmsgbyte(&reply_message);
00791
00792 switch (msgtype)
00793 {
00794 case 'r':
00795 ProcessStandbyReplyMessage();
00796 break;
00797
00798 case 'h':
00799 ProcessStandbyHSFeedbackMessage();
00800 break;
00801
00802 default:
00803 ereport(COMMERROR,
00804 (errcode(ERRCODE_PROTOCOL_VIOLATION),
00805 errmsg("unexpected message type \"%c\"", msgtype)));
00806 proc_exit(0);
00807 }
00808 }
00809
00810
00811
00812
00813 static void
00814 ProcessStandbyReplyMessage(void)
00815 {
00816 XLogRecPtr writePtr,
00817 flushPtr,
00818 applyPtr;
00819 bool replyRequested;
00820
00821
00822 writePtr = pq_getmsgint64(&reply_message);
00823 flushPtr = pq_getmsgint64(&reply_message);
00824 applyPtr = pq_getmsgint64(&reply_message);
00825 (void) pq_getmsgint64(&reply_message);
00826 replyRequested = pq_getmsgbyte(&reply_message);
00827
00828 elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
00829 (uint32) (writePtr >> 32), (uint32) writePtr,
00830 (uint32) (flushPtr >> 32), (uint32) flushPtr,
00831 (uint32) (applyPtr >> 32), (uint32) applyPtr,
00832 replyRequested ? " (reply requested)" : "");
00833
00834
00835 if (replyRequested)
00836 WalSndKeepalive(false);
00837
00838
00839
00840
00841
00842 {
00843
00844 volatile WalSnd *walsnd = MyWalSnd;
00845
00846 SpinLockAcquire(&walsnd->mutex);
00847 walsnd->write = writePtr;
00848 walsnd->flush = flushPtr;
00849 walsnd->apply = applyPtr;
00850 SpinLockRelease(&walsnd->mutex);
00851 }
00852
00853 if (!am_cascading_walsender)
00854 SyncRepReleaseWaiters();
00855 }
00856
00857
00858
00859
00860 static void
00861 ProcessStandbyHSFeedbackMessage(void)
00862 {
00863 TransactionId nextXid;
00864 uint32 nextEpoch;
00865 TransactionId feedbackXmin;
00866 uint32 feedbackEpoch;
00867
00868
00869
00870
00871
00872 (void) pq_getmsgint64(&reply_message);
00873 feedbackXmin = pq_getmsgint(&reply_message, 4);
00874 feedbackEpoch = pq_getmsgint(&reply_message, 4);
00875
00876 elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
00877 feedbackXmin,
00878 feedbackEpoch);
00879
00880
00881 if (!TransactionIdIsNormal(feedbackXmin))
00882 {
00883 MyPgXact->xmin = InvalidTransactionId;
00884 return;
00885 }
00886
00887
00888
00889
00890
00891
00892
00893
00894 GetNextXidAndEpoch(&nextXid, &nextEpoch);
00895
00896 if (feedbackXmin <= nextXid)
00897 {
00898 if (feedbackEpoch != nextEpoch)
00899 return;
00900 }
00901 else
00902 {
00903 if (feedbackEpoch + 1 != nextEpoch)
00904 return;
00905 }
00906
00907 if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
00908 return;
00909
00910
00911
00912
00913
00914
00915
00916
00917
00918
00919
00920
00921
00922
00923
00924
00925
00926
00927
00928
00929
00930
00931
00932 MyPgXact->xmin = feedbackXmin;
00933 }
00934
00935
00936 static void
00937 WalSndLoop(void)
00938 {
00939 bool caughtup = false;
00940
00941
00942
00943
00944
00945 initStringInfo(&output_message);
00946 initStringInfo(&reply_message);
00947 initStringInfo(&tmpbuf);
00948
00949
00950 last_reply_timestamp = GetCurrentTimestamp();
00951 ping_sent = false;
00952
00953
00954
00955
00956
00957 for (;;)
00958 {
00959
00960 ResetLatch(&MyWalSnd->latch);
00961
00962
00963
00964
00965
00966 if (!PostmasterIsAlive())
00967 exit(1);
00968
00969
00970 if (got_SIGHUP)
00971 {
00972 got_SIGHUP = false;
00973 ProcessConfigFile(PGC_SIGHUP);
00974 SyncRepInitConfig();
00975 }
00976
00977 CHECK_FOR_INTERRUPTS();
00978
00979
00980 ProcessRepliesIfAny();
00981
00982
00983
00984
00985
00986
00987 if (!pq_is_send_pending() && streamingDoneSending && streamingDoneReceiving)
00988 break;
00989
00990
00991
00992
00993
00994
00995
00996 if (!pq_is_send_pending())
00997 XLogSend(&caughtup);
00998 else
00999 caughtup = false;
01000
01001
01002 if (pq_flush_if_writable() != 0)
01003 goto send_failure;
01004
01005
01006 if (caughtup && !pq_is_send_pending())
01007 {
01008
01009
01010
01011
01012
01013
01014
01015
01016 if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
01017 {
01018 ereport(DEBUG1,
01019 (errmsg("standby \"%s\" has now caught up with primary",
01020 application_name)));
01021 WalSndSetState(WALSNDSTATE_STREAMING);
01022 }
01023
01024
01025
01026
01027
01028
01029
01030 if (walsender_ready_to_stop)
01031 {
01032
01033 XLogSend(&caughtup);
01034 if (caughtup && !pq_is_send_pending())
01035 {
01036
01037 EndCommand("COPY 0", DestRemote);
01038 pq_flush();
01039
01040 proc_exit(0);
01041 }
01042 }
01043 }
01044
01045
01046
01047
01048
01049
01050
01051
01052 if ((caughtup && !streamingDoneSending) || pq_is_send_pending())
01053 {
01054 TimestampTz timeout = 0;
01055 long sleeptime = 10000;
01056 int wakeEvents;
01057
01058 wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT |
01059 WL_SOCKET_READABLE;
01060
01061 if (pq_is_send_pending())
01062 wakeEvents |= WL_SOCKET_WRITEABLE;
01063 else if (wal_sender_timeout > 0 && !ping_sent)
01064 {
01065
01066
01067
01068
01069
01070 timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
01071 wal_sender_timeout / 2);
01072 if (GetCurrentTimestamp() >= timeout)
01073 {
01074 WalSndKeepalive(true);
01075 ping_sent = true;
01076
01077 if (pq_flush_if_writable() != 0)
01078 break;
01079 }
01080 }
01081
01082
01083 if (wal_sender_timeout > 0)
01084 {
01085 timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
01086 wal_sender_timeout);
01087 sleeptime = 1 + (wal_sender_timeout / 10);
01088 }
01089
01090
01091 ImmediateInterruptOK = true;
01092 CHECK_FOR_INTERRUPTS();
01093 WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
01094 MyProcPort->sock, sleeptime);
01095 ImmediateInterruptOK = false;
01096
01097
01098
01099
01100
01101
01102 if (wal_sender_timeout > 0 && GetCurrentTimestamp() >= timeout)
01103 {
01104
01105
01106
01107
01108
01109 ereport(COMMERROR,
01110 (errmsg("terminating walsender process due to replication timeout")));
01111 goto send_failure;
01112 }
01113 }
01114 }
01115 return;
01116
01117 send_failure:
01118
01119
01120
01121
01122
01123
01124 if (whereToSendOutput == DestRemote)
01125 whereToSendOutput = DestNone;
01126
01127 proc_exit(0);
01128 abort();
01129 }
01130
01131
01132 static void
01133 InitWalSenderSlot(void)
01134 {
01135 int i;
01136
01137
01138
01139
01140
01141 Assert(WalSndCtl != NULL);
01142 Assert(MyWalSnd == NULL);
01143
01144
01145
01146
01147
01148 for (i = 0; i < max_wal_senders; i++)
01149 {
01150
01151 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
01152
01153 SpinLockAcquire(&walsnd->mutex);
01154
01155 if (walsnd->pid != 0)
01156 {
01157 SpinLockRelease(&walsnd->mutex);
01158 continue;
01159 }
01160 else
01161 {
01162
01163
01164
01165 walsnd->pid = MyProcPid;
01166 walsnd->sentPtr = InvalidXLogRecPtr;
01167 walsnd->state = WALSNDSTATE_STARTUP;
01168 SpinLockRelease(&walsnd->mutex);
01169
01170 OwnLatch((Latch *) &walsnd->latch);
01171 MyWalSnd = (WalSnd *) walsnd;
01172
01173 break;
01174 }
01175 }
01176 if (MyWalSnd == NULL)
01177 ereport(FATAL,
01178 (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
01179 errmsg("number of requested standby connections "
01180 "exceeds max_wal_senders (currently %d)",
01181 max_wal_senders)));
01182
01183
01184 on_shmem_exit(WalSndKill, 0);
01185 }
01186
01187
01188 static void
01189 WalSndKill(int code, Datum arg)
01190 {
01191 Assert(MyWalSnd != NULL);
01192
01193
01194
01195
01196
01197 MyWalSnd->pid = 0;
01198 DisownLatch(&MyWalSnd->latch);
01199
01200
01201 MyWalSnd = NULL;
01202 }
01203
01204
01205
01206
01207
01208
01209
01210
01211
01212
01213
01214
01215 static void
01216 XLogRead(char *buf, XLogRecPtr startptr, Size count)
01217 {
01218 char *p;
01219 XLogRecPtr recptr;
01220 Size nbytes;
01221 XLogSegNo segno;
01222
01223 retry:
01224 p = buf;
01225 recptr = startptr;
01226 nbytes = count;
01227
01228 while (nbytes > 0)
01229 {
01230 uint32 startoff;
01231 int segbytes;
01232 int readbytes;
01233
01234 startoff = recptr % XLogSegSize;
01235
01236 if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
01237 {
01238 char path[MAXPGPATH];
01239
01240
01241 if (sendFile >= 0)
01242 close(sendFile);
01243
01244 XLByteToSeg(recptr, sendSegNo);
01245
01246
01247
01248
01249
01250
01251
01252
01253
01254
01255
01256
01257
01258
01259
01260
01261
01262
01263
01264
01265
01266
01267
01268
01269
01270
01271
01272 curFileTimeLine = sendTimeLine;
01273 if (sendTimeLineIsHistoric)
01274 {
01275 XLogSegNo endSegNo;
01276
01277 XLByteToSeg(sendTimeLineValidUpto, endSegNo);
01278 if (sendSegNo == endSegNo)
01279 curFileTimeLine = sendTimeLineNextTLI;
01280 }
01281
01282 XLogFilePath(path, curFileTimeLine, sendSegNo);
01283
01284 sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
01285 if (sendFile < 0)
01286 {
01287
01288
01289
01290
01291
01292 if (errno == ENOENT)
01293 ereport(ERROR,
01294 (errcode_for_file_access(),
01295 errmsg("requested WAL segment %s has already been removed",
01296 XLogFileNameP(curFileTimeLine, sendSegNo))));
01297 else
01298 ereport(ERROR,
01299 (errcode_for_file_access(),
01300 errmsg("could not open file \"%s\": %m",
01301 path)));
01302 }
01303 sendOff = 0;
01304 }
01305
01306
01307 if (sendOff != startoff)
01308 {
01309 if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
01310 ereport(ERROR,
01311 (errcode_for_file_access(),
01312 errmsg("could not seek in log segment %s to offset %u: %m",
01313 XLogFileNameP(curFileTimeLine, sendSegNo),
01314 startoff)));
01315 sendOff = startoff;
01316 }
01317
01318
01319 if (nbytes > (XLogSegSize - startoff))
01320 segbytes = XLogSegSize - startoff;
01321 else
01322 segbytes = nbytes;
01323
01324 readbytes = read(sendFile, p, segbytes);
01325 if (readbytes <= 0)
01326 {
01327 ereport(ERROR,
01328 (errcode_for_file_access(),
01329 errmsg("could not read from log segment %s, offset %u, length %lu: %m",
01330 XLogFileNameP(curFileTimeLine, sendSegNo),
01331 sendOff, (unsigned long) segbytes)));
01332 }
01333
01334
01335 recptr += readbytes;
01336
01337 sendOff += readbytes;
01338 nbytes -= readbytes;
01339 p += readbytes;
01340 }
01341
01342
01343
01344
01345
01346
01347
01348
01349 XLByteToSeg(startptr, segno);
01350 CheckXLogRemoved(segno, ThisTimeLineID);
01351
01352
01353
01354
01355
01356
01357
01358 if (am_cascading_walsender)
01359 {
01360
01361 volatile WalSnd *walsnd = MyWalSnd;
01362 bool reload;
01363
01364 SpinLockAcquire(&walsnd->mutex);
01365 reload = walsnd->needreload;
01366 walsnd->needreload = false;
01367 SpinLockRelease(&walsnd->mutex);
01368
01369 if (reload && sendFile >= 0)
01370 {
01371 close(sendFile);
01372 sendFile = -1;
01373
01374 goto retry;
01375 }
01376 }
01377 }
01378
01379
01380
01381
01382
01383
01384
01385
01386
01387 static void
01388 XLogSend(bool *caughtup)
01389 {
01390 XLogRecPtr SendRqstPtr;
01391 XLogRecPtr startptr;
01392 XLogRecPtr endptr;
01393 Size nbytes;
01394
01395 if (streamingDoneSending)
01396 {
01397 *caughtup = true;
01398 return;
01399 }
01400
01401
01402 if (sendTimeLineIsHistoric)
01403 {
01404
01405
01406
01407
01408
01409 SendRqstPtr = sendTimeLineValidUpto;
01410 }
01411 else if (am_cascading_walsender)
01412 {
01413
01414
01415
01416
01417
01418
01419
01420
01421
01422
01423
01424
01425
01426
01427
01428
01429
01430 bool becameHistoric = false;
01431
01432 SendRqstPtr = GetStandbyFlushRecPtr();
01433
01434 if (!RecoveryInProgress())
01435 {
01436
01437
01438
01439
01440 am_cascading_walsender = false;
01441 becameHistoric = true;
01442 }
01443 else
01444 {
01445
01446
01447
01448
01449
01450 if (sendTimeLine != ThisTimeLineID)
01451 becameHistoric = true;
01452 }
01453
01454 if (becameHistoric)
01455 {
01456
01457
01458
01459
01460
01461 List *history;
01462
01463 history = readTimeLineHistory(ThisTimeLineID);
01464 sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
01465 Assert(sentPtr <= sendTimeLineValidUpto);
01466 Assert(sendTimeLine < sendTimeLineNextTLI);
01467 list_free_deep(history);
01468
01469
01470 if (!(sentPtr <= sendTimeLineValidUpto))
01471 elog(ERROR, "server switched off timeline %u at %X/%X, but walsender already streamed up to %X/%X",
01472 sendTimeLine,
01473 (uint32) (sendTimeLineValidUpto >> 32),
01474 (uint32) sendTimeLineValidUpto,
01475 (uint32) (sentPtr >> 32),
01476 (uint32) sentPtr);
01477
01478 sendTimeLineIsHistoric = true;
01479
01480 SendRqstPtr = sendTimeLineValidUpto;
01481 }
01482 }
01483 else
01484 {
01485
01486
01487
01488
01489
01490
01491
01492
01493
01494
01495 SendRqstPtr = GetFlushRecPtr();
01496 }
01497
01498
01499
01500
01501
01502 if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
01503 {
01504
01505 if (sendFile >= 0)
01506 close(sendFile);
01507 sendFile = -1;
01508
01509
01510 pq_putmessage_noblock('c', NULL, 0);
01511 streamingDoneSending = true;
01512
01513 *caughtup = true;
01514 return;
01515 }
01516
01517
01518 Assert(sentPtr <= SendRqstPtr);
01519 if (SendRqstPtr <= sentPtr)
01520 {
01521 *caughtup = true;
01522 return;
01523 }
01524
01525
01526
01527
01528
01529
01530
01531
01532
01533
01534
01535
01536 startptr = sentPtr;
01537 endptr = startptr;
01538 endptr += MAX_SEND_SIZE;
01539
01540
01541 if (SendRqstPtr <= endptr)
01542 {
01543 endptr = SendRqstPtr;
01544 if (sendTimeLineIsHistoric)
01545 *caughtup = false;
01546 else
01547 *caughtup = true;
01548 }
01549 else
01550 {
01551
01552 endptr -= (endptr % XLOG_BLCKSZ);
01553 *caughtup = false;
01554 }
01555
01556 nbytes = endptr - startptr;
01557 Assert(nbytes <= MAX_SEND_SIZE);
01558
01559
01560
01561
01562 resetStringInfo(&output_message);
01563 pq_sendbyte(&output_message, 'w');
01564
01565 pq_sendint64(&output_message, startptr);
01566 pq_sendint64(&output_message, SendRqstPtr);
01567 pq_sendint64(&output_message, 0);
01568
01569
01570
01571
01572
01573 enlargeStringInfo(&output_message, nbytes);
01574 XLogRead(&output_message.data[output_message.len], startptr, nbytes);
01575 output_message.len += nbytes;
01576 output_message.data[output_message.len] = '\0';
01577
01578
01579
01580
01581 resetStringInfo(&tmpbuf);
01582 pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp());
01583 memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
01584 tmpbuf.data, sizeof(int64));
01585
01586 pq_putmessage_noblock('d', output_message.data, output_message.len);
01587
01588 sentPtr = endptr;
01589
01590
01591 {
01592
01593 volatile WalSnd *walsnd = MyWalSnd;
01594
01595 SpinLockAcquire(&walsnd->mutex);
01596 walsnd->sentPtr = sentPtr;
01597 SpinLockRelease(&walsnd->mutex);
01598 }
01599
01600
01601 if (update_process_title)
01602 {
01603 char activitymsg[50];
01604
01605 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
01606 (uint32) (sentPtr >> 32), (uint32) sentPtr);
01607 set_ps_display(activitymsg, false);
01608 }
01609
01610 return;
01611 }
01612
01613
01614
01615
01616
01617
01618
01619
01620
01621 static XLogRecPtr
01622 GetStandbyFlushRecPtr(void)
01623 {
01624 XLogRecPtr replayPtr;
01625 TimeLineID replayTLI;
01626 XLogRecPtr receivePtr;
01627 TimeLineID receiveTLI;
01628 XLogRecPtr result;
01629
01630
01631
01632
01633
01634
01635
01636 receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
01637 replayPtr = GetXLogReplayRecPtr(&replayTLI);
01638
01639 ThisTimeLineID = replayTLI;
01640
01641 result = replayPtr;
01642 if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
01643 result = receivePtr;
01644
01645 return result;
01646 }
01647
01648
01649
01650
01651 void
01652 WalSndRqstFileReload(void)
01653 {
01654 int i;
01655
01656 for (i = 0; i < max_wal_senders; i++)
01657 {
01658
01659 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
01660
01661 if (walsnd->pid == 0)
01662 continue;
01663
01664 SpinLockAcquire(&walsnd->mutex);
01665 walsnd->needreload = true;
01666 SpinLockRelease(&walsnd->mutex);
01667 }
01668 }
01669
01670
01671 static void
01672 WalSndSigHupHandler(SIGNAL_ARGS)
01673 {
01674 int save_errno = errno;
01675
01676 got_SIGHUP = true;
01677 if (MyWalSnd)
01678 SetLatch(&MyWalSnd->latch);
01679
01680 errno = save_errno;
01681 }
01682
01683
01684 static void
01685 WalSndXLogSendHandler(SIGNAL_ARGS)
01686 {
01687 int save_errno = errno;
01688
01689 latch_sigusr1_handler();
01690
01691 errno = save_errno;
01692 }
01693
01694
01695 static void
01696 WalSndLastCycleHandler(SIGNAL_ARGS)
01697 {
01698 int save_errno = errno;
01699
01700
01701
01702
01703
01704
01705 if (!replication_active)
01706 kill(MyProcPid, SIGTERM);
01707
01708 walsender_ready_to_stop = true;
01709 if (MyWalSnd)
01710 SetLatch(&MyWalSnd->latch);
01711
01712 errno = save_errno;
01713 }
01714
01715
01716 void
01717 WalSndSignals(void)
01718 {
01719
01720 pqsignal(SIGHUP, WalSndSigHupHandler);
01721
01722 pqsignal(SIGINT, SIG_IGN);
01723 pqsignal(SIGTERM, die);
01724 pqsignal(SIGQUIT, quickdie);
01725 InitializeTimeouts();
01726 pqsignal(SIGPIPE, SIG_IGN);
01727 pqsignal(SIGUSR1, WalSndXLogSendHandler);
01728 pqsignal(SIGUSR2, WalSndLastCycleHandler);
01729
01730
01731
01732 pqsignal(SIGCHLD, SIG_DFL);
01733 pqsignal(SIGTTIN, SIG_DFL);
01734 pqsignal(SIGTTOU, SIG_DFL);
01735 pqsignal(SIGCONT, SIG_DFL);
01736 pqsignal(SIGWINCH, SIG_DFL);
01737 }
01738
01739
01740 Size
01741 WalSndShmemSize(void)
01742 {
01743 Size size = 0;
01744
01745 size = offsetof(WalSndCtlData, walsnds);
01746 size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
01747
01748 return size;
01749 }
01750
01751
01752 void
01753 WalSndShmemInit(void)
01754 {
01755 bool found;
01756 int i;
01757
01758 WalSndCtl = (WalSndCtlData *)
01759 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
01760
01761 if (!found)
01762 {
01763
01764 MemSet(WalSndCtl, 0, WalSndShmemSize());
01765
01766 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
01767 SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
01768
01769 for (i = 0; i < max_wal_senders; i++)
01770 {
01771 WalSnd *walsnd = &WalSndCtl->walsnds[i];
01772
01773 SpinLockInit(&walsnd->mutex);
01774 InitSharedLatch(&walsnd->latch);
01775 }
01776 }
01777 }
01778
01779
01780
01781
01782
01783
01784
01785 void
01786 WalSndWakeup(void)
01787 {
01788 int i;
01789
01790 for (i = 0; i < max_wal_senders; i++)
01791 SetLatch(&WalSndCtl->walsnds[i].latch);
01792 }
01793
01794
01795 void
01796 WalSndSetState(WalSndState state)
01797 {
01798
01799 volatile WalSnd *walsnd = MyWalSnd;
01800
01801 Assert(am_walsender);
01802
01803 if (walsnd->state == state)
01804 return;
01805
01806 SpinLockAcquire(&walsnd->mutex);
01807 walsnd->state = state;
01808 SpinLockRelease(&walsnd->mutex);
01809 }
01810
01811
01812
01813
01814
01815 static const char *
01816 WalSndGetStateString(WalSndState state)
01817 {
01818 switch (state)
01819 {
01820 case WALSNDSTATE_STARTUP:
01821 return "startup";
01822 case WALSNDSTATE_BACKUP:
01823 return "backup";
01824 case WALSNDSTATE_CATCHUP:
01825 return "catchup";
01826 case WALSNDSTATE_STREAMING:
01827 return "streaming";
01828 }
01829 return "UNKNOWN";
01830 }
01831
01832
01833
01834
01835
01836
01837 Datum
01838 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
01839 {
01840 #define PG_STAT_GET_WAL_SENDERS_COLS 8
01841 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
01842 TupleDesc tupdesc;
01843 Tuplestorestate *tupstore;
01844 MemoryContext per_query_ctx;
01845 MemoryContext oldcontext;
01846 int *sync_priority;
01847 int priority = 0;
01848 int sync_standby = -1;
01849 int i;
01850
01851
01852 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
01853 ereport(ERROR,
01854 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01855 errmsg("set-valued function called in context that cannot accept a set")));
01856 if (!(rsinfo->allowedModes & SFRM_Materialize))
01857 ereport(ERROR,
01858 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01859 errmsg("materialize mode required, but it is not " \
01860 "allowed in this context")));
01861
01862
01863 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
01864 elog(ERROR, "return type must be a row type");
01865
01866 per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
01867 oldcontext = MemoryContextSwitchTo(per_query_ctx);
01868
01869 tupstore = tuplestore_begin_heap(true, false, work_mem);
01870 rsinfo->returnMode = SFRM_Materialize;
01871 rsinfo->setResult = tupstore;
01872 rsinfo->setDesc = tupdesc;
01873
01874 MemoryContextSwitchTo(oldcontext);
01875
01876
01877
01878
01879
01880
01881 sync_priority = palloc(sizeof(int) * max_wal_senders);
01882 LWLockAcquire(SyncRepLock, LW_SHARED);
01883 for (i = 0; i < max_wal_senders; i++)
01884 {
01885
01886 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
01887
01888 if (walsnd->pid != 0)
01889 {
01890
01891
01892
01893
01894
01895 sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
01896 0 : walsnd->sync_standby_priority;
01897
01898 if (walsnd->state == WALSNDSTATE_STREAMING &&
01899 walsnd->sync_standby_priority > 0 &&
01900 (priority == 0 ||
01901 priority > walsnd->sync_standby_priority) &&
01902 !XLogRecPtrIsInvalid(walsnd->flush))
01903 {
01904 priority = walsnd->sync_standby_priority;
01905 sync_standby = i;
01906 }
01907 }
01908 }
01909 LWLockRelease(SyncRepLock);
01910
01911 for (i = 0; i < max_wal_senders; i++)
01912 {
01913
01914 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
01915 char location[MAXFNAMELEN];
01916 XLogRecPtr sentPtr;
01917 XLogRecPtr write;
01918 XLogRecPtr flush;
01919 XLogRecPtr apply;
01920 WalSndState state;
01921 Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
01922 bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
01923
01924 if (walsnd->pid == 0)
01925 continue;
01926
01927 SpinLockAcquire(&walsnd->mutex);
01928 sentPtr = walsnd->sentPtr;
01929 state = walsnd->state;
01930 write = walsnd->write;
01931 flush = walsnd->flush;
01932 apply = walsnd->apply;
01933 SpinLockRelease(&walsnd->mutex);
01934
01935 memset(nulls, 0, sizeof(nulls));
01936 values[0] = Int32GetDatum(walsnd->pid);
01937
01938 if (!superuser())
01939 {
01940
01941
01942
01943
01944 MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
01945 }
01946 else
01947 {
01948 values[1] = CStringGetTextDatum(WalSndGetStateString(state));
01949
01950 snprintf(location, sizeof(location), "%X/%X",
01951 (uint32) (sentPtr >> 32), (uint32) sentPtr);
01952 values[2] = CStringGetTextDatum(location);
01953
01954 if (write == 0)
01955 nulls[3] = true;
01956 snprintf(location, sizeof(location), "%X/%X",
01957 (uint32) (write >> 32), (uint32) write);
01958 values[3] = CStringGetTextDatum(location);
01959
01960 if (flush == 0)
01961 nulls[4] = true;
01962 snprintf(location, sizeof(location), "%X/%X",
01963 (uint32) (flush >> 32), (uint32) flush);
01964 values[4] = CStringGetTextDatum(location);
01965
01966 if (apply == 0)
01967 nulls[5] = true;
01968 snprintf(location, sizeof(location), "%X/%X",
01969 (uint32) (apply >> 32), (uint32) apply);
01970 values[5] = CStringGetTextDatum(location);
01971
01972 values[6] = Int32GetDatum(sync_priority[i]);
01973
01974
01975
01976
01977
01978 if (sync_priority[i] == 0)
01979 values[7] = CStringGetTextDatum("async");
01980 else if (i == sync_standby)
01981 values[7] = CStringGetTextDatum("sync");
01982 else
01983 values[7] = CStringGetTextDatum("potential");
01984 }
01985
01986 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
01987 }
01988 pfree(sync_priority);
01989
01990
01991 tuplestore_donestoring(tupstore);
01992
01993 return (Datum) 0;
01994 }
01995
01996
01997
01998
01999
02000
02001 static void
02002 WalSndKeepalive(bool requestReply)
02003 {
02004 elog(DEBUG2, "sending replication keepalive");
02005
02006
02007 resetStringInfo(&output_message);
02008 pq_sendbyte(&output_message, 'k');
02009 pq_sendint64(&output_message, sentPtr);
02010 pq_sendint64(&output_message, GetCurrentIntegerTimestamp());
02011 pq_sendbyte(&output_message, requestReply ? 1 : 0);
02012
02013
02014 pq_putmessage_noblock('d', output_message.data, output_message.len);
02015 }
02016
02017
02018
02019
02020
02021
02022 #ifdef NOT_USED
02023
02024
02025
02026
02027 XLogRecPtr
02028 GetOldestWALSendPointer(void)
02029 {
02030 XLogRecPtr oldest = {0, 0};
02031 int i;
02032 bool found = false;
02033
02034 for (i = 0; i < max_wal_senders; i++)
02035 {
02036
02037 volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
02038 XLogRecPtr recptr;
02039
02040 if (walsnd->pid == 0)
02041 continue;
02042
02043 SpinLockAcquire(&walsnd->mutex);
02044 recptr = walsnd->sentPtr;
02045 SpinLockRelease(&walsnd->mutex);
02046
02047 if (recptr.xlogid == 0 && recptr.xrecoff == 0)
02048 continue;
02049
02050 if (!found || recptr < oldest)
02051 oldest = recptr;
02052 found = true;
02053 }
02054 return oldest;
02055 }
02056
02057 #endif