00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044 #include "postgres.h"
00045
00046 #include <signal.h>
00047 #include <unistd.h>
00048
00049 #include "access/timeline.h"
00050 #include "access/transam.h"
00051 #include "access/xlog_internal.h"
00052 #include "libpq/pqformat.h"
00053 #include "libpq/pqsignal.h"
00054 #include "miscadmin.h"
00055 #include "replication/walreceiver.h"
00056 #include "replication/walsender.h"
00057 #include "storage/ipc.h"
00058 #include "storage/pmsignal.h"
00059 #include "storage/procarray.h"
00060 #include "utils/guc.h"
00061 #include "utils/ps_status.h"
00062 #include "utils/resowner.h"
00063 #include "utils/timestamp.h"
00064
00065
00066
00067 int wal_receiver_status_interval;
00068 int wal_receiver_timeout;
00069 bool hot_standby_feedback;
00070
00071
00072 walrcv_connect_type walrcv_connect = NULL;
00073 walrcv_identify_system_type walrcv_identify_system = NULL;
00074 walrcv_startstreaming_type walrcv_startstreaming = NULL;
00075 walrcv_endstreaming_type walrcv_endstreaming = NULL;
00076 walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile = NULL;
00077 walrcv_receive_type walrcv_receive = NULL;
00078 walrcv_send_type walrcv_send = NULL;
00079 walrcv_disconnect_type walrcv_disconnect = NULL;
00080
00081 #define NAPTIME_PER_CYCLE 100
00082
00083
00084
00085
00086
00087
00088 static int recvFile = -1;
00089 static TimeLineID recvFileTLI = 0;
00090 static XLogSegNo recvSegNo = 0;
00091 static uint32 recvOff = 0;
00092
00093
00094
00095
00096
00097 static volatile sig_atomic_t got_SIGHUP = false;
00098 static volatile sig_atomic_t got_SIGTERM = false;
00099
00100
00101
00102
00103
00104 static struct
00105 {
00106 XLogRecPtr Write;
00107 XLogRecPtr Flush;
00108 } LogstreamResult;
00109
00110 static StringInfoData reply_message;
00111 static StringInfoData incoming_message;
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129 static volatile bool WalRcvImmediateInterruptOK = false;
00130
00131
00132 static void ProcessWalRcvInterrupts(void);
00133 static void EnableWalRcvImmediateExit(void);
00134 static void DisableWalRcvImmediateExit(void);
00135 static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
00136 static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
00137 static void WalRcvDie(int code, Datum arg);
00138 static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
00139 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
00140 static void XLogWalRcvFlush(bool dying);
00141 static void XLogWalRcvSendReply(bool force, bool requestReply);
00142 static void XLogWalRcvSendHSFeedback(bool immed);
00143 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
00144
00145
00146 static void WalRcvSigHupHandler(SIGNAL_ARGS);
00147 static void WalRcvSigUsr1Handler(SIGNAL_ARGS);
00148 static void WalRcvShutdownHandler(SIGNAL_ARGS);
00149 static void WalRcvQuickDieHandler(SIGNAL_ARGS);
00150
00151
00152 static void
00153 ProcessWalRcvInterrupts(void)
00154 {
00155
00156
00157
00158
00159
00160 CHECK_FOR_INTERRUPTS();
00161
00162 if (got_SIGTERM)
00163 {
00164 WalRcvImmediateInterruptOK = false;
00165 ereport(FATAL,
00166 (errcode(ERRCODE_ADMIN_SHUTDOWN),
00167 errmsg("terminating walreceiver process due to administrator command")));
00168 }
00169 }
00170
00171 static void
00172 EnableWalRcvImmediateExit(void)
00173 {
00174 WalRcvImmediateInterruptOK = true;
00175 ProcessWalRcvInterrupts();
00176 }
00177
00178 static void
00179 DisableWalRcvImmediateExit(void)
00180 {
00181 WalRcvImmediateInterruptOK = false;
00182 ProcessWalRcvInterrupts();
00183 }
00184
00185
00186 void
00187 WalReceiverMain(void)
00188 {
00189 char conninfo[MAXCONNINFO];
00190 XLogRecPtr startpoint;
00191 TimeLineID startpointTLI;
00192 TimeLineID primaryTLI;
00193 bool first_stream;
00194
00195
00196 volatile WalRcvData *walrcv = WalRcv;
00197 TimestampTz last_recv_timestamp;
00198 bool ping_sent;
00199
00200
00201
00202
00203
00204 Assert(walrcv != NULL);
00205
00206
00207
00208
00209
00210
00211
00212
00213 SpinLockAcquire(&walrcv->mutex);
00214 Assert(walrcv->pid == 0);
00215 switch (walrcv->walRcvState)
00216 {
00217 case WALRCV_STOPPING:
00218
00219 walrcv->walRcvState = WALRCV_STOPPED;
00220
00221
00222 case WALRCV_STOPPED:
00223 SpinLockRelease(&walrcv->mutex);
00224 proc_exit(1);
00225 break;
00226
00227 case WALRCV_STARTING:
00228
00229 break;
00230
00231 case WALRCV_WAITING:
00232 case WALRCV_STREAMING:
00233 case WALRCV_RESTARTING:
00234 default:
00235
00236 elog(PANIC, "walreceiver still running according to shared memory state");
00237 }
00238
00239 walrcv->pid = MyProcPid;
00240 walrcv->walRcvState = WALRCV_STREAMING;
00241
00242
00243 strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
00244 startpoint = walrcv->receiveStart;
00245 startpointTLI = walrcv->receiveStartTLI;
00246
00247
00248 walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = GetCurrentTimestamp();
00249
00250 SpinLockRelease(&walrcv->mutex);
00251
00252
00253 on_shmem_exit(WalRcvDie, 0);
00254
00255 OwnLatch(&walrcv->latch);
00256
00257
00258
00259
00260
00261
00262
00263 #ifdef HAVE_SETSID
00264 if (setsid() < 0)
00265 elog(FATAL, "setsid() failed: %m");
00266 #endif
00267
00268
00269 pqsignal(SIGHUP, WalRcvSigHupHandler);
00270
00271 pqsignal(SIGINT, SIG_IGN);
00272 pqsignal(SIGTERM, WalRcvShutdownHandler);
00273 pqsignal(SIGQUIT, WalRcvQuickDieHandler);
00274 pqsignal(SIGALRM, SIG_IGN);
00275 pqsignal(SIGPIPE, SIG_IGN);
00276 pqsignal(SIGUSR1, WalRcvSigUsr1Handler);
00277 pqsignal(SIGUSR2, SIG_IGN);
00278
00279
00280 pqsignal(SIGCHLD, SIG_DFL);
00281 pqsignal(SIGTTIN, SIG_DFL);
00282 pqsignal(SIGTTOU, SIG_DFL);
00283 pqsignal(SIGCONT, SIG_DFL);
00284 pqsignal(SIGWINCH, SIG_DFL);
00285
00286
00287 sigdelset(&BlockSig, SIGQUIT);
00288
00289
00290 load_file("libpqwalreceiver", false);
00291 if (walrcv_connect == NULL || walrcv_startstreaming == NULL ||
00292 walrcv_endstreaming == NULL ||
00293 walrcv_identify_system == NULL ||
00294 walrcv_readtimelinehistoryfile == NULL ||
00295 walrcv_receive == NULL || walrcv_send == NULL ||
00296 walrcv_disconnect == NULL)
00297 elog(ERROR, "libpqwalreceiver didn't initialize correctly");
00298
00299
00300
00301
00302
00303 CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
00304
00305
00306 PG_SETMASK(&UnBlockSig);
00307
00308
00309 EnableWalRcvImmediateExit();
00310 walrcv_connect(conninfo);
00311 DisableWalRcvImmediateExit();
00312
00313 first_stream = true;
00314 for (;;)
00315 {
00316
00317
00318
00319
00320 EnableWalRcvImmediateExit();
00321 walrcv_identify_system(&primaryTLI);
00322 DisableWalRcvImmediateExit();
00323
00324
00325
00326
00327
00328 if (primaryTLI < startpointTLI)
00329 ereport(ERROR,
00330 (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
00331 primaryTLI, startpointTLI)));
00332
00333
00334
00335
00336
00337
00338
00339
00340
00341
00342 WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
00343
00344
00345
00346
00347
00348
00349
00350
00351
00352
00353
00354
00355
00356 ThisTimeLineID = startpointTLI;
00357 if (walrcv_startstreaming(startpointTLI, startpoint))
00358 {
00359 bool endofwal = false;
00360
00361 if (first_stream)
00362 ereport(LOG,
00363 (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
00364 (uint32) (startpoint >> 32), (uint32) startpoint,
00365 startpointTLI)));
00366 else
00367 ereport(LOG,
00368 (errmsg("restarted WAL streaming at %X/%X on timeline %u",
00369 (uint32) (startpoint >> 32), (uint32) startpoint,
00370 startpointTLI)));
00371 first_stream = false;
00372
00373
00374 LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
00375 initStringInfo(&reply_message);
00376 initStringInfo(&incoming_message);
00377
00378
00379 last_recv_timestamp = GetCurrentTimestamp();
00380 ping_sent = false;
00381
00382
00383 while (!endofwal)
00384 {
00385 char *buf;
00386 int len;
00387
00388
00389
00390
00391
00392 if (!PostmasterIsAlive())
00393 exit(1);
00394
00395
00396
00397
00398
00399 if (!RecoveryInProgress())
00400 ereport(FATAL,
00401 (errmsg("cannot continue WAL streaming, recovery has already ended")));
00402
00403
00404 ProcessWalRcvInterrupts();
00405
00406 if (got_SIGHUP)
00407 {
00408 got_SIGHUP = false;
00409 ProcessConfigFile(PGC_SIGHUP);
00410 XLogWalRcvSendHSFeedback(true);
00411 }
00412
00413
00414 len = walrcv_receive(NAPTIME_PER_CYCLE, &buf);
00415 if (len != 0)
00416 {
00417
00418
00419
00420
00421 for (;;)
00422 {
00423 if (len > 0)
00424 {
00425
00426 last_recv_timestamp = GetCurrentTimestamp();
00427 ping_sent = false;
00428 XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
00429 }
00430 else if (len == 0)
00431 break;
00432 else if (len < 0)
00433 {
00434 ereport(LOG,
00435 (errmsg("replication terminated by primary server"),
00436 errdetail("End of WAL reached on timeline %u at %X/%X",
00437 startpointTLI,
00438 (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
00439 endofwal = true;
00440 break;
00441 }
00442 len = walrcv_receive(0, &buf);
00443 }
00444
00445
00446 XLogWalRcvSendReply(false, false);
00447
00448
00449
00450
00451
00452
00453 XLogWalRcvFlush(false);
00454 }
00455 else
00456 {
00457
00458
00459
00460
00461
00462
00463
00464
00465 bool requestReply = false;
00466
00467
00468
00469
00470
00471 if (wal_receiver_timeout > 0)
00472 {
00473 TimestampTz now = GetCurrentTimestamp();
00474 TimestampTz timeout;
00475
00476 timeout =
00477 TimestampTzPlusMilliseconds(last_recv_timestamp,
00478 wal_receiver_timeout);
00479
00480 if (now >= timeout)
00481 ereport(ERROR,
00482 (errmsg("terminating walreceiver due to timeout")));
00483
00484
00485
00486
00487
00488 if (!ping_sent)
00489 {
00490 timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
00491 (wal_receiver_timeout/2));
00492 if (now >= timeout)
00493 {
00494 requestReply = true;
00495 ping_sent = true;
00496 }
00497 }
00498 }
00499
00500 XLogWalRcvSendReply(requestReply, requestReply);
00501 XLogWalRcvSendHSFeedback(false);
00502 }
00503 }
00504
00505
00506
00507
00508
00509 EnableWalRcvImmediateExit();
00510 walrcv_endstreaming(&primaryTLI);
00511 DisableWalRcvImmediateExit();
00512
00513
00514
00515
00516
00517
00518 WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
00519 }
00520 else
00521 ereport(LOG,
00522 (errmsg("primary server contains no more WAL on requested timeline %u",
00523 startpointTLI)));
00524
00525
00526
00527
00528
00529 if (recvFile >= 0)
00530 {
00531 char xlogfname[MAXFNAMELEN];
00532
00533 XLogWalRcvFlush(false);
00534 if (close(recvFile) != 0)
00535 ereport(PANIC,
00536 (errcode_for_file_access(),
00537 errmsg("could not close log segment %s: %m",
00538 XLogFileNameP(recvFileTLI, recvSegNo))));
00539
00540
00541
00542
00543
00544 XLogFileName(xlogfname, recvFileTLI, recvSegNo);
00545 XLogArchiveForceDone(xlogfname);
00546 }
00547 recvFile = -1;
00548
00549 elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
00550 WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
00551 }
00552
00553 }
00554
00555
00556
00557
00558 static void
00559 WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
00560 {
00561
00562 volatile WalRcvData *walrcv = WalRcv;
00563 int state;
00564
00565 SpinLockAcquire(&walrcv->mutex);
00566 state = walrcv->walRcvState;
00567 if (state != WALRCV_STREAMING)
00568 {
00569 SpinLockRelease(&walrcv->mutex);
00570 if (state == WALRCV_STOPPING)
00571 proc_exit(0);
00572 else
00573 elog(FATAL, "unexpected walreceiver state");
00574 }
00575 walrcv->walRcvState = WALRCV_WAITING;
00576 walrcv->receiveStart = InvalidXLogRecPtr;
00577 walrcv->receiveStartTLI = 0;
00578 SpinLockRelease(&walrcv->mutex);
00579
00580 if (update_process_title)
00581 set_ps_display("idle", false);
00582
00583
00584
00585
00586
00587 WakeupRecovery();
00588 for (;;)
00589 {
00590 ResetLatch(&walrcv->latch);
00591
00592
00593
00594
00595
00596 if (!PostmasterIsAlive())
00597 exit(1);
00598
00599 ProcessWalRcvInterrupts();
00600
00601 SpinLockAcquire(&walrcv->mutex);
00602 Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
00603 walrcv->walRcvState == WALRCV_WAITING ||
00604 walrcv->walRcvState == WALRCV_STOPPING);
00605 if (walrcv->walRcvState == WALRCV_RESTARTING)
00606 {
00607
00608 *startpoint = walrcv->receiveStart;
00609 *startpointTLI = walrcv->receiveStartTLI;
00610 walrcv->walRcvState = WALRCV_STREAMING;
00611 SpinLockRelease(&walrcv->mutex);
00612 break;
00613 }
00614 if (walrcv->walRcvState == WALRCV_STOPPING)
00615 {
00616
00617
00618
00619
00620 SpinLockRelease(&walrcv->mutex);
00621 exit(1);
00622 }
00623 SpinLockRelease(&walrcv->mutex);
00624
00625 WaitLatch(&walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0);
00626 }
00627
00628 if (update_process_title)
00629 {
00630 char activitymsg[50];
00631
00632 snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
00633 (uint32) (*startpoint >> 32),
00634 (uint32) *startpoint);
00635 set_ps_display(activitymsg, false);
00636 }
00637 }
00638
00639
00640
00641
00642
00643 static void
00644 WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
00645 {
00646 TimeLineID tli;
00647
00648 for (tli = first; tli <= last; tli++)
00649 {
00650
00651 if (tli != 1 && !existsTimeLineHistory(tli))
00652 {
00653 char *fname;
00654 char *content;
00655 int len;
00656 char expectedfname[MAXFNAMELEN];
00657
00658 ereport(LOG,
00659 (errmsg("fetching timeline history file for timeline %u from primary server",
00660 tli)));
00661
00662 EnableWalRcvImmediateExit();
00663 walrcv_readtimelinehistoryfile(tli, &fname, &content, &len);
00664 DisableWalRcvImmediateExit();
00665
00666
00667
00668
00669
00670 TLHistoryFileName(expectedfname, tli);
00671 if (strcmp(fname, expectedfname) != 0)
00672 ereport(ERROR,
00673 (errcode(ERRCODE_PROTOCOL_VIOLATION),
00674 errmsg_internal("primary reported unexpected filename for timeline history file of timeline %u",
00675 tli)));
00676
00677
00678
00679
00680 writeTimeLineHistoryFile(tli, content, len);
00681
00682 pfree(fname);
00683 pfree(content);
00684 }
00685 }
00686 }
00687
00688
00689
00690
00691 static void
00692 WalRcvDie(int code, Datum arg)
00693 {
00694
00695 volatile WalRcvData *walrcv = WalRcv;
00696
00697
00698 XLogWalRcvFlush(true);
00699
00700 DisownLatch(&walrcv->latch);
00701
00702 SpinLockAcquire(&walrcv->mutex);
00703 Assert(walrcv->walRcvState == WALRCV_STREAMING ||
00704 walrcv->walRcvState == WALRCV_RESTARTING ||
00705 walrcv->walRcvState == WALRCV_STARTING ||
00706 walrcv->walRcvState == WALRCV_WAITING ||
00707 walrcv->walRcvState == WALRCV_STOPPING);
00708 Assert(walrcv->pid == MyProcPid);
00709 walrcv->walRcvState = WALRCV_STOPPED;
00710 walrcv->pid = 0;
00711 SpinLockRelease(&walrcv->mutex);
00712
00713
00714 if (walrcv_disconnect != NULL)
00715 walrcv_disconnect();
00716
00717
00718 WakeupRecovery();
00719 }
00720
00721
00722 static void
00723 WalRcvSigHupHandler(SIGNAL_ARGS)
00724 {
00725 got_SIGHUP = true;
00726 }
00727
00728
00729
/*
 * SIGUSR1 handler: forwards the signal to the latch machinery.
 *
 * errno is saved and restored around the call, as WalRcvShutdownHandler()
 * does around SetLatch(), so that the code interrupted by the signal never
 * observes an errno value changed by the handler.
 */
static void
WalRcvSigUsr1Handler(SIGNAL_ARGS)
{
	int			save_errno = errno;

	latch_sigusr1_handler();

	errno = save_errno;
}
00735
00736
00737 static void
00738 WalRcvShutdownHandler(SIGNAL_ARGS)
00739 {
00740 int save_errno = errno;
00741
00742 got_SIGTERM = true;
00743
00744 SetLatch(&WalRcv->latch);
00745
00746
00747 if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
00748 ProcessWalRcvInterrupts();
00749
00750 errno = save_errno;
00751 }
00752
00753
00754
00755
00756
00757
00758
00759 static void
00760 WalRcvQuickDieHandler(SIGNAL_ARGS)
00761 {
00762 PG_SETMASK(&BlockSig);
00763
00764
00765
00766
00767
00768
00769
00770
00771
00772 on_exit_reset();
00773
00774
00775
00776
00777
00778
00779
00780
00781
00782 exit(2);
00783 }
00784
00785
00786
00787
00788 static void
00789 XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
00790 {
00791 int hdrlen;
00792 XLogRecPtr dataStart;
00793 XLogRecPtr walEnd;
00794 TimestampTz sendTime;
00795 bool replyRequested;
00796
00797 resetStringInfo(&incoming_message);
00798
00799 switch (type)
00800 {
00801 case 'w':
00802 {
00803
00804 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
00805 if (len < hdrlen)
00806 ereport(ERROR,
00807 (errcode(ERRCODE_PROTOCOL_VIOLATION),
00808 errmsg_internal("invalid WAL message received from primary")));
00809 appendBinaryStringInfo(&incoming_message, buf, hdrlen);
00810
00811
00812 dataStart = pq_getmsgint64(&incoming_message);
00813 walEnd = pq_getmsgint64(&incoming_message);
00814 sendTime = IntegerTimestampToTimestampTz(
00815 pq_getmsgint64(&incoming_message));
00816 ProcessWalSndrMessage(walEnd, sendTime);
00817
00818 buf += hdrlen;
00819 len -= hdrlen;
00820 XLogWalRcvWrite(buf, len, dataStart);
00821 break;
00822 }
00823 case 'k':
00824 {
00825
00826 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
00827 if (len != hdrlen)
00828 ereport(ERROR,
00829 (errcode(ERRCODE_PROTOCOL_VIOLATION),
00830 errmsg_internal("invalid keepalive message received from primary")));
00831 appendBinaryStringInfo(&incoming_message, buf, hdrlen);
00832
00833
00834 walEnd = pq_getmsgint64(&incoming_message);
00835 sendTime = IntegerTimestampToTimestampTz(
00836 pq_getmsgint64(&incoming_message));
00837 replyRequested = pq_getmsgbyte(&incoming_message);
00838
00839 ProcessWalSndrMessage(walEnd, sendTime);
00840
00841
00842 if (replyRequested)
00843 XLogWalRcvSendReply(true, false);
00844 break;
00845 }
00846 default:
00847 ereport(ERROR,
00848 (errcode(ERRCODE_PROTOCOL_VIOLATION),
00849 errmsg_internal("invalid replication message type %d",
00850 type)));
00851 }
00852 }
00853
00854
00855
00856
00857 static void
00858 XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
00859 {
00860 int startoff;
00861 int byteswritten;
00862
00863 while (nbytes > 0)
00864 {
00865 int segbytes;
00866
00867 if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo))
00868 {
00869 bool use_existent;
00870
00871
00872
00873
00874
00875 if (recvFile >= 0)
00876 {
00877 char xlogfname[MAXFNAMELEN];
00878
00879 XLogWalRcvFlush(false);
00880
00881
00882
00883
00884
00885
00886 if (close(recvFile) != 0)
00887 ereport(PANIC,
00888 (errcode_for_file_access(),
00889 errmsg("could not close log segment %s: %m",
00890 XLogFileNameP(recvFileTLI, recvSegNo))));
00891
00892
00893
00894
00895
00896 XLogFileName(xlogfname, recvFileTLI, recvSegNo);
00897 XLogArchiveForceDone(xlogfname);
00898 }
00899 recvFile = -1;
00900
00901
00902 XLByteToSeg(recptr, recvSegNo);
00903 use_existent = true;
00904 recvFile = XLogFileInit(recvSegNo, &use_existent, true);
00905 recvFileTLI = ThisTimeLineID;
00906 recvOff = 0;
00907 }
00908
00909
00910 startoff = recptr % XLogSegSize;
00911
00912 if (startoff + nbytes > XLogSegSize)
00913 segbytes = XLogSegSize - startoff;
00914 else
00915 segbytes = nbytes;
00916
00917
00918 if (recvOff != startoff)
00919 {
00920 if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
00921 ereport(PANIC,
00922 (errcode_for_file_access(),
00923 errmsg("could not seek in log segment %s, to offset %u: %m",
00924 XLogFileNameP(recvFileTLI, recvSegNo),
00925 startoff)));
00926 recvOff = startoff;
00927 }
00928
00929
00930 errno = 0;
00931
00932 byteswritten = write(recvFile, buf, segbytes);
00933 if (byteswritten <= 0)
00934 {
00935
00936 if (errno == 0)
00937 errno = ENOSPC;
00938 ereport(PANIC,
00939 (errcode_for_file_access(),
00940 errmsg("could not write to log segment %s "
00941 "at offset %u, length %lu: %m",
00942 XLogFileNameP(recvFileTLI, recvSegNo),
00943 recvOff, (unsigned long) segbytes)));
00944 }
00945
00946
00947 recptr += byteswritten;
00948
00949 recvOff += byteswritten;
00950 nbytes -= byteswritten;
00951 buf += byteswritten;
00952
00953 LogstreamResult.Write = recptr;
00954 }
00955 }
00956
00957
00958
00959
00960
00961
00962
00963 static void
00964 XLogWalRcvFlush(bool dying)
00965 {
00966 if (LogstreamResult.Flush < LogstreamResult.Write)
00967 {
00968
00969 volatile WalRcvData *walrcv = WalRcv;
00970
00971 issue_xlog_fsync(recvFile, recvSegNo);
00972
00973 LogstreamResult.Flush = LogstreamResult.Write;
00974
00975
00976 SpinLockAcquire(&walrcv->mutex);
00977 if (walrcv->receivedUpto < LogstreamResult.Flush)
00978 {
00979 walrcv->latestChunkStart = walrcv->receivedUpto;
00980 walrcv->receivedUpto = LogstreamResult.Flush;
00981 walrcv->receivedTLI = ThisTimeLineID;
00982 }
00983 SpinLockRelease(&walrcv->mutex);
00984
00985
00986 WakeupRecovery();
00987 if (AllowCascadeReplication())
00988 WalSndWakeup();
00989
00990
00991 if (update_process_title)
00992 {
00993 char activitymsg[50];
00994
00995 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
00996 (uint32) (LogstreamResult.Write >> 32),
00997 (uint32) LogstreamResult.Write);
00998 set_ps_display(activitymsg, false);
00999 }
01000
01001
01002 if (!dying)
01003 XLogWalRcvSendReply(false, false);
01004 }
01005 }
01006
01007
01008
01009
01010
01011
01012
01013
01014
01015
01016
01017
01018
01019
01020 static void
01021 XLogWalRcvSendReply(bool force, bool requestReply)
01022 {
01023 static XLogRecPtr writePtr = 0;
01024 static XLogRecPtr flushPtr = 0;
01025 XLogRecPtr applyPtr;
01026 static TimestampTz sendTime = 0;
01027 TimestampTz now;
01028
01029
01030
01031
01032
01033 if (!force && wal_receiver_status_interval <= 0)
01034 return;
01035
01036
01037 now = GetCurrentTimestamp();
01038
01039
01040
01041
01042
01043
01044
01045
01046
01047
01048 if (!force
01049 && writePtr == LogstreamResult.Write
01050 && flushPtr == LogstreamResult.Flush
01051 && !TimestampDifferenceExceeds(sendTime, now,
01052 wal_receiver_status_interval * 1000))
01053 return;
01054 sendTime = now;
01055
01056
01057 writePtr = LogstreamResult.Write;
01058 flushPtr = LogstreamResult.Flush;
01059 applyPtr = GetXLogReplayRecPtr(NULL);
01060
01061 resetStringInfo(&reply_message);
01062 pq_sendbyte(&reply_message, 'r');
01063 pq_sendint64(&reply_message, writePtr);
01064 pq_sendint64(&reply_message, flushPtr);
01065 pq_sendint64(&reply_message, applyPtr);
01066 pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
01067 pq_sendbyte(&reply_message, requestReply ? 1 : 0);
01068
01069
01070 elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
01071 (uint32) (writePtr >> 32), (uint32) writePtr,
01072 (uint32) (flushPtr >> 32), (uint32) flushPtr,
01073 (uint32) (applyPtr >> 32), (uint32) applyPtr,
01074 requestReply ? " (reply requested)" : "");
01075
01076 walrcv_send(reply_message.data, reply_message.len);
01077 }
01078
01079
01080
01081
01082
01083
01084
01085
01086 static void
01087 XLogWalRcvSendHSFeedback(bool immed)
01088 {
01089 TimestampTz now;
01090 TransactionId nextXid;
01091 uint32 nextEpoch;
01092 TransactionId xmin;
01093 static TimestampTz sendTime = 0;
01094 static bool master_has_standby_xmin = false;
01095
01096
01097
01098
01099
01100 if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
01101 !master_has_standby_xmin)
01102 return;
01103
01104
01105 now = GetCurrentTimestamp();
01106
01107 if (!immed)
01108 {
01109
01110
01111
01112 if (!TimestampDifferenceExceeds(sendTime, now,
01113 wal_receiver_status_interval * 1000))
01114 return;
01115 sendTime = now;
01116 }
01117
01118
01119
01120
01121
01122 if (!HotStandbyActive())
01123 {
01124 Assert(!master_has_standby_xmin);
01125 return;
01126 }
01127
01128
01129
01130
01131
01132 if (hot_standby_feedback)
01133 xmin = GetOldestXmin(true, false);
01134 else
01135 xmin = InvalidTransactionId;
01136
01137
01138
01139
01140
01141 GetNextXidAndEpoch(&nextXid, &nextEpoch);
01142 if (nextXid < xmin)
01143 nextEpoch--;
01144
01145 elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
01146 xmin, nextEpoch);
01147
01148
01149 resetStringInfo(&reply_message);
01150 pq_sendbyte(&reply_message, 'h');
01151 pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
01152 pq_sendint(&reply_message, xmin, 4);
01153 pq_sendint(&reply_message, nextEpoch, 4);
01154 walrcv_send(reply_message.data, reply_message.len);
01155 if (TransactionIdIsValid(xmin))
01156 master_has_standby_xmin = true;
01157 else
01158 master_has_standby_xmin = false;
01159 }
01160
01161
01162
01163
01164
01165
01166
01167 static void
01168 ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
01169 {
01170
01171 volatile WalRcvData *walrcv = WalRcv;
01172
01173 TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
01174
01175
01176 SpinLockAcquire(&walrcv->mutex);
01177 if (walrcv->latestWalEnd < walEnd)
01178 walrcv->latestWalEndTime = sendTime;
01179 walrcv->latestWalEnd = walEnd;
01180 walrcv->lastMsgSendTime = sendTime;
01181 walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
01182 SpinLockRelease(&walrcv->mutex);
01183
01184 if (log_min_messages <= DEBUG2)
01185 elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
01186 timestamptz_to_str(sendTime),
01187 timestamptz_to_str(lastMsgReceiptTime),
01188 GetReplicationApplyDelay(),
01189 GetReplicationTransferLatency());
01190 }